Skip to content

Commit e13ab00

Browse files
authored
Pubsub: Update default settings; add maximum total lease extension (#3920)
1 parent 6b9fc82 commit e13ab00

File tree

3 files changed

+166
-26
lines changed

3 files changed

+166
-26
lines changed

‎apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Tests/SubscriberClientTest.cs

+41
Original file line numberDiff line numberDiff line change
@@ -650,6 +650,34 @@ public void LeaseExtension()
650650
}
651651
}
652652

653+
[Fact]
654+
public void LeaseMaxExtension()
655+
{
656+
var msgs = new[] { new[] {
657+
ServerAction.Data(TimeSpan.Zero, new[] { "1" }),
658+
ServerAction.Inf()
659+
} };
660+
using (var fake = Fake.Create(msgs, ackDeadline: TimeSpan.FromSeconds(30), ackExtendWindow: TimeSpan.FromSeconds(10)))
661+
{
662+
fake.Scheduler.Run(async () =>
663+
{
664+
var doneTask = fake.Subscriber.StartAsync(async (msg, ct) =>
665+
{
666+
// Emulate a hanging message-processing task.
667+
await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromHours(24), ct));
668+
return SubscriberClient.Reply.Ack;
669+
});
670+
await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromHours(12), CancellationToken.None));
671+
await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None));
672+
await fake.TaskHelper.ConfigureAwait(doneTask);
673+
Assert.Equal(1, fake.Subscribers.Count);
674+
// Check that the lease was extended for 60 minutes only.
675+
// +1 is due to initial lease extension at time=0
676+
Assert.Equal((int)SubscriberClient.DefaultMaxTotalAckExtension.TotalSeconds / 20 + 1, fake.Subscribers[0].Extends.Count);
677+
});
678+
}
679+
}
680+
653681
[Fact]
654682
public void SlowUplinkThrottlesPull()
655683
{
@@ -793,6 +821,12 @@ public void ValidParameters()
793821
AckExtensionWindow = TimeSpan.FromTicks(SubscriberClient.DefaultAckDeadline.Ticks / 2)
794822
};
795823
new SubscriberClientImpl(subscriptionName, clients, settingsAckExtension2, null);
824+
825+
var settingsMaxExtension = new SubscriberClient.Settings
826+
{
827+
MaxTotalAckExtension = TimeSpan.FromMinutes(20)
828+
};
829+
new SubscriberClientImpl(subscriptionName, clients, settingsMaxExtension, null);
796830
}
797831

798832
[Fact]
@@ -842,6 +876,13 @@ public void InvalidParameters()
842876
};
843877
var ex8 = Assert.Throws<ArgumentOutOfRangeException>(() => new SubscriberClientImpl(subscriptionName, clients, settingsBadAckExtension2, null));
844878
Assert.Equal("AckExtensionWindow", ex8.ParamName);
879+
880+
var settingsBadMaxExtension = new SubscriberClient.Settings
881+
{
882+
MaxTotalAckExtension = TimeSpan.FromMinutes(-20)
883+
};
884+
var ex9 = Assert.Throws<ArgumentOutOfRangeException>(() => new SubscriberClientImpl(subscriptionName, clients, settingsBadMaxExtension, null));
885+
//Assert.Equal("MaxTotalAckExtension", ex9.ParamName); There's a bug in GaxPreconditions.CheckNonNegativeDelay() which uses the wrong paramName
845886
}
846887
}
847888
}

‎apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/PublisherClient.cs

+4-6
Original file line numberDiff line numberDiff line change
@@ -154,22 +154,20 @@ internal void Validate()
154154
}
155155
}
156156

157-
// All defaults taken from Java (reference) implementation.
158-
159157
/// <summary>
160158
/// Default <see cref="BatchingSettings"/> for <see cref="PublisherClient"/>.
161159
/// Default values are:
162160
/// <see cref="BatchingSettings.ElementCountThreshold"/> = 100;
163-
/// <see cref="BatchingSettings.ByteCountThreshold"/> = 10,000;
164-
/// <see cref="BatchingSettings.DelayThreshold"/> = 1 millisecond;
161+
/// <see cref="BatchingSettings.ByteCountThreshold"/> = 1,000,000;
162+
/// <see cref="BatchingSettings.DelayThreshold"/> = 10 milliseconds;
165163
/// </summary>
166-
public static BatchingSettings DefaultBatchingSettings { get; } = new BatchingSettings(100L, 10_000L, TimeSpan.FromMilliseconds(1));
164+
public static BatchingSettings DefaultBatchingSettings { get; } = new BatchingSettings(100L, 1_000_000L, TimeSpan.FromMilliseconds(10));
167165

168166
/// <summary>
169167
/// The absolute maximum <see cref="BatchingSettings"/> supported by the service.
170168
/// Maximum values are:
171169
/// <see cref="BatchingSettings.ElementCountThreshold"/> = 1,000;
172-
/// <see cref="BatchingSettings.ByteCountThreshold"/> = 9,500,000;
170+
/// <see cref="BatchingSettings.ByteCountThreshold"/> = 10,000,000;
173171
/// </summary>
174172
public static BatchingSettings ApiMaxBatchingSettings { get; } = new BatchingSettings(1000L, 10_000_000L, null);
175173

‎apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClient.cs

+121-20
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ internal Settings(Settings other)
7878
AckDeadline = other.AckDeadline;
7979
AckExtensionWindow = other.AckExtensionWindow;
8080
Scheduler = other.Scheduler;
81+
MaxTotalAckExtension = other.MaxTotalAckExtension;
8182
}
8283

8384
/// <summary>
@@ -100,6 +101,12 @@ internal Settings(Settings other)
100101
/// </summary>
101102
public TimeSpan? AckExtensionWindow { get; set; }
102103

104+
/// <summary>
105+
/// Maximum duration for which a message ACK deadline will be extended.
106+
/// If <c>null</c>, uses the default of <see cref="DefaultMaxTotalAckExtension"/>.
107+
/// </summary>
108+
public TimeSpan? MaxTotalAckExtension { get; set; }
109+
103110
/// <summary>
104111
/// The <see cref="IScheduler"/> used to schedule delays.
105112
/// If <c>null</c>, the default <see cref="SystemScheduler"/> is used.
@@ -112,6 +119,10 @@ internal void Validate()
112119
GaxPreconditions.CheckArgumentRange(AckDeadline, nameof(AckDeadline), MinimumAckDeadline, MaximumAckDeadline);
113120
var maxAckExtension = TimeSpan.FromTicks((AckDeadline ?? DefaultAckDeadline).Ticks / 2);
114121
GaxPreconditions.CheckArgumentRange(AckExtensionWindow, nameof(AckExtensionWindow), MinimumAckExtensionWindow, maxAckExtension);
122+
if (MaxTotalAckExtension is TimeSpan maxTotalAckExtension)
123+
{
124+
GaxPreconditions.CheckNonNegativeDelay(maxTotalAckExtension, nameof(MaxTotalAckExtension));
125+
}
115126
}
116127

117128
/// <summary>
@@ -181,10 +192,10 @@ internal void Validate()
181192

182193
/// <summary>
183194
/// Default <see cref="FlowControlSettings"/> for <see cref="SubscriberClient"/>.
184-
/// Allows 10,000 outstanding messages; and 20Mb outstanding bytes.
195+
/// Allows 1,000 outstanding messages; and 100Mb outstanding bytes.
185196
/// </summary>
186197
/// <returns>Default <see cref="FlowControlSettings"/> for <see cref="SubscriberClient"/>.</returns>
187-
public static FlowControlSettings DefaultFlowControlSettings { get; } = new FlowControlSettings(10_000, 20_000_000);
198+
public static FlowControlSettings DefaultFlowControlSettings { get; } = new FlowControlSettings(1_000, 100_000_000);
188199

189200
/// <summary>
190201
/// The service-defined minimum message ACKnowledgement deadline of 10 seconds.
@@ -207,10 +218,15 @@ internal void Validate()
207218
public static TimeSpan MinimumAckExtensionWindow { get; } = TimeSpan.FromMilliseconds(50);
208219

209220
/// <summary>
210-
/// The default message ACKnowlegdment extension window of 15 seconds.
221+
/// The default message ACKnowledgement extension window of 15 seconds.
211222
/// </summary>
212223
public static TimeSpan DefaultAckExtensionWindow { get; } = TimeSpan.FromSeconds(15);
213224

225+
/// <summary>
226+
/// The default maximum total ACKnowledgement extension of 60 minutes.
227+
/// </summary>
228+
public static TimeSpan DefaultMaxTotalAckExtension { get; } = TimeSpan.FromMinutes(60);
229+
214230
/// <summary>
215231
/// Create a <see cref="SubscriberClient"/> instance associated with the specified <see cref="SubscriptionName"/>.
216232
/// The default <paramref name="settings"/> and <paramref name="clientCreationSettings"/> are suitable for machines with
@@ -352,6 +368,7 @@ internal SubscriberClientImpl(SubscriptionName subscriptionName, IEnumerable<Sub
352368
// These values are validated in Settings.Validate() above, so no need to re-validate here.
353369
_modifyDeadlineSeconds = (int)((settings.AckDeadline ?? DefaultAckDeadline).TotalSeconds);
354370
_autoExtendInterval = TimeSpan.FromSeconds(_modifyDeadlineSeconds) - (settings.AckExtensionWindow ?? DefaultAckExtensionWindow);
371+
_maxExtensionDuration = settings.MaxTotalAckExtension ?? DefaultMaxTotalAckExtension;
355372
_shutdown = shutdown;
356373
_scheduler = settings.Scheduler ?? SystemScheduler.Instance;
357374
_taskHelper = GaxPreconditions.CheckNotNull(taskHelper, nameof(taskHelper));
@@ -363,6 +380,7 @@ internal SubscriberClientImpl(SubscriptionName subscriptionName, IEnumerable<Sub
363380
private readonly SubscriberServiceApiClient[] _clients;
364381
private readonly Func<Task> _shutdown;
365382
private readonly TimeSpan _autoExtendInterval; // Interval between message lease auto-extends
383+
private readonly TimeSpan _maxExtensionDuration; // Maximum duration for which a message lease will be extended.
366384
private readonly int _modifyDeadlineSeconds; // Value to use as new deadline when lease auto-extends
367385
private readonly int _maxAckExtendQueue; // Maximum count of acks/extends to push to server in a single messages
368386
private readonly IScheduler _scheduler;
@@ -809,6 +827,7 @@ internal SingleChannel(SubscriberClientImpl subscriber,
809827
_modifyDeadlineSeconds = subscriber._modifyDeadlineSeconds;
810828
_maxAckExtendQueueSize = subscriber._maxAckExtendQueue;
811829
_autoExtendInterval = subscriber._autoExtendInterval;
830+
_maxExtensionDuration = subscriber._maxExtensionDuration;
812831
_extendQueueThrottleInterval = TimeSpan.FromTicks((long)((TimeSpan.FromSeconds(_modifyDeadlineSeconds) - _autoExtendInterval).Ticks * 0.5));
813832
_maxAckExtendSendCount = Math.Max(10, subscriber._maxAckExtendQueue / 4);
814833
_maxConcurrentPush = 3; // Fairly arbitrary.
@@ -829,6 +848,7 @@ internal SingleChannel(SubscriberClientImpl subscriber,
829848
private readonly SubscriptionName _subscriptionName;
830849
private readonly int _modifyDeadlineSeconds; // Seconds to add to deadling on lease extension.
831850
private readonly TimeSpan _autoExtendInterval; // Delay between auto-extends.
851+
private readonly TimeSpan _maxExtensionDuration; // Maximum duration for which a message lease will be extended.
832852
private readonly TimeSpan _extendQueueThrottleInterval; // Throttle pull if items in the extend queue are older than this.
833853
private readonly int _maxAckExtendQueueSize; // Soft limit on push queue sizes. Used to throttle pulls.
834854
private readonly int _maxAckExtendSendCount; // Maximum number of ids to include in an ack/nack/extend push RPC.
@@ -1046,7 +1066,7 @@ private void HandlePullMessageData(Task<bool> moveNextTask)
10461066
// Get all ack-ids, used to extend leases as required.
10471067
var msgIds = new HashSet<string>(msgs.Select(x => x.AckId));
10481068
// Send an initial "lease-extension"; which starts the server timer.
1049-
HandleExtendLease(msgIds);
1069+
HandleExtendLease(msgIds, null);
10501070
// Asynchonously start message processing. Handles flow, and calls the user-supplied message handler.
10511071
// Uses Task.Run(), so not to clog up this "master" thread with per-message processing.
10521072
Task messagesTask = _taskHelper.Run(() => ProcessPullMessagesAsync(msgs, msgIds));
@@ -1099,34 +1119,115 @@ await _taskHelper.ConfigureAwait(_flow.Process(msg.CalculateSize(), msg.Message.
10991119
}
11001120
}
11011121

1102-
private void HandleExtendLease(HashSet<string> msgIds)
1122+
private class LeaseCancellation
1123+
{
1124+
public LeaseCancellation(CancellationTokenSource softStopCts) =>
1125+
_cts = CancellationTokenSource.CreateLinkedTokenSource(softStopCts.Token);
1126+
1127+
private readonly object _lock = new object();
1128+
private CancellationTokenSource _cts;
1129+
1130+
public CancellationToken Token
1131+
{
1132+
get
1133+
{
1134+
lock (_lock)
1135+
{
1136+
return _cts?.Token ?? CancellationToken.None;
1137+
}
1138+
}
1139+
}
1140+
1141+
public void Dispose()
1142+
{
1143+
lock (_lock)
1144+
{
1145+
_cts.Dispose();
1146+
_cts = null;
1147+
}
1148+
}
1149+
1150+
public bool IsDisposed
1151+
{
1152+
get
1153+
{
1154+
lock (_lock)
1155+
{
1156+
return _cts == null;
1157+
}
1158+
}
1159+
}
1160+
1161+
public void Cancel()
1162+
{
1163+
CancellationTokenSource cts2;
1164+
lock (_lock)
1165+
{
1166+
cts2 = _cts;
1167+
}
1168+
// Cancel outside of lock, as continuations may be executed synchronously.
1169+
cts2?.Cancel();
1170+
// No need to dispose of `_cts` here, as `Dispose()` will always be called.
1171+
}
1172+
}
1173+
1174+
private void HandleExtendLease(HashSet<string> msgIds, LeaseCancellation cancellation)
11031175
{
11041176
if (_softStopCts.IsCancellationRequested)
11051177
{
11061178
// No further lease extensions once stop is requested.
11071179
return;
11081180
}
1109-
bool anyMsgIds;
1110-
lock (msgIds)
1181+
// The first call to this method happens as soon as messages in this chunk start to be processed.
1182+
// This triggers the server to start its lease timer.
1183+
if (cancellation == null)
11111184
{
1112-
anyMsgIds = msgIds.Count > 0;
1113-
if (anyMsgIds)
1185+
// Create a task to cancel lease-extension once `_maxExtensionDuration` has been reached.
1186+
// This set up once for each chunk of received messages, and passed through to each future call to this method.
1187+
cancellation = new LeaseCancellation(_softStopCts);
1188+
Add(_scheduler.Delay(_maxExtensionDuration, cancellation.Token), Next(false, () =>
11141189
{
1115-
lock (_lock)
1190+
// This is executed when `_maxExtensionDuration` has expired, or when `cancellation` is cancelled,
1191+
// Which ensures `cancellation` is aways disposed of.
1192+
cancellation.Dispose();
1193+
lock (msgIds)
11161194
{
1117-
_extendQueue.Enqueue(msgIds.Select(x => new TimedId(_extendThrottleHigh + 1, x)));
1195+
msgIds.Clear();
11181196
}
1119-
}
1197+
}));
11201198
}
1121-
if (anyMsgIds)
1199+
if (!cancellation.IsDisposed)
11221200
{
1123-
// Ids have been added to _extendQueue, so trigger a push.
1124-
_eventPush.Set();
1125-
// Some ids still exist, schedule another extension.
1126-
Add(_scheduler.Delay(_autoExtendInterval, _softStopCts.Token), Next(false, () => HandleExtendLease(msgIds)));
1127-
// Increment _extendThrottles.
1128-
_extendThrottleHigh += 1;
1129-
Add(_scheduler.Delay(_extendQueueThrottleInterval, _softStopCts.Token), Next(false, () => _extendThrottleLow += 1));
1201+
// If `_maxExtensionDuration` has not expired, then schedule a further lease extension.
1202+
bool anyMsgIds;
1203+
lock (msgIds)
1204+
{
1205+
anyMsgIds = msgIds.Count > 0;
1206+
if (anyMsgIds)
1207+
{
1208+
lock (_lock)
1209+
{
1210+
_extendQueue.Enqueue(msgIds.Select(x => new TimedId(_extendThrottleHigh + 1, x)));
1211+
}
1212+
}
1213+
}
1214+
if (anyMsgIds)
1215+
{
1216+
// Ids have been added to _extendQueue, so trigger a push.
1217+
_eventPush.Set();
1218+
// Some ids still exist, schedule another extension.
1219+
// The overall `_maxExtensionDuration` is maintained by passing through the existing `cancellation`.
1220+
Add(_scheduler.Delay(_autoExtendInterval, _softStopCts.Token), Next(false, () => HandleExtendLease(msgIds, cancellation)));
1221+
// Increment _extendThrottles.
1222+
_extendThrottleHigh += 1;
1223+
Add(_scheduler.Delay(_extendQueueThrottleInterval, _softStopCts.Token), Next(false, () => _extendThrottleLow += 1));
1224+
}
1225+
else
1226+
{
1227+
// All messages have been handled in this chunk, so cancel the max-lease-time monitoring.
1228+
// This will also cause `cancellation` to be disposed in the anonymous function above.
1229+
cancellation.Cancel();
1230+
}
11301231
}
11311232
}
11321233

0 commit comments

Comments
 (0)
X