2
\$\begingroup\$

I want to use TIdTCPClient to connect to Redis from a web server application built with Delphi and TWebModule. Currently I create a new TIdTCPClient when the web module is created, and I disconnect and free it when the web module is destroyed. This works, but it opens a new connection for every request, and over time the number of sockets in the TIME_WAIT state keeps growing.

I would like to implement a global connection pool with a fixed number of connections always open.

NB: I have omitted the Redis logic; in this example only the TCP connection pool matters.

Redis.Pool.pas

unit Redis.Pool;  

interface

  uses
    Windows,
    System.Classes,
    System.SysUtils,
    System.SyncObjs,
    System.Threading,
    System.DateUtils,
    IdTCPClient;

  type

    // Exception type raised by TFixedRedisPool.GetConnection when waiting
    // for a free pool slot does not succeed.
    EConnPoolException = class(Exception);

    // Reference-counted handle to one pooled TCP connection.
    // Callers obtain it from TFixedRedisPool.GetConnection and give the
    // connection back simply by dropping the interface reference.
    IRedisConnection = Interface(IInterface)
      // The underlying Indy TCP client used to talk to the server.
      function Connection: TIdTCPClient;
      // Current value of the implementation's own reference counter.
      function GetRefCount: Integer;
      // Timestamp recorded the last time a reference was released
      // without destroying the object.
      function GetLastAccess: TDateTime;
      // Whether the underlying TCP client reports itself as connected.
      function GetIsConnected: boolean;
      property LastAccess: TDateTime read GetLastAccess;
      property RefCount: Integer read GetRefCount;
      property IsConnected: boolean read GetIsConnected;
    end;

    // One pooled connection: owns a TIdTCPClient and implements its own
    // reference counting on top of TInterfacedObject.
    //
    // FCriticalSection and FSemaphore are not owned by this class. They are
    // copies of the pool's critical section and semaphore handle, assigned by
    // TFixedRedisPool.GetConnection right after the object is constructed.
    // _AddRef/_Release take the critical section while changing the counter,
    // and _Release signals the semaphore when the count drops back to 1
    // (i.e. only the pool still references the connection).
    TRedisConnection = class(TInterfacedObject, IRedisConnection)
    private
      // The wrapped Indy client; created and connected in Create,
      // disconnected and freed in Destroy.
      FConnection: TIdTCPClient;
    protected
      // Reference counter maintained by the _AddRef/_Release declared below.
      // NOTE(review): TInterfacedObject already has a field of this name;
      // this declaration presumably hides it - confirm this is intended.
      FRefCount: Integer;
      // Set to Now by _Release whenever a reference is dropped and the
      // object survives.
      FLastAccess: TDateTime;
      // Shared lock supplied by the pool (not created or freed here).
      // It must be assigned before the first interface reference is taken,
      // because _AddRef/_Release use it unconditionally.
      FCriticalSection: TCriticalSection;
      // Copy of the pool's semaphore handle; _Release calls
      // ReleaseSemaphore on it when the connection becomes idle again.
      FSemaphore: THandle;
      // Replacement reference-counting methods used for the
      // IRedisConnection interface: they count under FCriticalSection and
      // tie the count to the pool's semaphore.
      function _AddRef: Integer; stdcall;
      function _Release: Integer; stdcall;
      // Getters backing the IRedisConnection properties.
      function GetRefCount: Integer;
      function GetLastAccess: TDateTime;
      function GetIsConnected: boolean;
    public
      // Creates the TCP client and connects it immediately.
      constructor Create; overload;
      // Disconnects and frees the TCP client.
      destructor Destroy; override;
      // Returns the wrapped TCP client.
      function Connection: TIdTCPClient;
    end;

    TCleanupThread = class;

    // Fixed-size pool of IRedisConnection objects. Slots are filled lazily
    // by GetConnection and emptied again by the cleanup thread.
    TFixedRedisPool = class(TObject)
    private
      // Pool slots; a nil entry means no connection has been created for
      // that slot (or it has been released by the cleanup thread).
      FPool: array of IRedisConnection;
      // Number of slots, as passed to the constructor.
      FPoolSize: Integer;
      // Maximum time, in milliseconds, GetConnection waits on FSemaphore.
      FTimeout: Integer;
      // Background thread that releases idle connections.
      FCleanupThread: TCleanupThread;
      // This semaphore is used to limit the number of
      // simultaneous connections. When the nth+1 connection
      // is requested, the caller is blocked until a connection
      // becomes available or the timeout expires.
      FSemaphore: THandle;
      // Critical section guarding FPool and the reference counts of the
      // pooled connections (each connection receives a copy of it).
      FCriticalSection: TCriticalSection;
    public
      // All three parameters are optional:
      //   APoolSize            - number of pool slots / concurrent connections
      //   ACleanupDelayMinutes - idle time after which a connection is
      //                          released, also the cleanup thread's period
      //   ATimeoutms           - how long GetConnection waits for a free slot
      constructor Create(const APoolSize: Integer = 10; const ACleanupDelayMinutes: Integer = 5; const ATimeoutms: Integer = 2000); overload;
      // Signals the cleanup thread to terminate, drops all pooled
      // connections and releases the lock and the semaphore.
      destructor Destroy; override;
      // Returns an object that implements IRedisConnection.
      // Raises EConnPoolException when no slot becomes free in time.
      function GetConnection: IRedisConnection;
    end;

    // Background thread used by the connection pool to release idle
    // connections after a configurable period of time.
    TCleanupThread = class(TThread)
    private
      // Cleanup period and idle threshold, in minutes.
      FCleanupDelay: Integer;
    protected
      // NOTE(review): this field is neither assigned nor read in the
      // visible code; Execute uses the pool's own critical section via
      // FFixedConnectionPool instead. Possibly a leftover - confirm.
      FCriticalSection: TCriticalSection;
      // The pool whose slots this thread inspects; assigned by the pool's
      // constructor before the thread is started.
      FFixedConnectionPool: TFixedRedisPool;
      // Periodically scans the pool and releases idle connections.
      procedure Execute; override;
      // Always constructs the thread suspended, stores the delay, and then
      // starts it unless ACreateSuspended is True.
      constructor Create(const ACreateSuspended: Boolean; const ACleanupDelayMinutes: Integer);
    end;

// Global Connection Pool
var
  RedisConnPool: TFixedRedisPool;

implementation

// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
// +++ TRedisConnection ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

constructor TRedisConnection.Create;
const
  // Fixed endpoint of the Redis server this pool talks to
  REDIS_HOST = 'LOCALHOST';
  REDIS_PORT = 6379;
begin
  // Store the client in the field first so Destroy can free it
  // if the connection attempt below raises.
  Self.FConnection := TIdTCPClient.Create;
  Self.FConnection.Connect(REDIS_HOST, REDIS_PORT);
end;

// Disconnects and frees the wrapped TCP client.
// The client is freed and the inherited destructor runs even when
// Disconnect raises (for example on a socket that is already broken), so a
// failing disconnect can no longer leak the client or skip inherited cleanup.
destructor TRedisConnection.Destroy;
begin
  try
    // FConnection is still nil if the constructor failed before the
    // client object was created.
    if Assigned(FConnection) then
      FConnection.Disconnect;
  finally
    FreeAndNil(FConnection);
    inherited;
  end;
end;

function TRedisConnection._AddRef: Integer;
begin
  // Bump the counter while holding the lock supplied by the pool,
  // and report the new count to the caller.
  FCriticalSection.Acquire;
  try
    FRefCount := FRefCount + 1;
    Result := FRefCount;
  finally
    FCriticalSection.Release;
  end;
end;

// Drops one reference and returns the new count.
//   count = 0 : the object destroys itself;
//   count = 1 : only the pool still references the connection, so one
//               semaphore permit is handed back to let a waiter proceed;
//   otherwise : the last-access timestamp is refreshed.
function TRedisConnection._Release: Integer;
var
  Lock: TCriticalSection;
  Semaphore: THandle;
begin
  // Copy the shared handles to locals first: once Destroy has run, Self is
  // freed memory and its fields must not be touched again.
  Lock := FCriticalSection;
  Semaphore := FSemaphore;

  Lock.Enter;
  try
    Dec(FRefCount);
    Result := FRefCount;

    // If there are no more references, destroy the object
    if Result = 0 then
      Destroy
    else
      FLastAccess := Now;
  finally
    Lock.Leave;
  end;

  // Decide from the value captured under the lock. Re-reading FRefCount here
  // would race with other threads and would read freed memory after Destroy.
  if Result = 1 then
    ReleaseSemaphore(Semaphore, 1, nil);
end;

function TRedisConnection.GetRefCount: Integer;
begin
  // Plain read of the counter; no lock is taken here.
  Exit(Self.FRefCount);
end;

function TRedisConnection.GetLastAccess: TDateTime;
begin
  // Timestamp last written by _Release.
  Exit(FLastAccess);
end;

function TRedisConnection.GetIsConnected: boolean;
begin
  // Delegate to the TCP client's own connection state.
  Exit(FConnection.Connected);
end;

function TRedisConnection.Connection: TIdTCPClient;
begin
  // Expose the wrapped TCP client to the caller.
  Exit(FConnection);
end;


// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
// +++ TFixedRedisPool +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

// Builds an empty pool with APoolSize slots and starts the cleanup thread.
//   APoolSize            - number of slots and initial/maximum semaphore count
//   ACleanupDelayMinutes - period/idle threshold handed to the cleanup thread
//   ATimeoutms           - how long GetConnection waits for a free slot
constructor TFixedRedisPool.Create(const APoolSize: Integer = 10; const ACleanupDelayMinutes: Integer = 5; const ATimeoutms: Integer = 2000);
begin
  inherited Create;

  FPoolSize        := APoolSize;
  FTimeout         := ATimeoutms;
  // Pass nil for the name: that is the documented way to request an
  // unnamed (process-private) semaphore.
  FSemaphore       := CreateSemaphore(nil, APoolSize, APoolSize, nil);
  FCriticalSection := TCriticalSection.Create;

  // Set the length of the connection pool; slots start out as nil
  SetLength(FPool, APoolSize);

  // Create the cleanup thread suspended, configure it, then start it.
  // Explicit qualification replaces the former "with" block so it is
  // obvious which object each assignment targets.
  FCleanupThread := TCleanupThread.Create(True, ACleanupDelayMinutes);
  FCleanupThread.FreeOnTerminate      := True;
  FCleanupThread.Priority             := tpLower;
  FCleanupThread.FFixedConnectionPool := Self;
  FCleanupThread.Start;
end;

// Hands out a pooled connection.
// Waits up to FTimeout milliseconds for a semaphore permit, then picks the
// first slot that is either empty or idle (RefCount = 1, i.e. referenced only
// by the pool). An idle connection that is no longer connected is replaced by
// a fresh one instead of being skipped.
// Raises EConnPoolException when no permit is obtained in time or no usable
// slot is found; exceptions raised while connecting propagate unchanged.
// On every failure path the semaphore permit is returned, so errors do not
// shrink the pool.
function TFixedRedisPool.GetConnection: IRedisConnection;
var
  I: Integer;
  NewConnection: TRedisConnection;
begin
  Result := nil;

  if WaitForSingleObject(FSemaphore, FTimeout) <> WAIT_OBJECT_0 then
    raise EConnPoolException.Create('TFixedRedisPool.GetConnection pool timeout.');

  try
    FCriticalSection.Enter;
    try
      for I := Low(FPool) to High(FPool) do
      begin
        // RefCount <> 1 means somebody besides the pool holds this
        // connection: it is lent out, try the next slot.
        if (FPool[I] <> nil) and (FPool[I].RefCount <> 1) then
          Continue;

        // Idle but dead connection: drop it so the slot is refilled below.
        // This releases the pool's (only) reference and destroys the object.
        if (FPool[I] <> nil) and not FPool[I].IsConnected then
          FPool[I] := nil;

        // Empty slot: create the connection and give it the pool's lock and
        // semaphore BEFORE the first interface reference is taken, because
        // its _AddRef/_Release rely on them.
        if FPool[I] = nil then
        begin
          NewConnection := TRedisConnection.Create;
          NewConnection.FCriticalSection := Self.FCriticalSection;
          NewConnection.FSemaphore := Self.FSemaphore;
          FPool[I] := NewConnection;
        end;

        Result := FPool[I];
        Break;
      end;
    finally
      FCriticalSection.Leave;
    end;
  except
    // Connecting (or disposing of a dead connection) failed: give the
    // permit back before letting the exception reach the caller.
    ReleaseSemaphore(FSemaphore, 1, nil);
    raise;
  end;

  if Result = nil then
  begin
    // A permit was granted but no slot was usable. Return the permit and
    // fail loudly rather than handing the caller a nil interface.
    ReleaseSemaphore(FSemaphore, 1, nil);
    raise EConnPoolException.Create('TFixedRedisPool.GetConnection no free connection slot.');
  end;
end;


// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
// +++ TCleanupThread ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

constructor TCleanupThread.Create(const ACreateSuspended: Boolean; const ACleanupDelayMinutes: Integer);
begin
  // The underlying thread is always created suspended so that the delay
  // is stored before the thread body can possibly run.
  inherited Create(True);

  Self.FCleanupDelay := ACleanupDelayMinutes;

  // Caller asked for a suspended thread: leave it that way.
  if ACreateSuspended then
    Exit;

  // Otherwise kick it off now.
  Start;
end;


// Tears the pool down: asks the cleanup thread to stop, drops the pool's
// reference to every connection, then releases the lock and the semaphore.
//
// Delphi invokes the destructor automatically when the constructor raises,
// so every field may still be nil/0 here and is checked before use.
//
// NOTE(review): the cleanup thread is only signalled, not waited for, and
// connections still held by callers keep pointers to the lock freed below -
// the pool should only be destroyed once nothing else is using it.
destructor TFixedRedisPool.Destroy;
var
  I: Integer;
begin
  if Assigned(FCleanupThread) then
    FCleanupThread.Terminate;

  if Assigned(FCriticalSection) then
  begin
    FCriticalSection.Enter;
    try
      // Releasing the pool's reference destroys every idle connection
      for I := Low(FPool) to High(FPool) do
        FPool[I] := nil;
      SetLength(FPool, 0);
    finally
      FCriticalSection.Leave;
    end;
    FreeAndNil(FCriticalSection);
  end;

  // 0 means the semaphore was never created (or creation failed)
  if FSemaphore <> 0 then
  begin
    CloseHandle(FSemaphore);
    FSemaphore := 0;
  end;

  inherited;
end;

// Thread body: every FCleanupDelay minutes, scan the pool and release
// connections that are idle (RefCount = 1, i.e. referenced only by the pool)
// and either disconnected or unused for longer than FCleanupDelay minutes.
procedure TCleanupThread.Execute;
const
  // The delay is slept in short slices so that Terminate is noticed
  // promptly instead of after a full multi-minute Sleep.
  WAIT_SLICE_MS = 250;
var
  I: Integer;
  RemainingMs: Int64;
  Pool: TFixedRedisPool;
begin
  Pool := FFixedConnectionPool;

  while not Terminated do
  begin
    // Int64 arithmetic: minutes * 60000 can overflow a 32-bit Integer.
    RemainingMs := Int64(FCleanupDelay) * 60 * 1000;
    // Always sleep at least one slice so a zero delay cannot busy-spin.
    repeat
      Sleep(WAIT_SLICE_MS);
      Dec(RemainingMs, WAIT_SLICE_MS);
    until Terminated or (RemainingMs <= 0);
    if Terminated then Exit;

    Pool.FCriticalSection.Enter;
    try
      for I := Low(Pool.FPool) to High(Pool.FPool) do
      begin
        if Pool.FPool[I] = nil then Continue;

        // Never evict a connection somebody is still using. The previous
        // single condition "not A or B and C" parsed as "(not A) or (B and C)"
        // and released disconnected connections even while they were lent
        // out; their semaphore permit was then never returned.
        // FPool[I] is accessed directly on purpose: copying it into a local
        // interface variable would itself raise the reference count.
        if Pool.FPool[I].RefCount <> 1 then Continue;

        // Idle connection: release it when it is dead or has not been
        // used lately.
        if (not Pool.FPool[I].IsConnected)
          or (MinutesBetween(Pool.FPool[I].LastAccess, Now) > FCleanupDelay) then
          Pool.FPool[I] := nil;
      end;
    finally
      Pool.FCriticalSection.Leave;
    end;
  end;
end;



initialization
  // Create the global pool when the unit is initialized:
  // pool size 10, cleanup delay 5 minutes, wait timeout 2000 ms.
  RedisConnPool := TFixedRedisPool.Create(10, 5, 2000);

finalization
  // Destroy the global pool and clear the variable at unit finalization.
  FreeAndNil(RedisConnPool);

end.

usage

uses Redis.Pool;

var
   aConn: IRedisConnection;
begin
  // RedisConnPool is a global variable declared in the Redis.Pool unit
  aConn := RedisConnPool.GetConnection;
  try
    // The Redis logic is omitted here; in this example only the TCP connection pool matters.
    // aConn.Connection.GET('REDIS_KEY');
    aConn.Connection.IOHandler.Write('HELLO');
  finally
    // Drop the interface reference so the connection can go back to the pool
    aConn := nil;
  end;

end;

The question is: is my implementation thread-safe in a multithreaded environment such as a TWebModule application?

\$\endgroup\$

0

You must log in to answer this question.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.