@@ -78,6 +78,7 @@ internal Settings(Settings other)
78
78
AckDeadline = other . AckDeadline ;
79
79
AckExtensionWindow = other . AckExtensionWindow ;
80
80
Scheduler = other . Scheduler ;
81
+ MaxTotalAckExtension = other . MaxTotalAckExtension ;
81
82
}
82
83
83
84
/// <summary>
@@ -100,6 +101,12 @@ internal Settings(Settings other)
100
101
/// </summary>
101
102
public TimeSpan ? AckExtensionWindow { get ; set ; }
102
103
104
+ /// <summary>
105
+ /// Maximum duration for which a message ACK deadline will be extended.
106
+ /// If <c>null</c>, uses the default of <see cref="DefaultMaxTotalAckExtension"/>.
107
+ /// </summary>
108
+ public TimeSpan ? MaxTotalAckExtension { get ; set ; }
109
+
103
110
/// <summary>
104
111
/// The <see cref="IScheduler"/> used to schedule delays.
105
112
/// If <c>null</c>, the default <see cref="SystemScheduler"/> is used.
@@ -112,6 +119,10 @@ internal void Validate()
112
119
GaxPreconditions . CheckArgumentRange ( AckDeadline , nameof ( AckDeadline ) , MinimumAckDeadline , MaximumAckDeadline ) ;
113
120
var maxAckExtension = TimeSpan . FromTicks ( ( AckDeadline ?? DefaultAckDeadline ) . Ticks / 2 ) ;
114
121
GaxPreconditions . CheckArgumentRange ( AckExtensionWindow , nameof ( AckExtensionWindow ) , MinimumAckExtensionWindow , maxAckExtension ) ;
122
+ if ( MaxTotalAckExtension is TimeSpan maxTotalAckExtension )
123
+ {
124
+ GaxPreconditions . CheckNonNegativeDelay ( maxTotalAckExtension , nameof ( MaxTotalAckExtension ) ) ;
125
+ }
115
126
}
116
127
117
128
/// <summary>
@@ -181,10 +192,10 @@ internal void Validate()
181
192
182
193
/// <summary>
183
194
/// Default <see cref="FlowControlSettings"/> for <see cref="SubscriberClient"/>.
184
- /// Allows 10 ,000 outstanding messages; and 20Mb outstanding bytes.
195
+ /// Allows 1 ,000 outstanding messages; and 100Mb outstanding bytes.
185
196
/// </summary>
186
197
/// <returns>Default <see cref="FlowControlSettings"/> for <see cref="SubscriberClient"/>.</returns>
187
- public static FlowControlSettings DefaultFlowControlSettings { get ; } = new FlowControlSettings ( 10_000 , 20_000_000 ) ;
198
+ public static FlowControlSettings DefaultFlowControlSettings { get ; } = new FlowControlSettings ( 1_000 , 100_000_000 ) ;
188
199
189
200
/// <summary>
190
201
/// The service-defined minimum message ACKnowledgement deadline of 10 seconds.
@@ -207,10 +218,15 @@ internal void Validate()
207
218
public static TimeSpan MinimumAckExtensionWindow { get ; } = TimeSpan . FromMilliseconds ( 50 ) ;
208
219
209
220
/// <summary>
210
- /// The default message ACKnowlegdment extension window of 15 seconds.
221
+ /// The default message ACKnowledgement extension window of 15 seconds.
211
222
/// </summary>
212
223
public static TimeSpan DefaultAckExtensionWindow { get ; } = TimeSpan . FromSeconds ( 15 ) ;
213
224
225
+ /// <summary>
226
+ /// The default maximum total ACKnowledgement extension of 60 minutes.
227
+ /// </summary>
228
+ public static TimeSpan DefaultMaxTotalAckExtension { get ; } = TimeSpan . FromMinutes ( 60 ) ;
229
+
214
230
/// <summary>
215
231
/// Create a <see cref="SubscriberClient"/> instance associated with the specified <see cref="SubscriptionName"/>.
216
232
/// The default <paramref name="settings"/> and <paramref name="clientCreationSettings"/> are suitable for machines with
@@ -352,6 +368,7 @@ internal SubscriberClientImpl(SubscriptionName subscriptionName, IEnumerable<Sub
352
368
// These values are validated in Settings.Validate() above, so no need to re-validate here.
353
369
_modifyDeadlineSeconds = ( int ) ( ( settings . AckDeadline ?? DefaultAckDeadline ) . TotalSeconds ) ;
354
370
_autoExtendInterval = TimeSpan . FromSeconds ( _modifyDeadlineSeconds ) - ( settings . AckExtensionWindow ?? DefaultAckExtensionWindow ) ;
371
+ _maxExtensionDuration = settings . MaxTotalAckExtension ?? DefaultMaxTotalAckExtension ;
355
372
_shutdown = shutdown ;
356
373
_scheduler = settings . Scheduler ?? SystemScheduler . Instance ;
357
374
_taskHelper = GaxPreconditions . CheckNotNull ( taskHelper , nameof ( taskHelper ) ) ;
@@ -363,6 +380,7 @@ internal SubscriberClientImpl(SubscriptionName subscriptionName, IEnumerable<Sub
363
380
private readonly SubscriberServiceApiClient [ ] _clients ;
364
381
private readonly Func < Task > _shutdown ;
365
382
private readonly TimeSpan _autoExtendInterval ; // Interval between message lease auto-extends
383
+ private readonly TimeSpan _maxExtensionDuration ; // Maximum duration for which a message lease will be extended.
366
384
private readonly int _modifyDeadlineSeconds ; // Value to use as new deadline when lease auto-extends
367
385
private readonly int _maxAckExtendQueue ; // Maximum count of acks/extends to push to server in a single messages
368
386
private readonly IScheduler _scheduler ;
@@ -809,6 +827,7 @@ internal SingleChannel(SubscriberClientImpl subscriber,
809
827
_modifyDeadlineSeconds = subscriber . _modifyDeadlineSeconds ;
810
828
_maxAckExtendQueueSize = subscriber . _maxAckExtendQueue ;
811
829
_autoExtendInterval = subscriber . _autoExtendInterval ;
830
+ _maxExtensionDuration = subscriber . _maxExtensionDuration ;
812
831
_extendQueueThrottleInterval = TimeSpan . FromTicks ( ( long ) ( ( TimeSpan . FromSeconds ( _modifyDeadlineSeconds ) - _autoExtendInterval ) . Ticks * 0.5 ) ) ;
813
832
_maxAckExtendSendCount = Math . Max ( 10 , subscriber . _maxAckExtendQueue / 4 ) ;
814
833
_maxConcurrentPush = 3 ; // Fairly arbitrary.
@@ -829,6 +848,7 @@ internal SingleChannel(SubscriberClientImpl subscriber,
829
848
private readonly SubscriptionName _subscriptionName ;
830
849
private readonly int _modifyDeadlineSeconds ; // Seconds to add to deadling on lease extension.
831
850
private readonly TimeSpan _autoExtendInterval ; // Delay between auto-extends.
851
+ private readonly TimeSpan _maxExtensionDuration ; // Maximum duration for which a message lease will be extended.
832
852
private readonly TimeSpan _extendQueueThrottleInterval ; // Throttle pull if items in the extend queue are older than this.
833
853
private readonly int _maxAckExtendQueueSize ; // Soft limit on push queue sizes. Used to throttle pulls.
834
854
private readonly int _maxAckExtendSendCount ; // Maximum number of ids to include in an ack/nack/extend push RPC.
@@ -1046,7 +1066,7 @@ private void HandlePullMessageData(Task<bool> moveNextTask)
1046
1066
// Get all ack-ids, used to extend leases as required.
1047
1067
var msgIds = new HashSet < string > ( msgs . Select ( x => x . AckId ) ) ;
1048
1068
// Send an initial "lease-extension"; which starts the server timer.
1049
- HandleExtendLease ( msgIds ) ;
1069
+ HandleExtendLease ( msgIds , null ) ;
1050
1070
// Asynchonously start message processing. Handles flow, and calls the user-supplied message handler.
1051
1071
// Uses Task.Run(), so not to clog up this "master" thread with per-message processing.
1052
1072
Task messagesTask = _taskHelper . Run ( ( ) => ProcessPullMessagesAsync ( msgs , msgIds ) ) ;
@@ -1099,34 +1119,115 @@ await _taskHelper.ConfigureAwait(_flow.Process(msg.CalculateSize(), msg.Message.
1099
1119
}
1100
1120
}
1101
1121
1102
- private void HandleExtendLease ( HashSet < string > msgIds )
1122
+ private class LeaseCancellation
1123
+ {
1124
+ public LeaseCancellation ( CancellationTokenSource softStopCts ) =>
1125
+ _cts = CancellationTokenSource . CreateLinkedTokenSource ( softStopCts . Token ) ;
1126
+
1127
+ private readonly object _lock = new object ( ) ;
1128
+ private CancellationTokenSource _cts ;
1129
+
1130
+ public CancellationToken Token
1131
+ {
1132
+ get
1133
+ {
1134
+ lock ( _lock )
1135
+ {
1136
+ return _cts ? . Token ?? CancellationToken . None ;
1137
+ }
1138
+ }
1139
+ }
1140
+
1141
+ public void Dispose ( )
1142
+ {
1143
+ lock ( _lock )
1144
+ {
1145
+ _cts . Dispose ( ) ;
1146
+ _cts = null ;
1147
+ }
1148
+ }
1149
+
1150
+ public bool IsDisposed
1151
+ {
1152
+ get
1153
+ {
1154
+ lock ( _lock )
1155
+ {
1156
+ return _cts == null ;
1157
+ }
1158
+ }
1159
+ }
1160
+
1161
+ public void Cancel ( )
1162
+ {
1163
+ CancellationTokenSource cts2 ;
1164
+ lock ( _lock )
1165
+ {
1166
+ cts2 = _cts ;
1167
+ }
1168
+ // Cancel outside of lock, as continuations may be executed synchronously.
1169
+ cts2 ? . Cancel ( ) ;
1170
+ // No need to dispose of `_cts` here, as `Dispose()` will always be called.
1171
+ }
1172
+ }
1173
+
1174
+ private void HandleExtendLease ( HashSet < string > msgIds , LeaseCancellation cancellation )
1103
1175
{
1104
1176
if ( _softStopCts . IsCancellationRequested )
1105
1177
{
1106
1178
// No further lease extensions once stop is requested.
1107
1179
return ;
1108
1180
}
1109
- bool anyMsgIds ;
1110
- lock ( msgIds )
1181
+ // The first call to this method happens as soon as messages in this chunk start to be processed.
1182
+ // This triggers the server to start its lease timer.
1183
+ if ( cancellation == null )
1111
1184
{
1112
- anyMsgIds = msgIds . Count > 0 ;
1113
- if ( anyMsgIds )
1185
+ // Create a task to cancel lease-extension once `_maxExtensionDuration` has been reached.
1186
+ // This set up once for each chunk of received messages, and passed through to each future call to this method.
1187
+ cancellation = new LeaseCancellation ( _softStopCts ) ;
1188
+ Add ( _scheduler . Delay ( _maxExtensionDuration , cancellation . Token ) , Next ( false , ( ) =>
1114
1189
{
1115
- lock ( _lock )
1190
+ // This is executed when `_maxExtensionDuration` has expired, or when `cancellation` is cancelled,
1191
+ // Which ensures `cancellation` is aways disposed of.
1192
+ cancellation . Dispose ( ) ;
1193
+ lock ( msgIds )
1116
1194
{
1117
- _extendQueue . Enqueue ( msgIds . Select ( x => new TimedId ( _extendThrottleHigh + 1 , x ) ) ) ;
1195
+ msgIds . Clear ( ) ;
1118
1196
}
1119
- }
1197
+ } ) ) ;
1120
1198
}
1121
- if ( anyMsgIds )
1199
+ if ( ! cancellation . IsDisposed )
1122
1200
{
1123
- // Ids have been added to _extendQueue, so trigger a push.
1124
- _eventPush . Set ( ) ;
1125
- // Some ids still exist, schedule another extension.
1126
- Add ( _scheduler . Delay ( _autoExtendInterval , _softStopCts . Token ) , Next ( false , ( ) => HandleExtendLease ( msgIds ) ) ) ;
1127
- // Increment _extendThrottles.
1128
- _extendThrottleHigh += 1 ;
1129
- Add ( _scheduler . Delay ( _extendQueueThrottleInterval , _softStopCts . Token ) , Next ( false , ( ) => _extendThrottleLow += 1 ) ) ;
1201
+ // If `_maxExtensionDuration` has not expired, then schedule a further lease extension.
1202
+ bool anyMsgIds ;
1203
+ lock ( msgIds )
1204
+ {
1205
+ anyMsgIds = msgIds . Count > 0 ;
1206
+ if ( anyMsgIds )
1207
+ {
1208
+ lock ( _lock )
1209
+ {
1210
+ _extendQueue . Enqueue ( msgIds . Select ( x => new TimedId ( _extendThrottleHigh + 1 , x ) ) ) ;
1211
+ }
1212
+ }
1213
+ }
1214
+ if ( anyMsgIds )
1215
+ {
1216
+ // Ids have been added to _extendQueue, so trigger a push.
1217
+ _eventPush . Set ( ) ;
1218
+ // Some ids still exist, schedule another extension.
1219
+ // The overall `_maxExtensionDuration` is maintained by passing through the existing `cancellation`.
1220
+ Add ( _scheduler . Delay ( _autoExtendInterval , _softStopCts . Token ) , Next ( false , ( ) => HandleExtendLease ( msgIds , cancellation ) ) ) ;
1221
+ // Increment _extendThrottles.
1222
+ _extendThrottleHigh += 1 ;
1223
+ Add ( _scheduler . Delay ( _extendQueueThrottleInterval , _softStopCts . Token ) , Next ( false , ( ) => _extendThrottleLow += 1 ) ) ;
1224
+ }
1225
+ else
1226
+ {
1227
+ // All messages have been handled in this chunk, so cancel the max-lease-time monitoring.
1228
+ // This will also cause `cancellation` to be disposed in the anonymous function above.
1229
+ cancellation . Cancel ( ) ;
1230
+ }
1130
1231
}
1131
1232
}
1132
1233
0 commit comments