Skip to content

Commit 071ea6f

Browse files
authored
Add sync Create() method to PublisherClient and SubscriberClient (#5895)
1 parent 59e5691 commit 071ea6f

File tree

4 files changed

+90
-5
lines changed

4 files changed

+90
-5
lines changed

‎apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/PublisherClient.cs

+37-3
Original file line numberDiff line numberDiff line change
@@ -209,13 +209,43 @@ internal void Validate()
209209
/// high network bandwidth (e.g. Google Compute Engine instances). If running with more limited network bandwidth, some
210210
/// settings may need changing; especially
211211
/// <see cref="ClientCreationSettings.PublisherServiceApiSettings"/>.<see cref="PublisherServiceApiSettings.PublishSettings"/>.<see cref="CallSettings.Retry"/>.
212+
/// By default this method generates a gRPC channel per CPU core; if using a high-core-count machine and using many
213+
/// clients concurrently then this may need reducing; use the setting <see cref="ClientCreationSettings.ClientCount"/>.
212214
/// </summary>
213215
/// <param name="topicName">The <see cref="TopicName"/> to publish messages to.</param>
214216
/// <param name="clientCreationSettings">Optional. <see cref="ClientCreationSettings"/> specifying how to create
215217
/// <see cref="PublisherServiceApiClient"/>s.</param>
216218
/// <param name="settings">Optional. <see cref="Settings"/> for creating a <see cref="PublisherClient"/>.</param>
217219
/// <returns>A <see cref="PublisherClient"/> instance associated with the specified <see cref="TopicName"/>.</returns>
218-
public static async Task<PublisherClient> CreateAsync(TopicName topicName, ClientCreationSettings clientCreationSettings = null, Settings settings = null)
220+
public static PublisherClient Create(TopicName topicName, ClientCreationSettings clientCreationSettings = null, Settings settings = null) =>
221+
// With isAsync set to false, the returned task will already be completed (either successfully or faulted),
222+
// so .ResultWithUnwrappedExceptions() will always return immediately.
223+
CreateMaybeAsync(topicName, clientCreationSettings, settings, isAsync: false).ResultWithUnwrappedExceptions();
224+
225+
/// <summary>
226+
/// Create a <see cref="PublisherClient"/> instance associated with the specified <see cref="TopicName"/>.
227+
/// The default <paramref name="settings"/> and <paramref name="clientCreationSettings"/> are suitable for machines with
228+
/// high network bandwidth (e.g. Google Compute Engine instances). If running with more limited network bandwidth, some
229+
/// settings may need changing; especially
230+
/// <see cref="ClientCreationSettings.PublisherServiceApiSettings"/>.<see cref="PublisherServiceApiSettings.PublishSettings"/>.<see cref="CallSettings.Retry"/>.
231+
/// By default this method generates a gRPC channel per CPU core; if using a high-core-count machine and using many
232+
/// clients concurrently then this may need reducing; use the setting <see cref="ClientCreationSettings.ClientCount"/>.
233+
/// </summary>
234+
/// <param name="topicName">The <see cref="TopicName"/> to publish messages to.</param>
235+
/// <param name="clientCreationSettings">Optional. <see cref="ClientCreationSettings"/> specifying how to create
236+
/// <see cref="PublisherServiceApiClient"/>s.</param>
237+
/// <param name="settings">Optional. <see cref="Settings"/> for creating a <see cref="PublisherClient"/>.</param>
238+
/// <returns>A <see cref="PublisherClient"/> instance associated with the specified <see cref="TopicName"/>.</returns>
239+
public static Task<PublisherClient> CreateAsync(TopicName topicName, ClientCreationSettings clientCreationSettings = null, Settings settings = null) =>
240+
// With isAsync set to true, the returned task will complete asynchronously (if required) as expected.
241+
CreateMaybeAsync(topicName, clientCreationSettings, settings, isAsync: true);
242+
243+
/// <summary>
244+
/// Creates a <see cref="PublisherClient"/>.
245+
/// <paramref name="isAsync"/> controls whether the returned task will complete synchronously or asynchronously, allowing this
246+
/// method to be used by both <see cref="Create"/> and <see cref="CreateAsync"/>.
247+
/// </summary>
248+
private static async Task<PublisherClient> CreateMaybeAsync(TopicName topicName, ClientCreationSettings clientCreationSettings, Settings settings, bool isAsync)
219249
{
220250
clientCreationSettings?.Validate();
221251
// Clone settings, just in case user modifies them and an await happens in this method
@@ -225,7 +255,9 @@ public static async Task<PublisherClient> CreateAsync(TopicName topicName, Clien
225255
// Use default credentials if none given.
226256
if (channelCredentials == null)
227257
{
228-
var credentials = await GoogleCredential.GetApplicationDefaultAsync().ConfigureAwait(false);
258+
var credentials = isAsync ?
259+
(await GoogleCredential.GetApplicationDefaultAsync().ConfigureAwait(false)) :
260+
GoogleCredential.GetApplicationDefault();
229261
if (credentials.IsCreateScopedRequired)
230262
{
231263
credentials = credentials.CreateScoped(PublisherServiceApiClient.DefaultScopes);
@@ -252,7 +284,9 @@ public static async Task<PublisherClient> CreateAsync(TopicName topicName, Clien
252284
Settings = clientCreationSettings?.PublisherServiceApiSettings,
253285
ChannelOptions = grpcChannelOptions
254286
};
255-
var channel = await builder.CreateChannelAsync(cancellationToken: default).ConfigureAwait(false);
287+
var channel = isAsync ?
288+
await builder.CreateChannelAsync(cancellationToken: default).ConfigureAwait(false) :
289+
builder.CreateChannel();
256290

257291
// Second builder doesn't need to do much, as we can build a call invoker from the channel.
258292
clients[i] = new PublisherServiceApiClientBuilder

‎apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/PublisherServiceApiClientBuilder.cs

+14
Original file line numberDiff line numberDiff line change
@@ -81,5 +81,19 @@ internal async Task<ChannelBase> CreateChannelAsync(CancellationToken cancellati
8181
var grpcAdapter = GrpcAdapter ?? DefaultGrpcAdapter;
8282
return grpcAdapter.CreateChannel(endpoint, credentials, effectiveBuilder.GetChannelOptions());
8383
}
84+
85+
/// <summary>
86+
/// Creates a channel for this builder, observing any emulator configuration that has been set.
87+
/// This method is used by PublisherClient, which needs the channel for shutdown purposes.
88+
/// </summary>
89+
internal ChannelBase CreateChannel()
90+
{
91+
// Note: no need to try to detect the channel pool here, as we know we don't want to use it.
92+
var effectiveBuilder = MaybeCreateEmulatorClientBuilder() ?? this;
93+
var endpoint = effectiveBuilder.Endpoint ?? GetDefaultEndpoint();
94+
var credentials = effectiveBuilder.GetChannelCredentials();
95+
var grpcAdapter = GrpcAdapter ?? DefaultGrpcAdapter;
96+
return grpcAdapter.CreateChannel(endpoint, credentials, effectiveBuilder.GetChannelOptions());
97+
}
8498
}
8599
}

‎apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClient.cs

+25-2
Original file line numberDiff line numberDiff line change
@@ -279,13 +279,34 @@ internal void Validate()
279279
/// The default <paramref name="settings"/> and <paramref name="clientCreationSettings"/> are suitable for machines with
280280
/// high network bandwidth (e.g. Google Compute Engine instances). If running with more limited network bandwidth, some
281281
/// settings may need changing; especially <see cref="Settings.AckDeadline"/>.
282+
/// By default this method generates a gRPC channel per CPU core; if using a high-core-count machine and using many
283+
/// clients concurrently then this may need reducing; use the setting <see cref="ClientCreationSettings.ClientCount"/>.
282284
/// </summary>
283285
/// <param name="subscriptionName">The <see cref="SubscriptionName"/> to receive messages from.</param>
284286
/// <param name="clientCreationSettings">Optional. <see cref="ClientCreationSettings"/> specifying how to create
285287
/// <see cref="SubscriberClient"/>s.</param>
286288
/// <param name="settings">Optional. <see cref="Settings"/> for creating a <see cref="SubscriberClient"/>.</param>
287289
/// <returns>A <see cref="SubscriberClient"/> instance associated with the specified <see cref="SubscriptionName"/>.</returns>
288-
public static async Task<SubscriberClient> CreateAsync(SubscriptionName subscriptionName, ClientCreationSettings clientCreationSettings = null, Settings settings = null)
290+
public static SubscriberClient Create(SubscriptionName subscriptionName, ClientCreationSettings clientCreationSettings = null, Settings settings = null) =>
291+
CreateMaybeAsync(subscriptionName, clientCreationSettings, settings, isAsync: false).ResultWithUnwrappedExceptions();
292+
293+
/// <summary>
294+
/// Create a <see cref="SubscriberClient"/> instance associated with the specified <see cref="SubscriptionName"/>.
295+
/// The default <paramref name="settings"/> and <paramref name="clientCreationSettings"/> are suitable for machines with
296+
/// high network bandwidth (e.g. Google Compute Engine instances). If running with more limited network bandwidth, some
297+
/// settings may need changing; especially <see cref="Settings.AckDeadline"/>.
298+
/// By default this method generates a gRPC channel per CPU core; if using a high-core-count machine and using many
299+
/// clients concurrently then this may need reducing; use the setting <see cref="ClientCreationSettings.ClientCount"/>.
300+
/// </summary>
301+
/// <param name="subscriptionName">The <see cref="SubscriptionName"/> to receive messages from.</param>
302+
/// <param name="clientCreationSettings">Optional. <see cref="ClientCreationSettings"/> specifying how to create
303+
/// <see cref="SubscriberClient"/>s.</param>
304+
/// <param name="settings">Optional. <see cref="Settings"/> for creating a <see cref="SubscriberClient"/>.</param>
305+
/// <returns>A <see cref="SubscriberClient"/> instance associated with the specified <see cref="SubscriptionName"/>.</returns>
306+
public static Task<SubscriberClient> CreateAsync(SubscriptionName subscriptionName, ClientCreationSettings clientCreationSettings = null, Settings settings = null) =>
307+
CreateMaybeAsync(subscriptionName, clientCreationSettings, settings, isAsync: true);
308+
309+
private static async Task<SubscriberClient> CreateMaybeAsync(SubscriptionName subscriptionName, ClientCreationSettings clientCreationSettings, Settings settings, bool isAsync)
289310
{
290311
GaxPreconditions.CheckNotNull(subscriptionName, nameof(subscriptionName));
291312
clientCreationSettings?.Validate();
@@ -312,7 +333,9 @@ public static async Task<SubscriberClient> CreateAsync(SubscriptionName subscrip
312333
Settings = clientCreationSettings?.SubscriberServiceApiSettings,
313334
ChannelOptions = grpcChannelOptions
314335
};
315-
var channel = await builder.CreateChannelAsync(cancellationToken: default).ConfigureAwait(false);
336+
var channel = isAsync ?
337+
(await builder.CreateChannelAsync(cancellationToken: default).ConfigureAwait(false)) :
338+
builder.CreateChannel();
316339

317340
// Second builder doesn't need to do much, as we can build a call invoker from the channel.
318341
clients[i] = new SubscriberServiceApiClientBuilder

‎apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberServiceApiClientBuilder.cs

+14
Original file line numberDiff line numberDiff line change
@@ -81,5 +81,19 @@ internal async Task<ChannelBase> CreateChannelAsync(CancellationToken cancellati
8181
var grpcAdapter = GrpcAdapter ?? DefaultGrpcAdapter;
8282
return grpcAdapter.CreateChannel(endpoint, credentials, effectiveBuilder.GetChannelOptions());
8383
}
84+
85+
/// <summary>
86+
/// Creates a channel for this builder, observing any emulator configuration that has been set.
87+
/// This method is used by SubscriberClient, which needs the channel for shutdown purposes.
88+
/// </summary>
89+
internal ChannelBase CreateChannel()
90+
{
91+
// Note: no need to try to detect the channel pool here, as we know we don't want to use it.
92+
var effectiveBuilder = MaybeCreateEmulatorClientBuilder() ?? this;
93+
var endpoint = effectiveBuilder.Endpoint ?? GetDefaultEndpoint();
94+
var credentials = effectiveBuilder.GetChannelCredentials();
95+
var grpcAdapter = GrpcAdapter ?? DefaultGrpcAdapter;
96+
return grpcAdapter.CreateChannel(endpoint, credentials, effectiveBuilder.GetChannelOptions());
97+
}
8498
}
8599
}

0 commit comments

Comments
 (0)
X