Skip to content

Commit 0ce91bb

Browse files
fayssalmartanigcpjskeet
authored andcommitted
feat: Enable server side flow control by default with the option to turn it off
This change enables sending flow control settings automatically to the server. If FlowControlSettings.MaxOutstandingElementCount > 0 or FlowControlSettings.MaxOutstandingByteCount > 0, flow control will be enforced at the server side (in addition to the client side). This behavior is enabled by default and users who would like to opt-out of this feature --in case they encounter issues with server side flow control-- can set Settings.UseLegacyFlowControl=True in SubscriberClient.CreateAsync().
1 parent c3c9448 commit 0ce91bb

File tree

1 file changed

+14
-4
lines changed

1 file changed

+14
-4
lines changed

‎apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClient.cs

+14-4
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,12 @@ internal Settings(Settings other)
8888
/// </summary>
8989
public FlowControlSettings FlowControlSettings { get; set; }
9090

91+
/// <summary>
92+
/// If set to true, disables enforcing flow control settings at the Cloud PubSub server
93+
/// and uses the less accurate method of only enforcing flow control at the client side.
94+
/// </summary>
95+
public bool UseLegacyFlowControl { get; set; } = false;
96+
9197
/// <summary>
9298
/// The lease time before which a message must either be ACKed
9399
/// or have its lease extended. This is truncated to the nearest second.
@@ -381,6 +387,7 @@ internal SubscriberClientImpl(SubscriptionName subscriptionName, IEnumerable<Sub
381387
_scheduler = settings.Scheduler ?? SystemScheduler.Instance;
382388
_taskHelper = GaxPreconditions.CheckNotNull(taskHelper, nameof(taskHelper));
383389
_flowControlSettings = settings.FlowControlSettings ?? DefaultFlowControlSettings;
390+
_useLegacyFlowControl = settings.UseLegacyFlowControl;
384391
_maxAckExtendQueue = (int)Math.Min(_flowControlSettings.MaxOutstandingElementCount ?? long.MaxValue, 20_000);
385392
}
386393

@@ -394,6 +401,7 @@ internal SubscriberClientImpl(SubscriptionName subscriptionName, IEnumerable<Sub
394401
private readonly IScheduler _scheduler;
395402
private readonly TaskHelper _taskHelper;
396403
private readonly FlowControlSettings _flowControlSettings;
404+
private readonly bool _useLegacyFlowControl;
397405

398406
private TaskCompletionSource<int> _mainTcs;
399407
private CancellationTokenSource _globalSoftStopCts; // soft-stop is guarenteed to occur before hard-stop.
@@ -424,7 +432,7 @@ public override Task StartAsync(Func<PubsubMessage, CancellationToken, Task<Repl
424432
// Start all subscribers
425433
var subscriberTasks = _clients.Select(client =>
426434
{
427-
var singleChannel = new SingleChannel(this, client, handlerAsync, flow, registerTask);
435+
var singleChannel = new SingleChannel(this, client, handlerAsync, flow, _useLegacyFlowControl, registerTask);
428436
return _taskHelper.Run(() => singleChannel.StartAsync());
429437
}).ToArray();
430438
// Set up finish task; code that executes when this subscriber is being shutdown (for whatever reason).
@@ -821,7 +829,7 @@ internal TimedId(long time, string id)
821829

822830
internal SingleChannel(SubscriberClientImpl subscriber,
823831
SubscriberServiceApiClient client, Func<PubsubMessage, CancellationToken, Task<Reply>> handlerAsync,
824-
Flow flow,
832+
Flow flow, bool useLegacyFlowControl,
825833
Action<Task> registerTaskFn)
826834
{
827835
_registerTaskFn = registerTaskFn;
@@ -841,6 +849,7 @@ internal SingleChannel(SubscriberClientImpl subscriber,
841849
_maxAckExtendSendCount = Math.Max(10, subscriber._maxAckExtendQueue / 4);
842850
_maxConcurrentPush = 3; // Fairly arbitrary.
843851
_flow = flow;
852+
_useLegacyFlowControl = useLegacyFlowControl;
844853
_eventPush = new AsyncAutoResetEvent(subscriber._taskHelper);
845854
_continuationQueue = new AsyncSingleRecvQueue<TaskNextAction>(subscriber._taskHelper);
846855
}
@@ -864,6 +873,7 @@ internal SingleChannel(SubscriberClientImpl subscriber,
864873
private readonly int _maxConcurrentPush; // Mamimum number (slightly soft) of concurrent ack/nack/extend push RPCs.
865874

866875
private readonly Flow _flow;
876+
private readonly bool _useLegacyFlowControl;
867877
private readonly AsyncAutoResetEvent _eventPush;
868878
private readonly AsyncSingleRecvQueue<TaskNextAction> _continuationQueue;
869879
private readonly RequeueableQueue<TimedId> _extendQueue = new RequeueableQueue<TimedId>();
@@ -992,8 +1002,8 @@ private void HandleStartStreamingPullWithoutBackoff()
9921002
{
9931003
SubscriptionAsSubscriptionName = _subscriptionName,
9941004
StreamAckDeadlineSeconds = _modifyDeadlineSeconds,
995-
MaxOutstandingMessages = _flow.MaxOutstandingElementCount,
996-
MaxOutstandingBytes = _flow.MaxOutstandingByteCount
1005+
MaxOutstandingMessages = _useLegacyFlowControl ? 0 : _flow.MaxOutstandingElementCount,
1006+
MaxOutstandingBytes = _useLegacyFlowControl ? 0 : _flow.MaxOutstandingByteCount
9971007
});
9981008
Add(initTask, Next(true, () => HandlePullMoveNext(initTask)));
9991009
}

0 commit comments

Comments
 (0)