Skip to content

Commit fc68a48

Browse files
chore(PubSub): Deprecate legacy StopAsync and update docs
1 parent 2fa8091 commit fc68a48

File tree

8 files changed

+40
-14
lines changed

8 files changed

+40
-14
lines changed

apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.IntegrationTests/PubSubClientTest.cs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ private async Task RunBulkMessagingImpl(
159159
{
160160
// Test finished, so stop subscriber
161161
Console.WriteLine("All msgs received, stopping subscriber.");
162-
Task unused = subscriber.StopAsync(TimeSpan.FromSeconds(15));
162+
Task unused = subscriber.StopAsync(SubscriberClient.SubscriberShutdownSetting.NackImmediately, TimeSpan.FromSeconds(15));
163163
}
164164
}
165165
else
@@ -194,11 +194,11 @@ private async Task RunBulkMessagingImpl(
194194
{
195195
if (noProgressCount > 60)
196196
{
197-
// Deadlock, shutdown subscriber, and cancel
198-
Console.WriteLine("Deadlock detected. Cancelling test");
199-
subscriber.StopAsync(new CancellationToken(true));
200-
watchdogCts.Cancel();
201-
break;
197+
// Deadlock, shutdown subscriber, and cancel
198+
Console.WriteLine("Deadlock detected. Cancelling test");
199+
subscriber.StopAsync(SubscriberClient.SubscriberShutdownSetting.NackImmediately, cancellationToken: new CancellationToken(true));
200+
watchdogCts.Cancel();
201+
break;
202202
}
203203
noProgressCount += 1;
204204
}
@@ -431,7 +431,7 @@ public async Task StopStartSubscriber(int totalMessageCount, double publisherFre
431431
});
432432
await Task.Delay(subscriberLifetime);
433433
Console.WriteLine("Stopping subscriber");
434-
Task stopTask = subscriber.StopAsync(TimeSpan.FromSeconds(15));
434+
Task stopTask = subscriber.StopAsync(SubscriberClient.SubscriberShutdownSetting.NackImmediately, TimeSpan.FromSeconds(15));
435435
// If shutdown times-out then stopTask, and also Task.WhenAll will cancel, causing the test to fail.
436436
await Task.WhenAll(subscribeTask, stopTask);
437437
int recvCount = recvedMsgs.Locked(() => recvedMsgs.Count);
@@ -538,8 +538,8 @@ await subscriberApi.IAMPolicyClient.SetIamPolicyAsync(new SetIamPolicyRequest
538538
{
539539
result.Add((msg.GetDeliveryAttempt(), true));
540540
// Received DLQ message, so stop test.
541-
sub.StopAsync(TimeSpan.FromSeconds(10));
542-
dlqSub.StopAsync(TimeSpan.FromSeconds(10));
541+
sub.StopAsync(SubscriberClient.SubscriberShutdownSetting.NackImmediately, TimeSpan.FromSeconds(10));
542+
dlqSub.StopAsync(SubscriberClient.SubscriberShutdownSetting.NackImmediately, TimeSpan.FromSeconds(10));
543543
return Task.FromResult(SubscriberClient.Reply.Ack);
544544
});
545545

apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.OrderingKeyTester/Program.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ Task Subscribe()
133133
if (recvCount == inputLines.Count)
134134
{
135135
Console.WriteLine("Received all messages, shutting down");
136-
var dummyTask = sub.StopAsync(CancellationToken.None);
136+
var dummyTask = sub.StopAsync(SubscriberClient.SubscriberShutdownSetting.NackImmediately);
137137
}
138138
}
139139
if (rnd.Next(3) == 0)

apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Snippets/SubscriberClientSnippets.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ await _subscriberClient.StartAsync((msg, token) =>
143143
});
144144

145145
public override async Task StopAsync(CancellationToken stoppingToken) =>
146-
await _subscriberClient.StopAsync(stoppingToken);
146+
await _subscriberClient.StopAsync(SubscriberClient.SubscriberShutdownSetting.NackImmediately, cancellationToken: stoppingToken);
147147
}
148148
// End sample
149149
}

apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Snippets/SubscriberServiceApiClientSnippets.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ await subscriber.StartAsync((msg, cancellationToken) =>
129129
Console.WriteLine($"Text: '{msg.Data.ToStringUtf8()}'");
130130
// Stop this subscriber after one message is received.
131131
// This is non-blocking, and the returned Task may be awaited.
132-
subscriber.StopAsync(TimeSpan.FromSeconds(15));
132+
subscriber.StopAsync(SubscriberClient.SubscriberShutdownSetting.NackImmediately, TimeSpan.FromSeconds(15));
133133
// Return Reply.Ack to indicate this message has been handled.
134134
return Task.FromResult(SubscriberClient.Reply.Ack);
135135
});

apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Tests/SubscriberClientTest.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -596,7 +596,9 @@ public void ImmediateStop_Obsolete(
596596
});
597597
await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(1), CancellationToken.None));
598598
var isCancelled = await fake.TaskHelper.ConfigureAwaitHideCancellation(
599+
#pragma warning disable CS0618 // allow use of obsolete method
599600
() => fake.Subscriber.StopAsync(new CancellationToken(hardStop)));
601+
#pragma warning restore CS0618
600602
Assert.Equal(hardStop, isCancelled);
601603
Assert.Equal(1, fake.Subscribers.Count);
602604
Assert.Empty(fake.Subscribers[0].Acks);
@@ -645,7 +647,9 @@ public void StopBeforeStart_Obsolete()
645647
await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(1), CancellationToken.None));
646648
var exception = await Assert.ThrowsAsync<InvalidOperationException>(
647649
async () => await fake.TaskHelper.ConfigureAwaitHideCancellation(
650+
#pragma warning disable CS0618 // allow use of obsolete method
648651
() => fake.Subscriber.StopAsync(TimeSpan.FromHours(1))));
652+
#pragma warning restore CS0618
649653
Assert.Equal("Can only stop a started instance.", exception.Message);
650654
Assert.Equal(1, fake.Subscribers.Count);
651655
Assert.Empty(fake.Subscribers[0].Acks);
@@ -705,7 +709,9 @@ await fake.TaskHelper.ConfigureAwaitHideCancellation(
705709
() => fake.Subscriber.DisposeAsync().AsTask());
706710
// Call StopAsync. It shouldn't throw an exception.
707711
await fake.TaskHelper.ConfigureAwaitHideCancellation(
712+
#pragma warning disable CS0618 // allow use of obsolete method
708713
() => fake.Subscriber.StopAsync(CancellationToken.None));
714+
#pragma warning restore CS0618
709715

710716
Assert.Equal(1, fake.Subscribers.Count);
711717
Assert.Empty(fake.Subscribers[0].Acks);
@@ -812,8 +818,10 @@ public void RecvManyMsgsNoErrors_Obsolete(
812818
return SubscriberClient.Reply.Ack;
813819
});
814820
await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(stopAfterSeconds) + TimeSpan.FromSeconds(0.5), CancellationToken.None));
821+
#pragma warning disable CS0618 // allow use of obsolete method
815822
var isCancelled = await fake.TaskHelper.ConfigureAwaitHideCancellation(
816823
() => fake.Subscriber.StopAsync(new CancellationToken(hardStop)));
824+
#pragma warning restore CS0618
817825
Assert.Equal(hardStop, isCancelled);
818826
Assert.Equal(clientCount, fake.Subscribers.Count);
819827
Assert.Equal(expectedMsgCount, handledMsgs.Locked(() => handledMsgs.Count));
@@ -986,7 +994,9 @@ public void FlowControl_Obsolete(
986994
return SubscriberClient.Reply.Ack;
987995
});
988996
await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(stopAfterSeconds) + TimeSpan.FromSeconds(0.5), CancellationToken.None));
997+
#pragma warning disable CS0618 // allow use of obsolete method
989998
await fake.TaskHelper.ConfigureAwaitHideCancellation(() => fake.Subscriber.StopAsync(new CancellationToken(hardStop)));
999+
#pragma warning restore CS0618
9901000
Assert.Equal(expectedMsgCount, handledMsgs.Count);
9911001
});
9921002
}
@@ -2275,7 +2285,9 @@ public void Shutdown_SoftStop_NacksMessages()
22752285

22762286
await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(1), CancellationToken.None));
22772287
Assert.Equivalent(new[] { "msg0", "msg1", "msg2", "msg3", "msg4" }, givenToMessageHandler, strict: true);
2288+
#pragma warning disable CS0618 // allow use of obsolete method
22782289
await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None));
2290+
#pragma warning restore CS0618
22792291

22802292
// msg0-2 were handled/acked; msg3-5 were pulled but nacked on shutdown.
22812293
Assert.Equivalent(new[] { "msg0", "msg1", "msg2", "msg3", "msg4" }, fake.Subscribers[0].Acks.Select(x => x.Id), strict: true);

apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClient.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,7 @@ public virtual Task StartAsync(Func<PubsubMessage, CancellationToken, Task<Reply
235235
/// <param name="hardStopToken">Cancel this <see cref="CancellationToken"/> to abort handlers and acknowledgement.</param>
236236
/// <returns>A <see cref="Task"/> that completes when all handled messages have been acknowledged;
237237
/// faults on unrecoverable service errors; or cancels if <paramref name="hardStopToken"/> is cancelled.</returns>
238+
[Obsolete("Use StopAsync(SubscriberShutdownSetting, TimeSpan?, CancellationToken) instead.")]
238239
public virtual Task StopAsync(CancellationToken hardStopToken) => throw new NotImplementedException();
239240

240241
/// <summary>
@@ -247,7 +248,8 @@ public virtual Task StartAsync(Func<PubsubMessage, CancellationToken, Task<Reply
247248
/// <param name="timeout">After this period, abort handling and acknowledging messages.</param>
248249
/// <returns>A <see cref="Task"/> that completes when all handled messages have been acknowledged;
249250
/// faults on unrecoverable service errors; or cancels if <paramref name="timeout"/> expires.</returns>
250-
public virtual Task StopAsync(TimeSpan timeout) => StopAsync(new CancellationTokenSource(timeout).Token);
251+
[Obsolete("Use StopAsync(SubscriberShutdownSetting, TimeSpan?, CancellationToken) instead.")]
252+
public virtual Task StopAsync(TimeSpan timeout) => StopAsync(SubscriberShutdownSetting.WaitForProcessing, timeout);
251253

252254
/// <summary>
253255
/// Stop this <see cref="SubscriberClient"/>.

apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,10 +200,13 @@ public override ValueTask DisposeAsync()
200200
return new ValueTask(Task.CompletedTask);
201201
}
202202
}
203-
return new ValueTask(StopAsync(_disposeTimeout));
203+
#pragma warning disable CS0618 // allow use of obsolete method
204+
return new ValueTask(StopAsync( _disposeTimeout));
205+
#pragma warning restore CS0618
204206
}
205207

206208
/// <inheritdoc />
209+
[Obsolete("Use StopAsync(SubscriberShutdownSetting, TimeSpan?, CancellationToken) instead.")]
207210
public override Task StopAsync(CancellationToken hardStopToken)
208211
{
209212
lock (_lock)

apis/Google.Cloud.PubSub.V1/docs/index.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,15 @@ Below is an example implementation of a console application that utilizes the de
172172

173173
{{sample:SubscriberClient.UseSubscriberServiceInConsoleApp}}
174174

175+
## Subscriber shutdown
176+
177+
When shutting down a `SubscriberClient`, two different shutdown flows are available via the `StopAsync(SubscriberShutdownSetting, TimeSpan?, CancellationToken)` method:
178+
179+
- **NackImmediately**: This immediately stops streaming new messages and then actively sends "Nack" (Negative Acknowledgement) responses for any messages that have been received but have not yet finished being handled. This allows those messages to be quickly redelivered to other active subscribers.
180+
- **WaitForProcessing**: This immediately stops streaming new messages from the server but continues to process all messages that have already been received. If processing does not complete 30s before the specified timeout, the client will switch to NackImmediately to release any remaining messages.
181+
182+
By default, a 1-hour timeout is used for the shutdown process, which can be customized as needed. When the timeout is reached or if the `CancellationToken` is invoked this will trigger an immediate hard stop, aborting all outstanding tasks.
183+
175184
## Disposing of the publisher and subscriber clients
176185

177186
Both `PublisherClient` and `SubscriberClient` implement the `IAsyncDisposable` interface,

0 commit comments

Comments
 (0)