16
16
using Google . Api . Gax . Grpc ;
17
17
using Google . Cloud . PubSub . V1 . Tasks ;
18
18
using Google . Cloud . PubSub . V1 . Tests . Tasks ;
19
- using Google . Protobuf ;
19
+ using Grpc . Core ;
20
20
using System ;
21
21
using System . Collections . Generic ;
22
22
using System . Linq ;
@@ -30,11 +30,15 @@ public class PublisherClientTest
30
30
{
31
31
private class FakePublisherServiceApiClient : PublisherServiceApiClient
32
32
{
33
- public FakePublisherServiceApiClient ( IScheduler scheduler , TaskHelper taskHelper , params TimeSpan [ ] delays )
33
+ public FakePublisherServiceApiClient ( IScheduler scheduler , TaskHelper taskHelper ,
34
+ TimeSpan [ ] delays = null , string orderingKeyErrorUnrecoverable = null , string orderingKeyErrorRecoverable = null )
34
35
{
35
36
_schduler = scheduler ;
36
37
_taskHelper = taskHelper ;
37
- _delays = Enumerable . Repeat ( delays . DefaultIfEmpty ( ) , int . MaxValue ) . SelectMany ( x => x ) . GetEnumerator ( ) ;
38
+ _delays = Enumerable . Repeat ( ( delays ?? Enumerable . Empty < TimeSpan > ( ) ) . DefaultIfEmpty ( ) , int . MaxValue )
39
+ . SelectMany ( x => x ) . GetEnumerator ( ) ;
40
+ _orderingKeyErrorUnrecoverable = orderingKeyErrorUnrecoverable ;
41
+ _orderingKeyErrorRecoverable = orderingKeyErrorRecoverable ;
38
42
_handledIds = new List < string > ( ) ;
39
43
}
40
44
@@ -44,6 +48,9 @@ public FakePublisherServiceApiClient(IScheduler scheduler, TaskHelper taskHelper
44
48
private readonly IEnumerator < TimeSpan > _delays ;
45
49
private readonly List < string > _handledIds ;
46
50
51
+ private string _orderingKeyErrorUnrecoverable ;
52
+ private string _orderingKeyErrorRecoverable ;
53
+
47
54
public IReadOnlyList < string > HandledMessages
48
55
{
49
56
get
@@ -59,21 +66,40 @@ public override async Task<PublishResponse> PublishAsync(PublishRequest request,
59
66
{
60
67
_delays . MoveNext ( ) ;
61
68
await _taskHelper . ConfigureAwait ( _schduler . Delay ( _delays . Current , callSettings ? . CancellationToken ?? CancellationToken . None ) ) ;
69
+ var byOrderingKey = request . Messages . GroupBy ( x => x . OrderingKey ) . ToList ( ) ;
70
+ if ( byOrderingKey . Count > 1 )
71
+ {
72
+ throw new InvalidOperationException ( "Multiple ordering-keys should not be present within a single batch." ) ;
73
+ }
62
74
var msgIds = request . Messages . Select ( x => x . Data . ToStringUtf8 ( ) ) ;
63
75
lock ( _lock )
64
76
{
77
+ if ( byOrderingKey . Count > 0 && byOrderingKey [ 0 ] . Key == _orderingKeyErrorUnrecoverable )
78
+ {
79
+ // Cause a one-off unrecoverable error.
80
+ _orderingKeyErrorUnrecoverable = null ;
81
+ throw new RpcException ( new Status ( StatusCode . DataLoss , "Data loss" ) ) ;
82
+ }
83
+ if ( byOrderingKey . Count > 0 && byOrderingKey [ 0 ] . Key == _orderingKeyErrorRecoverable )
84
+ {
85
+ // Cause a one-off recoverable error.
86
+ _orderingKeyErrorRecoverable = null ;
87
+ throw new RpcException ( new Status ( StatusCode . Unavailable , "Unavailable" ) ) ;
88
+ }
65
89
_handledIds . AddRange ( msgIds ) ;
66
90
}
67
91
return new PublishResponse { MessageIds = { msgIds } } ;
68
92
}
69
93
}
70
94
71
- private PublisherClient . Settings MakeSettings ( IScheduler scheduler , int batchElementCountThreshold = 1 , int batchRequestByteThreshold = 1 )
95
+ private PublisherClient . Settings MakeSettings ( IScheduler scheduler ,
96
+ int batchElementCountThreshold = 1 , int batchRequestByteThreshold = 1 , bool enableMessageOrdering = false )
72
97
{
73
98
return new PublisherClient . Settings
74
99
{
75
100
Scheduler = scheduler ,
76
101
BatchingSettings = new BatchingSettings ( batchElementCountThreshold , batchRequestByteThreshold , TimeSpan . FromSeconds ( 10 ) ) ,
102
+ EnableMessageOrdering = enableMessageOrdering
77
103
} ;
78
104
79
105
}
@@ -143,7 +169,7 @@ public void Shutdown(
143
169
var topicName = new TopicName ( "FakeProject" , "FakeTopic" ) ;
144
170
var scheduler = new TestScheduler ( ) ;
145
171
TaskHelper taskHelper = scheduler . TaskHelper ;
146
- var client = new FakePublisherServiceApiClient ( scheduler , taskHelper , TimeSpan . FromSeconds ( 1 ) ) ;
172
+ var client = new FakePublisherServiceApiClient ( scheduler , taskHelper , delays : new [ ] { TimeSpan . FromSeconds ( 1 ) } ) ;
147
173
var settings = MakeSettings ( scheduler , batchElementCountThreshold : 2 , batchRequestByteThreshold : 1000 ) ;
148
174
int shutdownCount = 0 ;
149
175
var pub = new PublisherClientImpl ( topicName , new [ ] { client } , settings , ( ) =>
@@ -182,5 +208,116 @@ public void SettingsValidation()
182
208
new PublisherClient . Settings { BatchingSettings = new BatchingSettings ( null , 1 , null ) } . Validate ( ) ;
183
209
new PublisherClient . Settings { BatchingSettings = new BatchingSettings ( null , PublisherClient . ApiMaxBatchingSettings . ByteCountThreshold , null ) } . Validate ( ) ;
184
210
}
211
+
212
+ [ Fact ]
213
+ public void PublishingMessageWithOrderingKeyRequiresOrderingEnabled ( )
214
+ {
215
+ var topicName = new TopicName ( "FakeProject" , "FakeTopic" ) ;
216
+ var scheduler = new TestScheduler ( ) ;
217
+ TaskHelper taskHelper = scheduler . TaskHelper ;
218
+ var client = new FakePublisherServiceApiClient ( scheduler , taskHelper ) ;
219
+ var settings = MakeSettings ( scheduler ) ;
220
+ int shutdownCount = 0 ;
221
+ var pub = new PublisherClientImpl ( topicName , new [ ] { client } , settings , ( ) =>
222
+ {
223
+ Interlocked . Increment ( ref shutdownCount ) ;
224
+ return Task . FromResult ( 0 ) ;
225
+ } , taskHelper ) ;
226
+ scheduler . Run ( async ( ) =>
227
+ {
228
+ await taskHelper . ConfigureAwait (
229
+ Assert . ThrowsAsync < InvalidOperationException > ( ( ) => pub . PublishAsync ( "an ordering key" , "1" ) ) ) ;
230
+ } ) ;
231
+ }
232
+
233
+ [ Theory , PairwiseData ]
234
+ public void OrderingKeyManyMessages (
235
+ [ CombinatorialValues ( 1 , 2 , 5 , 7 ) ] int clientCount ,
236
+ [ CombinatorialValues ( 1 , 2 , 6 , 13 ) ] int threadCount ,
237
+ [ CombinatorialValues ( 101 , 2000 , 9999 ) ] int messageCount ,
238
+ [ CombinatorialValues ( 1 , 2 , 9 , 51 ) ] int orderingKeysCount ,
239
+ [ CombinatorialValues ( 1 , 5 , 50 ) ] int batchElementCountThreshold ,
240
+ [ CombinatorialValues ( 0 , 1 , 59 , 123 , 1001 ) ] int delayMs1 ,
241
+ [ CombinatorialValues ( 0 , 2 , 500 ) ] int delayMs2 )
242
+ {
243
+ var topicName = new TopicName ( "FakeProject" , "FakeTopic" ) ;
244
+ var scheduler = new TestScheduler ( threadCount ) ;
245
+ TaskHelper taskHelper = scheduler . TaskHelper ;
246
+ var clients = Enumerable . Range ( 0 , clientCount )
247
+ . Select ( _ => new FakePublisherServiceApiClient ( scheduler , taskHelper ,
248
+ new [ ] { TimeSpan . FromMilliseconds ( delayMs1 ) , TimeSpan . FromMilliseconds ( delayMs2 ) } ) ) . ToArray ( ) ;
249
+ var settings = MakeSettings ( scheduler ,
250
+ batchElementCountThreshold : batchElementCountThreshold , batchRequestByteThreshold : 10000 , enableMessageOrdering : true ) ;
251
+ int shutdownCount = 0 ;
252
+ var pub = new PublisherClientImpl ( topicName , clients , settings , ( ) =>
253
+ {
254
+ Interlocked . Increment ( ref shutdownCount ) ;
255
+ return Task . FromResult ( 0 ) ;
256
+ } , taskHelper ) ;
257
+ scheduler . Run ( async ( ) =>
258
+ {
259
+ var tasks = Enumerable . Range ( 0 , messageCount )
260
+ . Select ( i => pub . PublishAsync ( ( i % orderingKeysCount ) . ToString ( ) , $ "{ i % orderingKeysCount } :{ i } ") ) . ToArray ( ) ;
261
+ var ids = new HashSet < string > ( await taskHelper . ConfigureAwait ( taskHelper . WhenAll ( tasks ) ) ) ;
262
+ await taskHelper . ConfigureAwait ( pub . ShutdownAsync ( new CancellationToken ( ) ) ) ;
263
+ Assert . Equal ( messageCount , ids . Count ) ;
264
+ // This doesn't check the global ordering between clients, but that's OK here.
265
+ // The emulator-based integration test checks are more thorough.
266
+ foreach ( var client in clients )
267
+ {
268
+ var kv = client . HandledMessages . Select ( x => x . Split ( ':' ) ) . Select ( x => ( key : x [ 0 ] , value : x [ 1 ] ) ) ;
269
+ foreach ( var values in kv . GroupBy ( x => x . key , x => x . value ) )
270
+ {
271
+ var errorMsg = $ "Ordering-key '{ values . Key } ' out of order";
272
+ foreach ( var pair in values . Zip ( values . Skip ( 1 ) , ( a , b ) => ( a , b ) ) )
273
+ {
274
+ Assert . True ( int . Parse ( pair . a ) < int . Parse ( pair . b ) , errorMsg ) ;
275
+ }
276
+ }
277
+ }
278
+ Assert . Equal ( ids , new HashSet < string > ( clients . SelectMany ( x => x . HandledMessages ) ) ) ;
279
+ Assert . Equal ( 1 , shutdownCount ) ;
280
+ } ) ;
281
+ }
282
+
283
+ [ Fact ]
284
+ public void OrderingKeyResumePublish ( )
285
+ {
286
+ const string unrecoverableKey = "error-unrecoverable" ;
287
+ const string recoverableKey = "error-recoverable" ;
288
+ var topicName = new TopicName ( "FakeProject" , "FakeTopic" ) ;
289
+ var scheduler = new TestScheduler ( ) ;
290
+ TaskHelper taskHelper = scheduler . TaskHelper ;
291
+ var client = new FakePublisherServiceApiClient ( scheduler , taskHelper ,
292
+ orderingKeyErrorUnrecoverable : unrecoverableKey , orderingKeyErrorRecoverable : recoverableKey ) ;
293
+ var settings = MakeSettings ( scheduler , enableMessageOrdering : true ) ;
294
+ int shutdownCount = 0 ;
295
+ var pub = new PublisherClientImpl ( topicName , new [ ] { client } , settings , ( ) =>
296
+ {
297
+ Interlocked . Increment ( ref shutdownCount ) ;
298
+ return Task . FromResult ( 0 ) ;
299
+ } , taskHelper ) ;
300
+ scheduler . Run ( async ( ) =>
301
+ {
302
+ // First call will trigger an unrecoverable error.
303
+ var ex = await taskHelper . ConfigureAwait (
304
+ Assert . ThrowsAsync < RpcException > ( ( ) => pub . PublishAsync ( unrecoverableKey , "unrecoverable error" ) ) ) ;
305
+ Assert . Equal ( StatusCode . DataLoss , ex . StatusCode ) ;
306
+ // Sending again will reject the message.
307
+ await taskHelper . ConfigureAwait (
308
+ Assert . ThrowsAsync < OrderingKeyInErrorStateException > ( ( ) => pub . PublishAsync ( unrecoverableKey , "key in error state" ) ) ) ;
309
+ // Other ordering-keys publish OK.
310
+ await taskHelper . ConfigureAwait ( pub . PublishAsync ( "ok-key" , "ok" ) ) ;
311
+ // Including a recoverable error.
312
+ await taskHelper . ConfigureAwait ( pub . PublishAsync ( recoverableKey , "recoverable error" ) ) ;
313
+ // Including a message without an ordering key.
314
+ await taskHelper . ConfigureAwait ( pub . PublishAsync ( "key not ordered" ) ) ;
315
+ // Resume publishing on the ordering key.
316
+ pub . ResumePublish ( unrecoverableKey ) ;
317
+ await taskHelper . ConfigureAwait ( pub . PublishAsync ( unrecoverableKey , "unrecoverable key resumed" ) ) ;
318
+ var expected = new HashSet < string > ( new [ ] { "ok" , "key not ordered" , "recoverable error" , "unrecoverable key resumed" } ) ;
319
+ Assert . Equal ( expected , new HashSet < string > ( client . HandledMessages ) ) ;
320
+ } ) ;
321
+ }
185
322
}
186
323
}
0 commit comments