Skip to content

Commit afa4a96

Browse files
authored
Pubsub client ordering-keys (#3099)
1 parent a46edf8 commit afa4a96

File tree

8 files changed

+797
-115
lines changed

8 files changed

+797
-115
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<OutputType>Exe</OutputType>
5+
<TargetFramework>netcoreapp2.2</TargetFramework>
6+
<IsPackable>False</IsPackable>
7+
</PropertyGroup>
8+
9+
<ItemGroup>
10+
<ProjectReference Include="..\Google.Cloud.PubSub.V1\Google.Cloud.PubSub.V1.csproj" />
11+
</ItemGroup>
12+
13+
</Project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
using Grpc.Core;
2+
using System;
3+
using System.Collections.Generic;
4+
using System.IO;
5+
using System.Linq;
6+
using System.Threading;
7+
using System.Threading.Tasks;
8+
9+
namespace Google.Cloud.PubSub.V1.OrderingKeyTester
10+
{
11+
/// <summary>
12+
/// Tests the "ordering-keys" public client implementation.
13+
/// Use the `ordering_key_generator` to generate the input CSV file, then run this program, then verify
14+
/// the output CSV file using the `ordering_key_verifier`.
15+
/// The input and output CSV files contain lines in the format:
16+
/// "{ordering key, or empty string for no ordering key}","{message text}"
17+
/// Before running this test the pubsub emulator must be started on localhost using ipv4 (not ipv6).
18+
/// </summary>
19+
class Program
20+
{
21+
private class InputLine
22+
{
23+
public InputLine(string line)
24+
{
25+
var parts = line.Split(',');
26+
if (parts.Length != 2)
27+
{
28+
throw new Exception("Expected two parts in input file.");
29+
}
30+
OrderingKey = parts[0].Trim('"');
31+
Message = parts[1].Trim('"');
32+
}
33+
34+
public string OrderingKey { get; }
35+
public string Message { get; }
36+
}
37+
38+
static int Main(string[] args)
39+
{
40+
if (args.Length != 3)
41+
{
42+
Console.WriteLine("Call with three args: <emulator port> <input file> <output file>");
43+
Console.WriteLine("This connects to host 127.0.0.1, so requires the emulator to be started using ipv4, not ipv6:");
44+
Console.WriteLine(" E.g. cloud-pubsub-emulator.bat --host=127.0.0.1 --port=8700");
45+
Console.WriteLine("It reads and writes CSV files as specified in the 'Testing Ordering Keys' section of the");
46+
Console.WriteLine(" 'Pub/Sub Ordering Key Client Libraries' doc.");
47+
Console.WriteLine();
48+
return 1;
49+
}
50+
51+
// Read inputs.
52+
var port = int.Parse(args[0]);
53+
var inputLines = File.ReadAllLines(args[1]).Select(line => new InputLine(line)).ToList();
54+
// Setup gRPC channel to pubsub emulator.
55+
var channel = new Channel("127.0.0.1", port, ChannelCredentials.Insecure);
56+
57+
// Create topic and subscription names.
58+
var topicName = new TopicName("project", $"topic-{Guid.NewGuid()}");
59+
var subscriptionName = new SubscriptionName("project", $"subscription-{Guid.NewGuid()}");
60+
// List that records all received messages.
61+
var recvMsgs = new List<PubsubMessage>();
62+
63+
// Run test.
64+
CreateTopicAndSubscription();
65+
Task subTask = Subscribe();
66+
IEnumerable<Task> pubTasks = Publish();
67+
68+
// Wait for publish and subscribe tasks to complete.
69+
Console.WriteLine("Waiting for all publish tasks to complete");
70+
Task.WaitAll(pubTasks.ToArray());
71+
Console.WriteLine("All publish tasks completed");
72+
Console.WriteLine("Waiting for subscribe task to complete");
73+
subTask.Wait();
74+
Console.WriteLine("Subscribe task completed");
75+
76+
// Output ordered CSV file of recevied messages, for the validator.
77+
var csvLines = recvMsgs.Select(x => $"\"{x.OrderingKey}\",\"{x.Data.ToStringUtf8()}\"").ToList();
78+
File.WriteAllLines(args[2], csvLines);
79+
Console.WriteLine("Output file written; all done :)");
80+
81+
return 0;
82+
83+
void CreateTopicAndSubscription()
84+
{
85+
Console.WriteLine("Creating topic and subscription");
86+
var pubApi = PublisherServiceApiClient.Create(channel);
87+
var topic = pubApi.CreateTopic(topicName);
88+
var subApi = SubscriberServiceApiClient.Create(channel);
89+
subApi.CreateSubscription(new Subscription
90+
{
91+
EnableMessageOrdering = true,
92+
TopicAsTopicNameOneof = TopicNameOneof.From(topicName),
93+
SubscriptionName = subscriptionName,
94+
AckDeadlineSeconds = 120,
95+
});
96+
}
97+
98+
Task Subscribe()
99+
{
100+
Console.WriteLine("Creating subscribers");
101+
var subs = new[]
102+
{
103+
SubscriberServiceApiClient.Create(channel),
104+
SubscriberServiceApiClient.Create(channel),
105+
SubscriberServiceApiClient.Create(channel)
106+
};
107+
var sub = new SubscriberClientImpl(subscriptionName, subs, new SubscriberClient.Settings(), null);
108+
var recvCount = 0;
109+
var rnd = new Random();
110+
Console.WriteLine("Starting subscriber callback");
111+
return sub.StartAsync(async (msg, ct) =>
112+
{
113+
lock (recvMsgs)
114+
{
115+
recvMsgs.Add(msg.Clone());
116+
recvCount += 1;
117+
if (recvCount == inputLines.Count)
118+
{
119+
Console.WriteLine("Received all messages, shutting down");
120+
var dummyTask = sub.StopAsync(CancellationToken.None);
121+
}
122+
}
123+
if (rnd.Next(3) == 0)
124+
{
125+
await Task.Delay(rnd.Next(3));
126+
}
127+
return SubscriberClient.Reply.Ack;
128+
});
129+
}
130+
131+
IEnumerable<Task> Publish()
132+
{
133+
Console.WriteLine("Creating publishers");
134+
var pubs = new[]
135+
{
136+
PublisherServiceApiClient.Create(channel),
137+
PublisherServiceApiClient.Create(channel),
138+
PublisherServiceApiClient.Create(channel)
139+
};
140+
var pub = new PublisherClientImpl(topicName, pubs, new PublisherClient.Settings { EnableMessageOrdering = true }, null);
141+
var publishTasks = new List<Task>();
142+
Console.WriteLine("Starting to publish");
143+
foreach (var inputLine in inputLines)
144+
{
145+
var pubTask = pub.PublishAsync(inputLine.OrderingKey, inputLine.Message);
146+
publishTasks.Add(pubTask);
147+
}
148+
Console.WriteLine("Publishing complete");
149+
return publishTasks;
150+
}
151+
}
152+
}
153+
}

‎apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Tests/PublisherClientTest.cs

+142-5
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
using Google.Api.Gax.Grpc;
1717
using Google.Cloud.PubSub.V1.Tasks;
1818
using Google.Cloud.PubSub.V1.Tests.Tasks;
19-
using Google.Protobuf;
19+
using Grpc.Core;
2020
using System;
2121
using System.Collections.Generic;
2222
using System.Linq;
@@ -30,11 +30,15 @@ public class PublisherClientTest
3030
{
3131
private class FakePublisherServiceApiClient : PublisherServiceApiClient
3232
{
33-
public FakePublisherServiceApiClient(IScheduler scheduler, TaskHelper taskHelper, params TimeSpan[] delays)
33+
public FakePublisherServiceApiClient(IScheduler scheduler, TaskHelper taskHelper,
34+
TimeSpan[] delays = null, string orderingKeyErrorUnrecoverable = null, string orderingKeyErrorRecoverable = null)
3435
{
3536
_schduler = scheduler;
3637
_taskHelper = taskHelper;
37-
_delays = Enumerable.Repeat(delays.DefaultIfEmpty(), int.MaxValue).SelectMany(x => x).GetEnumerator();
38+
_delays = Enumerable.Repeat((delays ?? Enumerable.Empty<TimeSpan>()).DefaultIfEmpty(), int.MaxValue)
39+
.SelectMany(x => x).GetEnumerator();
40+
_orderingKeyErrorUnrecoverable = orderingKeyErrorUnrecoverable;
41+
_orderingKeyErrorRecoverable = orderingKeyErrorRecoverable;
3842
_handledIds = new List<string>();
3943
}
4044

@@ -44,6 +48,9 @@ public FakePublisherServiceApiClient(IScheduler scheduler, TaskHelper taskHelper
4448
private readonly IEnumerator<TimeSpan> _delays;
4549
private readonly List<string> _handledIds;
4650

51+
private string _orderingKeyErrorUnrecoverable;
52+
private string _orderingKeyErrorRecoverable;
53+
4754
public IReadOnlyList<string> HandledMessages
4855
{
4956
get
@@ -59,21 +66,40 @@ public override async Task<PublishResponse> PublishAsync(PublishRequest request,
5966
{
6067
_delays.MoveNext();
6168
await _taskHelper.ConfigureAwait(_schduler.Delay(_delays.Current, callSettings?.CancellationToken ?? CancellationToken.None));
69+
var byOrderingKey = request.Messages.GroupBy(x => x.OrderingKey).ToList();
70+
if (byOrderingKey.Count > 1)
71+
{
72+
throw new InvalidOperationException("Multiple ordering-keys should not be present within a single batch.");
73+
}
6274
var msgIds = request.Messages.Select(x => x.Data.ToStringUtf8());
6375
lock (_lock)
6476
{
77+
if (byOrderingKey.Count > 0 && byOrderingKey[0].Key == _orderingKeyErrorUnrecoverable)
78+
{
79+
// Cause a one-off unrecoverable error.
80+
_orderingKeyErrorUnrecoverable = null;
81+
throw new RpcException(new Status(StatusCode.DataLoss, "Data loss"));
82+
}
83+
if (byOrderingKey.Count > 0 && byOrderingKey[0].Key == _orderingKeyErrorRecoverable)
84+
{
85+
// Cause a one-off recoverable error.
86+
_orderingKeyErrorRecoverable = null;
87+
throw new RpcException(new Status(StatusCode.Unavailable, "Unavailable"));
88+
}
6589
_handledIds.AddRange(msgIds);
6690
}
6791
return new PublishResponse { MessageIds = { msgIds } };
6892
}
6993
}
7094

71-
private PublisherClient.Settings MakeSettings(IScheduler scheduler, int batchElementCountThreshold = 1, int batchRequestByteThreshold = 1)
95+
private PublisherClient.Settings MakeSettings(IScheduler scheduler,
96+
int batchElementCountThreshold = 1, int batchRequestByteThreshold = 1, bool enableMessageOrdering = false)
7297
{
7398
return new PublisherClient.Settings
7499
{
75100
Scheduler = scheduler,
76101
BatchingSettings = new BatchingSettings(batchElementCountThreshold, batchRequestByteThreshold, TimeSpan.FromSeconds(10)),
102+
EnableMessageOrdering = enableMessageOrdering
77103
};
78104

79105
}
@@ -143,7 +169,7 @@ public void Shutdown(
143169
var topicName = new TopicName("FakeProject", "FakeTopic");
144170
var scheduler = new TestScheduler();
145171
TaskHelper taskHelper = scheduler.TaskHelper;
146-
var client = new FakePublisherServiceApiClient(scheduler, taskHelper, TimeSpan.FromSeconds(1));
172+
var client = new FakePublisherServiceApiClient(scheduler, taskHelper, delays: new[] { TimeSpan.FromSeconds(1) });
147173
var settings = MakeSettings(scheduler, batchElementCountThreshold: 2, batchRequestByteThreshold: 1000);
148174
int shutdownCount = 0;
149175
var pub = new PublisherClientImpl(topicName, new[] { client }, settings, () =>
@@ -182,5 +208,116 @@ public void SettingsValidation()
182208
new PublisherClient.Settings { BatchingSettings = new BatchingSettings(null, 1, null) }.Validate();
183209
new PublisherClient.Settings { BatchingSettings = new BatchingSettings(null, PublisherClient.ApiMaxBatchingSettings.ByteCountThreshold, null) }.Validate();
184210
}
211+
212+
[Fact]
213+
public void PublishingMessageWithOrderingKeyRequiresOrderingEnabled()
214+
{
215+
var topicName = new TopicName("FakeProject", "FakeTopic");
216+
var scheduler = new TestScheduler();
217+
TaskHelper taskHelper = scheduler.TaskHelper;
218+
var client = new FakePublisherServiceApiClient(scheduler, taskHelper);
219+
var settings = MakeSettings(scheduler);
220+
int shutdownCount = 0;
221+
var pub = new PublisherClientImpl(topicName, new[] { client }, settings, () =>
222+
{
223+
Interlocked.Increment(ref shutdownCount);
224+
return Task.FromResult(0);
225+
}, taskHelper);
226+
scheduler.Run(async () =>
227+
{
228+
await taskHelper.ConfigureAwait(
229+
Assert.ThrowsAsync<InvalidOperationException>(() => pub.PublishAsync("an ordering key", "1")));
230+
});
231+
}
232+
233+
[Theory, PairwiseData]
234+
public void OrderingKeyManyMessages(
235+
[CombinatorialValues(1, 2, 5, 7)] int clientCount,
236+
[CombinatorialValues(1, 2, 6, 13)] int threadCount,
237+
[CombinatorialValues(101, 2000, 9999)] int messageCount,
238+
[CombinatorialValues(1, 2, 9, 51)] int orderingKeysCount,
239+
[CombinatorialValues(1, 5, 50)] int batchElementCountThreshold,
240+
[CombinatorialValues(0, 1, 59, 123, 1001)] int delayMs1,
241+
[CombinatorialValues(0, 2, 500)] int delayMs2)
242+
{
243+
var topicName = new TopicName("FakeProject", "FakeTopic");
244+
var scheduler = new TestScheduler(threadCount);
245+
TaskHelper taskHelper = scheduler.TaskHelper;
246+
var clients = Enumerable.Range(0, clientCount)
247+
.Select(_ => new FakePublisherServiceApiClient(scheduler, taskHelper,
248+
new[] { TimeSpan.FromMilliseconds(delayMs1), TimeSpan.FromMilliseconds(delayMs2) })).ToArray();
249+
var settings = MakeSettings(scheduler,
250+
batchElementCountThreshold: batchElementCountThreshold, batchRequestByteThreshold: 10000, enableMessageOrdering: true);
251+
int shutdownCount = 0;
252+
var pub = new PublisherClientImpl(topicName, clients, settings, () =>
253+
{
254+
Interlocked.Increment(ref shutdownCount);
255+
return Task.FromResult(0);
256+
}, taskHelper);
257+
scheduler.Run(async () =>
258+
{
259+
var tasks = Enumerable.Range(0, messageCount)
260+
.Select(i => pub.PublishAsync((i % orderingKeysCount).ToString(), $"{i % orderingKeysCount}:{i}")).ToArray();
261+
var ids = new HashSet<string>(await taskHelper.ConfigureAwait(taskHelper.WhenAll(tasks)));
262+
await taskHelper.ConfigureAwait(pub.ShutdownAsync(new CancellationToken()));
263+
Assert.Equal(messageCount, ids.Count);
264+
// This doesn't check the global ordering between clients, but that's OK here.
265+
// The emulator-based integration test checks are more thorough.
266+
foreach (var client in clients)
267+
{
268+
var kv = client.HandledMessages.Select(x => x.Split(':')).Select(x => (key: x[0], value: x[1]));
269+
foreach (var values in kv.GroupBy(x => x.key, x => x.value))
270+
{
271+
var errorMsg = $"Ordering-key '{values.Key}' out of order";
272+
foreach (var pair in values.Zip(values.Skip(1), (a, b) => (a, b)))
273+
{
274+
Assert.True(int.Parse(pair.a) < int.Parse(pair.b), errorMsg);
275+
}
276+
}
277+
}
278+
Assert.Equal(ids, new HashSet<string>(clients.SelectMany(x => x.HandledMessages)));
279+
Assert.Equal(1, shutdownCount);
280+
});
281+
}
282+
283+
[Fact]
284+
public void OrderingKeyResumePublish()
285+
{
286+
const string unrecoverableKey = "error-unrecoverable";
287+
const string recoverableKey = "error-recoverable";
288+
var topicName = new TopicName("FakeProject", "FakeTopic");
289+
var scheduler = new TestScheduler();
290+
TaskHelper taskHelper = scheduler.TaskHelper;
291+
var client = new FakePublisherServiceApiClient(scheduler, taskHelper,
292+
orderingKeyErrorUnrecoverable: unrecoverableKey, orderingKeyErrorRecoverable: recoverableKey);
293+
var settings = MakeSettings(scheduler, enableMessageOrdering: true);
294+
int shutdownCount = 0;
295+
var pub = new PublisherClientImpl(topicName, new[] { client }, settings, () =>
296+
{
297+
Interlocked.Increment(ref shutdownCount);
298+
return Task.FromResult(0);
299+
}, taskHelper);
300+
scheduler.Run(async () =>
301+
{
302+
// First call will trigger an unrecoverable error.
303+
var ex = await taskHelper.ConfigureAwait(
304+
Assert.ThrowsAsync<RpcException>(() => pub.PublishAsync(unrecoverableKey, "unrecoverable error")));
305+
Assert.Equal(StatusCode.DataLoss, ex.StatusCode);
306+
// Sending again will reject the message.
307+
await taskHelper.ConfigureAwait(
308+
Assert.ThrowsAsync<OrderingKeyInErrorStateException>(() => pub.PublishAsync(unrecoverableKey, "key in error state")));
309+
// Other ordering-keys publish OK.
310+
await taskHelper.ConfigureAwait(pub.PublishAsync("ok-key", "ok"));
311+
// Including a recoverable error.
312+
await taskHelper.ConfigureAwait(pub.PublishAsync(recoverableKey, "recoverable error"));
313+
// Including a message without an ordering key.
314+
await taskHelper.ConfigureAwait(pub.PublishAsync("key not ordered"));
315+
// Resume publishing on the ordering key.
316+
pub.ResumePublish(unrecoverableKey);
317+
await taskHelper.ConfigureAwait(pub.PublishAsync(unrecoverableKey, "unrecoverable key resumed"));
318+
var expected = new HashSet<string>(new[] { "ok", "key not ordered", "recoverable error", "unrecoverable key resumed" });
319+
Assert.Equal(expected, new HashSet<string>(client.HandledMessages));
320+
});
321+
}
185322
}
186323
}

0 commit comments

Comments
 (0)
X