I want to use TIdTCPClient to connect to Redis from a web server application made with Delphi and TWebModule. Currently on WebModule creation I create a new TIdTCPClient and on web module destroy, I disconnect and free the resource. All works... But this way I create a new connection on every new request and after a lot of time there is a growing number of TIME_WAIT sockets.
I would like to implement a global connection pool with a fixed number of connections always open.
NB: I have omitted redis logic, in this example is important only TCP connection pool
Redis.Pool.pas
unit Redis.Pool;
interface
uses
Windows,
System.Classes,
System.SysUtils,
System.SyncObjs,
System.Threading,
System.DateUtils,
IdTCPClient;
type
EConnPoolException = class(Exception);
IRedisConnection = Interface(IInterface)
function Connection: TIdTCPClient;
function GetRefCount: Integer;
function GetLastAccess: TDateTime;
function GetIsConnected: boolean;
property LastAccess: TDateTime read GetLastAccess;
property RefCount: Integer read GetRefCount;
property IsConnected: boolean read GetIsConnected;
end;
// The TRedisConnection class also contains two member fields,
// FCriticalSection and FSemaphore. Both of these members are used to point
// to a TCriticalSection and a semaphore (a THandle), both of which are
// maintained by the TFixedRedisPool class (and are assigned to these
// members by the TFixedRedisPool instance when it creates an instance
// of the connection module).
// The critical section class is used to synchronize access to the internal
// reference count of the connection pool instances, and the semaphore
// controls access to instances of the TRedisConnection class. This is
// shown in the _AddRef and _Release method implementations, shown here.
TRedisConnection = class(TInterfacedObject, IRedisConnection)
private
FConnection: TIdTCPClient;
protected
FRefCount: Integer;
FLastAccess: TDateTime;
// When the TRedisConnection is created the
// connection pool that creates the object
// will assign its critical section to this field.
// The object will use this critical section
// to synchronize access to its reference count.
FCriticalSection: TCriticalSection;
// This semaphore points to the FixedConnectionPool's
// semaphore. It will be used to call ReleaseSemaphore
// from the _Release method of the TRedisConnection.
FSemaphore: THandle;
// These two static methods are reintroduced
// in order to implement lifecycle management
// for the interface of this object.
// Normally, unlike normal COM objects, Delphi
// TComponent descendants are not lifecycle managed
// when used in interface references.
function _AddRef: Integer; stdcall;
function _Release: Integer; stdcall;
function GetRefCount: Integer;
function GetLastAccess: TDateTime;
function GetIsConnected: boolean;
public
constructor Create; overload;
destructor Destroy; override;
function Connection: TIdTCPClient;
end;
TCleanupThread = class;
TFixedRedisPool = class(TObject)
private
FPool: array of IRedisConnection;
FPoolSize: Integer;
FTimeout: Integer;
FCleanupThread: TCleanupThread;
// This semaphore is used to limit the number of
// simultaneous connections. When the nth+1 connection
// is requested, it will be blocked until a connection
// becomes available.
FSemaphore: THandle;
// This is the critical section that synchronizes
// access to the connection module reference counts
FCriticalSection: TCriticalSection;
public
// This overloaded constructor takes two optional
// parameters. These parameters specify the size
// of the connection pool, as well as how long idle
// connections in the connection pool will be kept.
constructor Create(const APoolSize: Integer = 10; const ACleanupDelayMinutes: Integer = 5; const ATimeoutms: Integer = 2000); overload;
destructor Destroy; override;
// This function returns an object
// that implements the IRedisConnection interface.
// This object can be a data module, as was
//done in this example.
function GetConnection: IRedisConnection;
end;
// This thread class is used by the connection pool
// object to cleanup idle connections after a
// configurable period of time.
TCleanupThread = class(TThread)
private
FCleanupDelay: Integer;
protected
// When the thread is created, this critical section
// field will be assigned the connection pool's
// critical section. This critical section is
// used to synchronize access to data module
// reference counts.
FCriticalSection: TCriticalSection;
FFixedConnectionPool: TFixedRedisPool;
procedure Execute; override;
constructor Create(const ACreateSuspended: Boolean; const ACleanupDelayMinutes: Integer);
end;
// Global Connection Pool
var
RedisConnPool: TFixedRedisPool;
implementation
// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
// +++ TRedisConnection ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
constructor TRedisConnection.Create;
begin
FConnection := TIdTCPClient.Create;
FConnection.Connect('LOCALHOST', 6379);
end;
destructor TRedisConnection.Destroy;
begin
FConnection.Disconnect;
FConnection.Free;
inherited;
end;
function TRedisConnection._AddRef: Integer;
begin
// increment the reference count
FCriticalSection.Enter;
try
Inc(FRefCount);
Result := FRefCount;
finally
FCriticalSection.Leave;
end;
end;
function TRedisConnection._Release: Integer;
begin
// decrement the reference count
FCriticalSection.Enter;
try
Dec(FRefCount);
Result := FRefCount;
// if not more references, call Destroy
if Result = 0 then
Destroy
else
Self.FLastAccess := Now;
finally
FCriticalSection.Leave;
if FRefCount = 1 then
ReleaseSemaphore(FSemaphore, 1, nil);
end;
end;
function TRedisConnection.GetRefCount: Integer;
begin
Result := FRefCount;
end;
function TRedisConnection.GetLastAccess: TDateTime;
begin
Result := Self.FLastAccess;
end;
function TRedisConnection.GetIsConnected: boolean;
begin
Result := Self.FConnection.Connected;
end;
function TRedisConnection.Connection: TIdTCPClient;
begin
Result := Self.FConnection;
end;
// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
// +++ TFixedRedisPool +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
constructor TFixedRedisPool.Create(const APoolSize: Integer = 10; const ACleanupDelayMinutes: Integer = 5; const ATimeoutms: Integer = 2000);
begin
FPoolSize := APoolSize;
FTimeout := ATimeoutms;
FSemaphore := CreateSemaphore(nil, APoolSize, APoolSize, '');
FCriticalSection := TCriticalSection.Create;
// Set the length of the connection pool
SetLength(FPool, APoolSize);
// Create and start the cleanup thread
FCleanupThread := TCleanupThread.Create(True, ACleanupDelayMinutes);
with FCleanupThread do
begin
FreeOnTerminate := True;
Priority := tpLower;
FFixedConnectionPool := Self;
Start;
end;
end;
function TFixedRedisPool.GetConnection: IRedisConnection;
var
I: Integer;
aRedisConnection: TRedisConnection;
aWaitResult: Integer;
begin
Result := nil;
aWaitResult := WaitForSingleObject(FSemaphore, FTimeout);
if aWaitResult <> WAIT_OBJECT_0 then raise EConnPoolException.Create('TFixedRedisPool.GetConnection pool timeout.');
FCriticalSection.Enter;
try
for I := Low(FPool) to High(FPool) do
begin
// If FPool[i] = nil, the IRedisConnection has
// not yet been created. Create it, initialize
// it, and return it. If FPool[i] <> nil, then
// check to see if its RefCount = 1 (only the pool
// is referencing the object).
if FPool[I] = nil then
begin
aRedisConnection := TRedisConnection.Create;
aRedisConnection.FCriticalSection := Self.FCriticalSection;
aRedisConnection.FSemaphore := Self.FSemaphore;
FPool[I] := aRedisConnection;
Result := FPool[I];
Exit;
end;
// if FPool[i].FRefCount = 1 then
// the connection is available. Return it.
if (FPool[I].RefCount = 1) and FPool[I].IsConnected then
begin
Result := FPool[I];
Exit;
end;
end;
finally
FCriticalSection.Leave;
end;
end;
// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
// +++ TCleanupThread ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
constructor TCleanupThread.Create(const ACreateSuspended: Boolean; const ACleanupDelayMinutes: Integer);
begin
// always create suspended
inherited Create(True);
FCleanupDelay := ACleanupDelayMinutes;
// Start if not created suspended
if not(ACreateSuspended) then Start;
end;
destructor TFixedRedisPool.Destroy;
var
I: Integer;
begin
FCleanupThread.Terminate;
FCriticalSection.Enter;
try
for I := Low(FPool) to High(FPool) do
FPool[I] := nil;
SetLength(FPool,0);
finally
FCriticalSection.Leave;
end;
FCriticalSection.Free;
CloseHandle(FSemaphore);
inherited;
end;
procedure TCleanupThread.Execute;
var
I: Integer;
begin
while True do
begin
if Terminated then Exit;
// Sleep per delay
Sleep(FCleanupDelay * 1000 * 60);
if Terminated then Exit;
FFixedConnectionPool.FCriticalSection.Enter;
try
for I := Low(FFixedConnectionPool.FPool) to High(FFixedConnectionPool.FPool) do begin
if FFixedConnectionPool.FPool[I] = nil then continue;
// if the connection exists, has no external reference,
// and has not been used lately, release it
if not(FFixedConnectionPool.FPool[I].IsConnected) or (FFixedConnectionPool.FPool[I].RefCount = 1) and (MinutesBetween(FFixedConnectionPool.FPool[I].LastAccess, Now) > FCleanupDelay) then begin
FFixedConnectionPool.FPool[I] := nil;
end;
end;
finally
FFixedConnectionPool.FCriticalSection.Leave;
end;
end;
end;
initialization
RedisConnPool := TFixedRedisPool.Create(10, 5, 2000);
finalization
FreeAndNil(RedisConnPool);
end.
usage
uses Redis.Pool;
var
aConn: IRedisConnection;
begin
// RedisConnPool is blobal var into Redis.Pool
aConn := RedisConnPool.GetConnection;
try
// I have omitted redis logic in this example is important only TCP connection pool..
// aConn.Connection.GET('REDIS_KEY');
aConn.Connection.IOHandler.Write('HELLO');
finally
aConn := nil;
end;
end;
The question is: Is my implementation thread safe in a in a multithread environment like a TWebModule application?