@@ -88,6 +88,12 @@ internal Settings(Settings other)
88
88
/// </summary>
89
89
public FlowControlSettings FlowControlSettings { get ; set ; }
90
90
91
+ /// <summary>
92
+ /// If set to true, disables enforcing flow control settings at the Cloud PubSub server
93
+ /// and uses the less accurate method of only enforcing flow control at the client side.
94
+ /// </summary>
95
+ public bool UseLegacyFlowControl { get ; set ; } = false ;
96
+
91
97
/// <summary>
92
98
/// The lease time before which a message must either be ACKed
93
99
/// or have its lease extended. This is truncated to the nearest second.
@@ -381,6 +387,7 @@ internal SubscriberClientImpl(SubscriptionName subscriptionName, IEnumerable<Sub
381
387
_scheduler = settings . Scheduler ?? SystemScheduler . Instance ;
382
388
_taskHelper = GaxPreconditions . CheckNotNull ( taskHelper , nameof ( taskHelper ) ) ;
383
389
_flowControlSettings = settings . FlowControlSettings ?? DefaultFlowControlSettings ;
390
+ _useLegacyFlowControl = settings . UseLegacyFlowControl ;
384
391
_maxAckExtendQueue = ( int ) Math . Min ( _flowControlSettings . MaxOutstandingElementCount ?? long . MaxValue , 20_000 ) ;
385
392
}
386
393
@@ -394,6 +401,7 @@ internal SubscriberClientImpl(SubscriptionName subscriptionName, IEnumerable<Sub
394
401
private readonly IScheduler _scheduler ;
395
402
private readonly TaskHelper _taskHelper ;
396
403
private readonly FlowControlSettings _flowControlSettings ;
404
+ private readonly bool _useLegacyFlowControl ;
397
405
398
406
private TaskCompletionSource < int > _mainTcs ;
399
407
private CancellationTokenSource _globalSoftStopCts ; // soft-stop is guarenteed to occur before hard-stop.
@@ -424,7 +432,7 @@ public override Task StartAsync(Func<PubsubMessage, CancellationToken, Task<Repl
424
432
// Start all subscribers
425
433
var subscriberTasks = _clients . Select ( client =>
426
434
{
427
- var singleChannel = new SingleChannel ( this , client , handlerAsync , flow , registerTask ) ;
435
+ var singleChannel = new SingleChannel ( this , client , handlerAsync , flow , _useLegacyFlowControl , registerTask ) ;
428
436
return _taskHelper . Run ( ( ) => singleChannel . StartAsync ( ) ) ;
429
437
} ) . ToArray ( ) ;
430
438
// Set up finish task; code that executes when this subscriber is being shutdown (for whatever reason).
@@ -821,7 +829,7 @@ internal TimedId(long time, string id)
821
829
822
830
internal SingleChannel ( SubscriberClientImpl subscriber ,
823
831
SubscriberServiceApiClient client , Func < PubsubMessage , CancellationToken , Task < Reply > > handlerAsync ,
824
- Flow flow ,
832
+ Flow flow , bool useLegacyFlowControl ,
825
833
Action < Task > registerTaskFn )
826
834
{
827
835
_registerTaskFn = registerTaskFn ;
@@ -841,6 +849,7 @@ internal SingleChannel(SubscriberClientImpl subscriber,
841
849
_maxAckExtendSendCount = Math . Max ( 10 , subscriber . _maxAckExtendQueue / 4 ) ;
842
850
_maxConcurrentPush = 3 ; // Fairly arbitrary.
843
851
_flow = flow ;
852
+ _useLegacyFlowControl = useLegacyFlowControl ;
844
853
_eventPush = new AsyncAutoResetEvent ( subscriber . _taskHelper ) ;
845
854
_continuationQueue = new AsyncSingleRecvQueue < TaskNextAction > ( subscriber . _taskHelper ) ;
846
855
}
@@ -864,6 +873,7 @@ internal SingleChannel(SubscriberClientImpl subscriber,
864
873
private readonly int _maxConcurrentPush ; // Mamimum number (slightly soft) of concurrent ack/nack/extend push RPCs.
865
874
866
875
private readonly Flow _flow ;
876
+ private readonly bool _useLegacyFlowControl ;
867
877
private readonly AsyncAutoResetEvent _eventPush ;
868
878
private readonly AsyncSingleRecvQueue < TaskNextAction > _continuationQueue ;
869
879
private readonly RequeueableQueue < TimedId > _extendQueue = new RequeueableQueue < TimedId > ( ) ;
@@ -992,8 +1002,8 @@ private void HandleStartStreamingPullWithoutBackoff()
992
1002
{
993
1003
SubscriptionAsSubscriptionName = _subscriptionName ,
994
1004
StreamAckDeadlineSeconds = _modifyDeadlineSeconds ,
995
- MaxOutstandingMessages = _flow . MaxOutstandingElementCount ,
996
- MaxOutstandingBytes = _flow . MaxOutstandingByteCount
1005
+ MaxOutstandingMessages = _useLegacyFlowControl ? 0 : _flow . MaxOutstandingElementCount ,
1006
+ MaxOutstandingBytes = _useLegacyFlowControl ? 0 : _flow . MaxOutstandingByteCount
997
1007
} ) ;
998
1008
Add ( initTask , Next ( true , ( ) => HandlePullMoveNext ( initTask ) ) ) ;
999
1009
}
0 commit comments