diff --git a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.IntegrationTests/PubSubClientTest.cs b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.IntegrationTests/PubSubClientTest.cs index 89c3ca2b303a..8be58a1168b7 100644 --- a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.IntegrationTests/PubSubClientTest.cs +++ b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.IntegrationTests/PubSubClientTest.cs @@ -159,7 +159,7 @@ private async Task RunBulkMessagingImpl( { // Test finished, so stop subscriber Console.WriteLine("All msgs received, stopping subscriber."); - Task unused = subscriber.StopAsync(TimeSpan.FromSeconds(15)); + Task unused = subscriber.StopAsync(SubscriberClient.SubscriberShutdownSetting.NackImmediately, TimeSpan.FromSeconds(15)); } } else @@ -194,11 +194,11 @@ private async Task RunBulkMessagingImpl( { if (noProgressCount > 60) { - // Deadlock, shutdown subscriber, and cancel - Console.WriteLine("Deadlock detected. Cancelling test"); - subscriber.StopAsync(new CancellationToken(true)); - watchdogCts.Cancel(); - break; + // Deadlock, shutdown subscriber, and cancel + Console.WriteLine("Deadlock detected. Cancelling test"); + subscriber.StopAsync(SubscriberClient.SubscriberShutdownSetting.NackImmediately, cancellationToken: new CancellationToken(true)); + watchdogCts.Cancel(); + break; } noProgressCount += 1; } @@ -431,7 +431,7 @@ public async Task StopStartSubscriber(int totalMessageCount, double publisherFre }); await Task.Delay(subscriberLifetime); Console.WriteLine("Stopping subscriber"); - Task stopTask = subscriber.StopAsync(TimeSpan.FromSeconds(15)); + Task stopTask = subscriber.StopAsync(SubscriberClient.SubscriberShutdownSetting.NackImmediately, TimeSpan.FromSeconds(15)); // If shutdown times-out then stopTask, and also Task.WhenAll will cancel, causing the test to fail. await Task.WhenAll(subscribeTask, stopTask); int recvCount = recvedMsgs.Locked(() => recvedMsgs.Count); @@ -538,8 +538,8 @@ await subscriberApi.IAMPolicyClient.SetIamPolicyAsync(new SetIamPolicyRequest { result.Add((msg.GetDeliveryAttempt(), true)); // Received DLQ message, so stop test. - sub.StopAsync(TimeSpan.FromSeconds(10)); - dlqSub.StopAsync(TimeSpan.FromSeconds(10)); + sub.StopAsync(SubscriberClient.SubscriberShutdownSetting.NackImmediately, TimeSpan.FromSeconds(10)); + dlqSub.StopAsync(SubscriberClient.SubscriberShutdownSetting.NackImmediately, TimeSpan.FromSeconds(10)); return Task.FromResult(SubscriberClient.Reply.Ack); }); diff --git a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.OrderingKeyTester/Program.cs b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.OrderingKeyTester/Program.cs index 52c4505c29b0..e07727f2daf9 100644 --- a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.OrderingKeyTester/Program.cs +++ b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.OrderingKeyTester/Program.cs @@ -133,7 +133,7 @@ Task Subscribe() if (recvCount == inputLines.Count) { Console.WriteLine("Received all messages, shutting down"); - var dummyTask = sub.StopAsync(CancellationToken.None); + var dummyTask = sub.StopAsync(SubscriberClient.SubscriberShutdownSetting.NackImmediately); } } if (rnd.Next(3) == 0) diff --git a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Snippets/SubscriberClientSnippets.cs b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Snippets/SubscriberClientSnippets.cs index eb8e1fdd814f..203891191b67 100644 --- a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Snippets/SubscriberClientSnippets.cs +++ b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Snippets/SubscriberClientSnippets.cs @@ -143,7 +143,7 @@ await _subscriberClient.StartAsync((msg, token) => }); public override async Task StopAsync(CancellationToken stoppingToken) => - await _subscriberClient.StopAsync(stoppingToken); + await _subscriberClient.StopAsync(SubscriberClient.SubscriberShutdownSetting.NackImmediately, cancellationToken: stoppingToken); } // End sample } diff --git a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Snippets/SubscriberServiceApiClientSnippets.cs b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Snippets/SubscriberServiceApiClientSnippets.cs index a69cbc7442c8..22828e5fa766 100644 --- a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Snippets/SubscriberServiceApiClientSnippets.cs +++ b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Snippets/SubscriberServiceApiClientSnippets.cs @@ -129,7 +129,7 @@ await subscriber.StartAsync((msg, cancellationToken) => Console.WriteLine($"Text: '{msg.Data.ToStringUtf8()}'"); // Stop this subscriber after one message is received. // This is non-blocking, and the returned Task may be awaited. - subscriber.StopAsync(TimeSpan.FromSeconds(15)); + subscriber.StopAsync(SubscriberClient.SubscriberShutdownSetting.NackImmediately, TimeSpan.FromSeconds(15)); // Return Reply.Ack to indicate this message has been handled. return Task.FromResult(SubscriberClient.Reply.Ack); }); diff --git a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Tests/SubscriberClientTest.cs b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Tests/SubscriberClientTest.cs index 72d839098982..4733bf01b9ac 100644 --- a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Tests/SubscriberClientTest.cs +++ b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Tests/SubscriberClientTest.cs @@ -23,10 +23,10 @@ using System; using System.Collections.Generic; using System.Linq; -using System.Text.RegularExpressions; using System.Threading; using System.Threading.Tasks; using Xunit; +using SubscriberShutdownSetting = Google.Cloud.PubSub.V1.SubscriberClient.SubscriberShutdownSetting; namespace Google.Cloud.PubSub.V1.Tests { @@ -583,7 +583,7 @@ private static RpcException GetExactlyOnceDeliveryMixedException(Rpc.ErrorInfo e } [Theory, CombinatorialData] - public void ImmediateStop( + public void ImmediateStop_Obsolete( [CombinatorialValues(false, true)] bool hardStop) { using (var fake = Fake.Create(new[] { new[] { ServerAction.Inf() } })) @@ -596,7 +596,36 @@ public void ImmediateStop( }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(1), CancellationToken.None)); var isCancelled = await fake.TaskHelper.ConfigureAwaitHideCancellation( +#pragma warning disable CS0618 // allow use of obsolete method () => fake.Subscriber.StopAsync(new CancellationToken(hardStop))); +#pragma warning restore CS0618 + Assert.Equal(hardStop, isCancelled); + Assert.Equal(1, fake.Subscribers.Count); + Assert.Empty(fake.Subscribers[0].Acks); + Assert.Empty(fake.Subscribers[0].Nacks); + Assert.Empty(fake.Subscribers[0].Extends); + Assert.Equal(new[] { fake.Time0 + TimeSpan.FromSeconds(1) }, fake.Subscribers[0].WriteCompletes); + Assert.Equal(new[] { fake.Time0 + TimeSpan.FromSeconds(3) }, fake.ClientShutdowns); + }); + } + } + + [Theory, CombinatorialData] + public void ImmediateStop( + [CombinatorialValues(false, true)] bool hardStop, + SubscriberShutdownSetting shutdownSetting) + { + using (var fake = Fake.Create(new[] { new[] { ServerAction.Inf() } })) + { + fake.Scheduler.Run(async () => + { + var doneTask = fake.Subscriber.StartAsync((msg, ct) => + { + throw new Exception("Should never get here"); + }); + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(1), CancellationToken.None)); + var isCancelled = await fake.TaskHelper.ConfigureAwaitHideCancellation( + () => fake.Subscriber.StopAsync(shutdownSetting, cancellationToken: new CancellationToken(hardStop))); Assert.Equal(hardStop, isCancelled); Assert.Equal(1, fake.Subscribers.Count); Assert.Empty(fake.Subscribers[0].Acks); @@ -609,7 +638,7 @@ public void ImmediateStop( } [Fact] - public void StopBeforeStart() + public void StopBeforeStart_Obsolete() { using (var fake = Fake.Create(new[] { new[] { ServerAction.Inf() } })) { @@ -618,7 +647,35 @@ public void StopBeforeStart() await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(1), CancellationToken.None)); var exception = await Assert.ThrowsAsync( async () => await fake.TaskHelper.ConfigureAwaitHideCancellation( +#pragma warning disable CS0618 // allow use of obsolete method () => fake.Subscriber.StopAsync(TimeSpan.FromHours(1)))); +#pragma warning restore CS0618 + Assert.Equal("Can only stop a started instance.", exception.Message); + Assert.Equal(1, fake.Subscribers.Count); + Assert.Empty(fake.Subscribers[0].Acks); + Assert.Empty(fake.Subscribers[0].Nacks); + Assert.Empty(fake.Subscribers[0].Extends); + Assert.Equal(Array.Empty(), fake.Subscribers[0].WriteCompletes); + Assert.Equal(Array.Empty(), fake.ClientShutdowns); + }); + } + } + + [Theory, CombinatorialData] + public void StopBeforeStart(SubscriberShutdownSetting shutdownSetting) + { + // Ensure StopAsync cannot be called before the subscriber has started. + using (var fake = Fake.Create(new[] { new[] { ServerAction.Inf() } })) + { + fake.Scheduler.Run(async () => + { + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(1), CancellationToken.None)); + + // Attempting to stop an unstarted subscriber should throw InvalidOperationException. + var exception = await Assert.ThrowsAsync( + async () => await fake.TaskHelper.ConfigureAwait( + fake.Subscriber.StopAsync(shutdownSetting, TimeSpan.FromHours(1)))); + Assert.Equal("Can only stop a started instance.", exception.Message); Assert.Equal(1, fake.Subscribers.Count); Assert.Empty(fake.Subscribers[0].Acks); @@ -633,7 +690,7 @@ public void StopBeforeStart() // The test is similar to ImmediateStop but checks that calling DisposeAsync() instead of StopAsync() works. // It also tests that DisposeAsync() or StopAsync() can be called multiple times, without throwing exception. [Fact] - public void Dispose() + public void Dispose_Obsolete() { using (var fake = Fake.CreateClientForSingleResponseStream(new[] { ServerAction.Inf() })) { @@ -652,8 +709,41 @@ await fake.TaskHelper.ConfigureAwaitHideCancellation( () => fake.Subscriber.DisposeAsync().AsTask()); // Call StopAsync. It shouldn't throw an exception. await fake.TaskHelper.ConfigureAwaitHideCancellation( +#pragma warning disable CS0618 // allow use of obsolete method () => fake.Subscriber.StopAsync(CancellationToken.None)); +#pragma warning restore CS0618 + + Assert.Equal(1, fake.Subscribers.Count); + Assert.Empty(fake.Subscribers[0].Acks); + Assert.Empty(fake.Subscribers[0].Nacks); + Assert.Empty(fake.Subscribers[0].Extends); + Assert.Equal(new[] { fake.Time0 + TimeSpan.FromSeconds(1) }, fake.Subscribers[0].WriteCompletes); + Assert.Equal(new[] { fake.Time0 + TimeSpan.FromSeconds(3) }, fake.ClientShutdowns); + }); + } + } + + [Theory, CombinatorialData] + public void Dispose(SubscriberShutdownSetting shutdownSetting) + { + // Ensure StopAsync is idempotent and safe to call after disposal. + using (var fake = Fake.CreateClientForSingleResponseStream(new[] { ServerAction.Inf() })) + { + fake.Scheduler.Run(async () => + { + var doneTask = fake.Subscriber.StartAsync((msg, ct) => + { + throw new Exception("Should never get here"); + }); + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(1), CancellationToken.None)); + + // Perform disposal then multiple StopAsync calls. + await fake.TaskHelper.ConfigureAwaitHideCancellation( + () => fake.Subscriber.DisposeAsync().AsTask()); + await fake.TaskHelper.ConfigureAwait( + fake.Subscriber.StopAsync(shutdownSetting, TimeSpan.FromHours(1))); + // Verify the client shutdown correctly without exceptions. Assert.Equal(1, fake.Subscribers.Count); Assert.Empty(fake.Subscribers[0].Acks); Assert.Empty(fake.Subscribers[0].Nacks); @@ -690,7 +780,7 @@ await fake.TaskHelper.ConfigureAwaitHideCancellation( } [Theory, PairwiseData] - public void RecvManyMsgsNoErrors( + public void RecvManyMsgsNoErrors_Obsolete( [CombinatorialValues(false, true)] bool hardStop, [CombinatorialValues(2, 5, 6, 9, 10)] int batchCount, [CombinatorialValues(1, 10, 13, 44, 45)] int batchSize, @@ -728,8 +818,70 @@ public void RecvManyMsgsNoErrors( return SubscriberClient.Reply.Ack; }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(stopAfterSeconds) + TimeSpan.FromSeconds(0.5), CancellationToken.None)); +#pragma warning disable CS0618 // allow use of obsolete method var isCancelled = await fake.TaskHelper.ConfigureAwaitHideCancellation( () => fake.Subscriber.StopAsync(new CancellationToken(hardStop))); +#pragma warning restore CS0618 + Assert.Equal(hardStop, isCancelled); + Assert.Equal(clientCount, fake.Subscribers.Count); + Assert.Equal(expectedMsgCount, handledMsgs.Locked(() => handledMsgs.Count)); + Assert.Equal(expectedMsgCount, fake.Subscribers.Sum(x => x.Acks.Count)); + Assert.Equal(Enumerable.Repeat(expectedAcks, clientCount), fake.Subscribers.Select(x => x.Acks.Select(y => y.Id).OrderBy(y => y))); + Assert.Equal(Enumerable.Repeat(1, clientCount).ToArray(), fake.Subscribers.Select(x => x.WriteCompletes.Count).ToArray()); + Assert.Equal(1, fake.ClientShutdowns.Count); + }); + } + } + + [Theory, PairwiseData] + public void RecvManyMsgsNoErrors( + bool hardStop, + SubscriberShutdownSetting shutdownSetting, + [CombinatorialValues(2, 5, 6, 9, 10)] int batchCount, + [CombinatorialValues(1, 10, 13, 44, 45)] int batchSize, + [CombinatorialValues(0, 1, 4, 6, 60)] int interBatchIntervalSeconds, + [CombinatorialValues(0, 1, 5, 8, 21)] int handlerDelaySeconds, + [CombinatorialValues(2, 8, 11, 34, 102)] int stopAfterSeconds, + [CombinatorialValues(1, 2, 3, 4, 5, 7, 13)] int threadCount, + [CombinatorialValues(1, 2, 3, 4, 8, 16, 33)] int clientCount) + { + int delayToSubtract = shutdownSetting switch + { + SubscriberShutdownSetting.WaitForProcessing => hardStop ? handlerDelaySeconds : 0, + SubscriberShutdownSetting.NackImmediately => handlerDelaySeconds, + _ => throw new InvalidOperationException() + }; + + var expectedCompletedBatches = interBatchIntervalSeconds == 0 + ? (stopAfterSeconds < delayToSubtract ? 0 : batchCount) + : Math.Max(0, stopAfterSeconds - delayToSubtract) / interBatchIntervalSeconds; + var expectedMsgCount = Math.Min(expectedCompletedBatches, batchCount) * batchSize * clientCount; + var expectedAcks = Enumerable.Range(0, batchCount) + .SelectMany(batchIndex => Enumerable.Range(0, batchSize).Select(msgIndex => FakeSubscriberServiceApiClient.MakeMsgId(batchIndex, msgIndex))) + .Take(expectedMsgCount / clientCount) + .OrderBy(x => x); + + var msgss = Enumerable.Range(0, batchCount) + .Select(batchIndex => + ServerAction.Data(TimeSpan.FromSeconds(interBatchIntervalSeconds), Enumerable.Range(0, batchSize).Select(i => (batchIndex * batchSize + i).ToString()))) + .Concat(new[] { ServerAction.Inf() }); + using (var fake = Fake.Create(Enumerable.Repeat(msgss, clientCount).ToList(), clientCount: clientCount, threadCount: threadCount)) + { + fake.Scheduler.Run(async () => + { + List handledMsgs = new List(); + var doneTask = fake.Subscriber.StartAsync(async (msg, ct) => + { + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(handlerDelaySeconds), ct)); + lock (handledMsgs) + { + handledMsgs.Add(msg.Data.ToStringUtf8()); + } + return SubscriberClient.Reply.Ack; + }); + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(stopAfterSeconds) + TimeSpan.FromSeconds(0.5), CancellationToken.None)); + var isCancelled = await fake.TaskHelper.ConfigureAwaitHideCancellation( + () => fake.Subscriber.StopAsync(shutdownSetting, cancellationToken: new CancellationToken(hardStop))); Assert.Equal(hardStop, isCancelled); Assert.Equal(clientCount, fake.Subscribers.Count); Assert.Equal(expectedMsgCount, handledMsgs.Locked(() => handledMsgs.Count)); @@ -783,7 +935,7 @@ public void OneClientManyMsgs([CombinatorialValues(1, 2, 3, 4, 5)] int rndSeed) recvedMsgs.Add(msgString); if (recvedMsgs.Count == totalMsgCount) { - Task unused = fake.Subscriber.StopAsync(CancellationToken.None); + Task unused = fake.Subscriber.StopAsync(SubscriberShutdownSetting.WaitForProcessing); } } return SubscriberClient.Reply.Ack; @@ -799,7 +951,7 @@ public void OneClientManyMsgs([CombinatorialValues(1, 2, 3, 4, 5)] int rndSeed) } [Theory, PairwiseData] - public void FlowControl( + public void FlowControl_Obsolete( [CombinatorialValues(false, true)] bool hardStop, [CombinatorialValues(1, 2, 3)] int clientCount, [CombinatorialValues(2, 9, 25, 600)] int stopAfterSeconds, @@ -842,14 +994,66 @@ public void FlowControl( return SubscriberClient.Reply.Ack; }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(stopAfterSeconds) + TimeSpan.FromSeconds(0.5), CancellationToken.None)); +#pragma warning disable CS0618 // allow use of obsolete method await fake.TaskHelper.ConfigureAwaitHideCancellation(() => fake.Subscriber.StopAsync(new CancellationToken(hardStop))); +#pragma warning restore CS0618 Assert.Equal(expectedMsgCount, handledMsgs.Count); }); } } - [Fact] - public void UserHandlerFaults() + [Theory, PairwiseData] + public void FlowControl( + [CombinatorialValues(false, true)] bool hardStop, + [CombinatorialValues(1, 2, 3)] int clientCount, + [CombinatorialValues(2, 9, 25, 600)] int stopAfterSeconds, + [CombinatorialValues(1, 2, 3, 4, 5, 10, 21, 99, 148)] int flowMaxElements, + [CombinatorialValues(1, 10, 14, 18, 25, 39, 81, 255)] int flowMaxBytes, + [CombinatorialValues(1, 3, 9, 19)] int threadCount) + { + const int msgsPerClient = 100; + var oneMsgByteCount = FakeSubscriberServiceApiClient.MakeReceivedMessage("0000.0000", "0000").CalculateSize(); + var combinedFlowMaxElements = Math.Min(flowMaxElements, flowMaxBytes / oneMsgByteCount + 1); + var expectedMsgCount = Math.Min(msgsPerClient * clientCount, combinedFlowMaxElements * stopAfterSeconds); + var msgss = Enumerable.Range(0, msgsPerClient) + .Select(i => ServerAction.Data(TimeSpan.Zero, new[] { i.ToString("D4") })) + .Concat(new[] { ServerAction.Inf() }); + using (var fake = Fake.Create(Enumerable.Repeat(msgss, clientCount).ToList(), + flowMaxElements: flowMaxElements, flowMaxBytes: flowMaxBytes, clientCount: clientCount, threadCount: threadCount)) + { + fake.Scheduler.Run(async () => + { + List handledMsgs = new List(); + int concurrentElementCount = 0; + int concurrentByteCount = 0; + var doneTask = fake.Subscriber.StartAsync(async (msg, ct) => + { + var msgSize = msg.CalculateSize(); + lock (handledMsgs) + { + Assert.True((concurrentElementCount += 1) <= flowMaxElements, "Flow has exceeded max elements."); + // Exceeding the flow byte limit is allowed for individual messages that exceed that size. + Assert.True((concurrentByteCount += msgSize) <= flowMaxBytes || concurrentElementCount == 1, "Flow has exceeded max bytes."); + } + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(1), ct)); + lock (handledMsgs) + { + handledMsgs.Add(msg.Data.ToStringUtf8()); + // Check just for sanity + Assert.True((concurrentElementCount -= 1) >= 0); + Assert.True((concurrentByteCount -= msgSize) >= 0); + } + return SubscriberClient.Reply.Ack; + }); + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(stopAfterSeconds) + TimeSpan.FromSeconds(0.5), CancellationToken.None)); + await fake.TaskHelper.ConfigureAwaitHideCancellation(() => fake.Subscriber.StopAsync(SubscriberShutdownSetting.NackImmediately, cancellationToken: new CancellationToken(hardStop))); + Assert.Equal(expectedMsgCount, handledMsgs.Count); + }); + } + } + + [Theory, CombinatorialData] + public void UserHandlerFaults(SubscriberShutdownSetting shutdownSetting) { var msgs = Enumerable.Repeat(ServerAction.Data(TimeSpan.Zero, new[] { "m" }), 10).Concat(new[] { ServerAction.Inf() }); using (var fake = Fake.Create(new[] { msgs })) @@ -869,7 +1073,7 @@ public void UserHandlerFaults() return SubscriberClient.Reply.Ack; }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(10), CancellationToken.None)); - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(shutdownSetting)); Assert.Equal(Enumerable.Repeat("m", 5), handledMsgs); Assert.Equal(5, fake.Subscribers[0].Acks.Count); Assert.Equal(5, fake.Subscribers[0].Nacks.Count); @@ -879,7 +1083,8 @@ public void UserHandlerFaults() [Theory, PairwiseData] public void ServerFaultsRecoverable( - [CombinatorialValues(1, 3, 9, 14)] int threadCount) + [CombinatorialValues(1, 3, 9, 14)] int threadCount, + SubscriberShutdownSetting shutdownSetting) { var zero = TimeSpan.Zero; var recoverableEx = new RpcException(new Status(StatusCode.DeadlineExceeded, ""), ""); @@ -901,7 +1106,7 @@ public void ServerFaultsRecoverable( return SubscriberClient.Reply.Ack; }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(10), CancellationToken.None)); - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(shutdownSetting)); Assert.Equal(new[] { "1", "2", "3" }, handledMsgs); Assert.Equal(3, fake.Subscribers[0].Acks.Count); }); @@ -912,7 +1117,8 @@ public void ServerFaultsRecoverable( public void ServerFaultsUnrecoverable( [CombinatorialValues(true, false)] bool badMoveNext, [CombinatorialValues(1, 2, 3, 4, 10)] int clientCount, - [CombinatorialValues(1, 3, 9, 14)] int threadCount) + [CombinatorialValues(1, 3, 9, 14)] int threadCount, + SubscriberShutdownSetting shutdownSetting) { var zero = TimeSpan.Zero; var unrecoverableEx = new RpcException(new Status(StatusCode.Unimplemented, ""), ""); @@ -936,7 +1142,7 @@ public void ServerFaultsUnrecoverable( return SubscriberClient.Reply.Ack; }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(10), CancellationToken.None)); - Exception ex = await fake.TaskHelper.ConfigureAwaitHideErrors(() => fake.Subscriber.StopAsync(CancellationToken.None)); + Exception ex = await fake.TaskHelper.ConfigureAwaitHideErrors(() => fake.Subscriber.StopAsync(shutdownSetting)); Assert.Equal(unrecoverableEx, ex.AllExceptions().FirstOrDefault()); Assert.NotEmpty(handledMsgs); Assert.True(handledMsgs[0] == "1" || handledMsgs[0] == "2"); @@ -945,6 +1151,39 @@ public void ServerFaultsUnrecoverable( } } + [Theory, CombinatorialData] + public void Shutdown_ServerFaultsUnrecoverable(SubscriberShutdownSetting shutdownSetting) + { + // Ensure StopAsync propagates unrecoverable server errors. + var zero = TimeSpan.Zero; + var unrecoverableEx = new RpcException(new Status(StatusCode.Unimplemented, ""), ""); + var failure = ServerAction.BadMoveNext(zero, unrecoverableEx); + var msgs = new[] { new[] { ServerAction.Data(zero, new[] { "1", "2" }), failure } }; + + using (var fake = Fake.Create(msgs, flowMaxElements: 1)) + { + fake.Scheduler.Run(async () => + { + var handledMsgs = new List(); + var doneTask = fake.Subscriber.StartAsync(async (msg, ct) => + { + handledMsgs.Locked(() => handledMsgs.Add(msg.Data.ToStringUtf8())); + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(1), ct)); + return SubscriberClient.Reply.Ack; + }); + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(10), CancellationToken.None)); + + // Stop the subscriber after a server failure has occurred. + Exception ex = await fake.TaskHelper.ConfigureAwaitHideErrors(() => fake.Subscriber.StopAsync(shutdownSetting)); + + // The resulting task should fault with the original server exception. + Assert.Equal(unrecoverableEx, ex.AllExceptions().FirstOrDefault()); + Assert.NotEmpty(handledMsgs); + Assert.Equal(1, fake.ClientShutdowns.Count); + }); + } + } + [Fact] public void OnlyOneStart() { @@ -965,7 +1204,7 @@ public void OnlyOneStart() } [Theory, CombinatorialData] - public void LeaseExtension(bool isExactlyOnceDelivery) + public void LeaseExtension(bool isExactlyOnceDelivery, SubscriberShutdownSetting shutdownSetting) { var msgs = new[] { new[] { ServerAction.Data(TimeSpan.Zero, new[] { "1" }), @@ -982,7 +1221,7 @@ public void LeaseExtension(bool isExactlyOnceDelivery) return SubscriberClient.Reply.Ack; }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(100), CancellationToken.None)); - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(shutdownSetting)); await fake.TaskHelper.ConfigureAwait(doneTask); Assert.Equal(1, fake.Subscribers.Count); DateTime S(int seconds) => fake.Time0 + TimeSpan.FromSeconds(seconds); @@ -993,7 +1232,7 @@ public void LeaseExtension(bool isExactlyOnceDelivery) } [Theory, CombinatorialData] - public void LeaseMaxExtension(bool isExactlyOnceDelivery) + public void LeaseMaxExtension(bool isExactlyOnceDelivery, SubscriberShutdownSetting shutdownSetting) { var msgs = new[] { new[] { ServerAction.Data(TimeSpan.Zero, new[] { "1" }), @@ -1010,7 +1249,7 @@ public void LeaseMaxExtension(bool isExactlyOnceDelivery) return SubscriberClient.Reply.Ack; }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromHours(12), CancellationToken.None)); - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(shutdownSetting, TimeSpan.FromHours(24))); await fake.TaskHelper.ConfigureAwait(doneTask); Assert.Equal(1, fake.Subscribers.Count); // Check that the lease was extended for 60 minutes only. @@ -1020,8 +1259,8 @@ public void LeaseMaxExtension(bool isExactlyOnceDelivery) } } - [Fact] - public void SlowUplinkThrottlesPull() + [Theory, CombinatorialData] + public void SlowUplinkThrottlesPull(SubscriberShutdownSetting shutdownSetting) { const int msgCount = 20; const int flowMaxEls = 5; @@ -1037,7 +1276,7 @@ public void SlowUplinkThrottlesPull() { var subTask = fake.Subscriber.StartAsync((msg, ct) => Task.FromResult(SubscriberClient.Reply.Ack)); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(1000), CancellationToken.None)); - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(shutdownSetting)); await fake.TaskHelper.ConfigureAwait(subTask); var sub = fake.Subscribers[0]; Assert.True(sub.Extends.Count <= msgCount); // Difficult to predict, must be <= total message count @@ -1050,8 +1289,8 @@ public void SlowUplinkThrottlesPull() } } - [Fact] - public void StreamPings() + [Theory, CombinatorialData] + public void StreamPings(SubscriberShutdownSetting shutdownSetting) { const int pingPeriodSeconds = 25; // From SubscriberClient. const int pingCount = 10; @@ -1070,7 +1309,7 @@ public void StreamPings() // Wait a bit longer, to check no more pings happen. await th.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(pingPeriodSeconds * 4), CancellationToken.None)); // Stop subscriber. - await th.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await th.ConfigureAwait(fake.Subscriber.StopAsync(shutdownSetting)); await th.ConfigureAwait(subTask); var expectedPings = Enumerable.Range(0, pingCount).Select(i => fake.Time0 + TimeSpan.FromSeconds(pingPeriodSeconds * (i + 1))); Assert.Equal(expectedPings, fake.Subscribers[0].StreamPings); @@ -1099,14 +1338,14 @@ public void OrderingKeysManyMsgs( var startTask = fake.Subscriber.StartAsync(async (msg, ct) => { var delay = TimeSpan.FromMilliseconds(rnd.Next(1000)); - await th.ConfigureAwait(fake.Scheduler.Delay(delay, default)); + await th.ConfigureAwait(fake.Scheduler.Delay(delay, ct)); lock (recvedMsgs) { recvedMsgs.Add(msg.Data.ToStringUtf8()); recvCount += 1; if (recvCount == msgCount) { - var dummyTask = fake.Subscriber.StopAsync(CancellationToken.None); + var dummyTask = fake.Subscriber.StopAsync(SubscriberShutdownSetting.WaitForProcessing, TimeSpan.FromHours(24)); } } return SubscriberClient.Reply.Ack; @@ -1296,8 +1535,8 @@ public void InvalidParameters() //Assert.Equal("MaxTotalAckExtension", ex9.ParamName); There's a bug in GaxPreconditions.CheckNonNegativeDelay() which uses the wrong paramName } - [Fact] - public void DeliveryAttempt() + [Theory, CombinatorialData] + public void DeliveryAttempt(SubscriberShutdownSetting shutdownSetting) { var msgs = new[] { ServerAction.Data(TimeSpan.Zero, new[] { "m" }, deliveryAttempt: null), @@ -1315,7 +1554,7 @@ public void DeliveryAttempt() return Task.FromResult(SubscriberClient.Reply.Ack); }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(10), CancellationToken.None)); - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(shutdownSetting)); Assert.Equal(new int?[] { null, 2 }, deliveryAttempts); }); } @@ -1323,7 +1562,7 @@ public void DeliveryAttempt() // Acknowledge / ModifyAcknowledgeDeadline calls may throw RpcException. RpcExceptions should not be thrown to the client. [Theory, CombinatorialData] - public void AckModifyAckDeadlineFault_NotThrown([CombinatorialValues(true, false, null)] bool? ackOrModifyAck) + public void AckModifyAckDeadlineFault_NotThrown([CombinatorialValues(true, false, null)] bool? ackOrModifyAck, SubscriberShutdownSetting shutdownSetting) { var msgs = new[] { @@ -1356,7 +1595,7 @@ public void AckModifyAckDeadlineFault_NotThrown([CombinatorialValues(true, false return SubscriberClient.Reply.Ack; }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(100), CancellationToken.None)); - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(shutdownSetting)); // Despite RpcException being thrown, all 4 messages should be handled. Assert.Equal(new[] { "1", "2", "3", "4" }, handledMsgs); }); @@ -1364,7 +1603,7 @@ public void AckModifyAckDeadlineFault_NotThrown([CombinatorialValues(true, false // If Acknowledge / ModifyAcknowledgeDeadline calls throw exceptions other than RpcExceptions, they should be thrown to the client. [Theory, CombinatorialData] - public void AckModifyAckDeadlineFault_Thrown([CombinatorialValues(true, false)] bool ackOrModifyAck) + public void AckModifyAckDeadlineFault_Thrown([CombinatorialValues(true, false)] bool ackOrModifyAck, SubscriberShutdownSetting shutdownSetting) { var msgs = new[] { @@ -1394,7 +1633,7 @@ public void AckModifyAckDeadlineFault_Thrown([CombinatorialValues(true, false)] return SubscriberClient.Reply.Ack; }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(100), CancellationToken.None)); - Exception ex = await fake.TaskHelper.ConfigureAwaitHideErrors(() => fake.Subscriber.StopAsync(CancellationToken.None)); + Exception ex = await fake.TaskHelper.ConfigureAwaitHideErrors(() => fake.Subscriber.StopAsync(shutdownSetting)); Assert.Equal(exception, ex.AllExceptions().FirstOrDefault()); Assert.Equal(1, fake.ClientShutdowns.Count); }); @@ -1404,7 +1643,7 @@ public void AckModifyAckDeadlineFault_Thrown([CombinatorialValues(true, false)] // In non exactly once delivery, if we use the new SubscriptionHandler to see Ack/NackResponse, // the acknowledgement status should be returned as Success, they are treated as "fire and forget" operations. [Theory, CombinatorialData] - public void AckModifyAckDeadlineFault_SubscriptionHandler([CombinatorialValues(true, false)] bool ackOrModifyAck) + public void AckModifyAckDeadlineFault_SubscriptionHandler(bool ackOrModifyAck, SubscriberShutdownSetting shutdownSetting) { var msgs = new[] { @@ -1429,14 +1668,14 @@ public void AckModifyAckDeadlineFault_SubscriptionHandler([CombinatorialValues(t { var doneTask = fake.Subscriber.StartAsync(testSubscriptionHandler); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(100), CancellationToken.None)); - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(shutdownSetting)); // All the 4 test messages have encountered a recoverable RpcException, but their status should be success. Assert.Equal(4, testSubscriptionHandler.Responses.Count(j => j.Status == AcknowledgementStatus.Success)); }); } [Theory, CombinatorialData] - public void ExactlyOnceDelivery_TemporaryFault([CombinatorialValues(true, false, null)] bool? ackNackOrExtends) + public void ExactlyOnceDelivery_TemporaryFault([CombinatorialValues(true, false, null)] bool? ackNackOrExtends, SubscriberShutdownSetting shutdownSetting) { var msgs = new[] { @@ -1473,14 +1712,14 @@ public void ExactlyOnceDelivery_TemporaryFault([CombinatorialValues(true, false, return ackNackOrExtends == false ? SubscriberClient.Reply.Nack : SubscriberClient.Reply.Ack; }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(100), CancellationToken.None)); - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(shutdownSetting)); // Despite temporary failures, all 4 messages should be handled. Assert.Equal(new[] { "1", "2", "3", "4" }, handledMsgs); }); } [Theory, CombinatorialData] - public void ExactlyOnceDelivery_AckNack_PermanentFault([CombinatorialValues(true, false)] bool ackOrNack) + public void ExactlyOnceDelivery_AckNack_PermanentFault([CombinatorialValues(true, false)] bool ackOrNack, SubscriberShutdownSetting shutdownSetting) { var msgs = new[] { @@ -1504,15 +1743,15 @@ public void ExactlyOnceDelivery_AckNack_PermanentFault([CombinatorialValues(true { var doneTask = fake.Subscriber.StartAsync(testSubscriptionHandler); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(100), CancellationToken.None)); - Exception ex = await fake.TaskHelper.ConfigureAwaitHideErrors(() => fake.Subscriber.StopAsync(CancellationToken.None)); + Exception ex = await fake.TaskHelper.ConfigureAwaitHideErrors(() => fake.Subscriber.StopAsync(shutdownSetting)); // Exception should not be thrown. Assert.Null(ex); Assert.Equal(new[] { "1", "2", "3", "4" }, testSubscriptionHandler.Responses.Where(j => j.Status == AcknowledgementStatus.FailedPrecondition).Select(j => j.MessageId)); }); } - [Fact] - public void ExactlyOnceDelivery_Extends_PermanentFault() + [Theory, CombinatorialData] + public void ExactlyOnceDelivery_Extends_PermanentFault(SubscriberShutdownSetting shutdownSetting) { var msgs = new[] { @@ -1538,14 +1777,14 @@ public void ExactlyOnceDelivery_Extends_PermanentFault() return SubscriberClient.Reply.Ack; }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(100), CancellationToken.None)); - Exception ex = await fake.TaskHelper.ConfigureAwaitHideErrors(() => fake.Subscriber.StopAsync(CancellationToken.None)); + Exception ex = await fake.TaskHelper.ConfigureAwaitHideErrors(() => fake.Subscriber.StopAsync(shutdownSetting)); // Exception shouldn't be thrown in case of permanent failure. Assert.Null(ex); }); } [Theory, CombinatorialData] - public void ExactlyOnceDelivery_AckNack_MixedFault([CombinatorialValues(true, false)] bool ackOrNack) + public void ExactlyOnceDelivery_AckNack_MixedFault([CombinatorialValues(true, false)] bool ackOrNack, SubscriberShutdownSetting shutdownSetting) { var msgs = new[] { @@ -1573,7 +1812,7 @@ public void ExactlyOnceDelivery_AckNack_MixedFault([CombinatorialValues(true, fa { var doneTask = fake.Subscriber.StartAsync(testHandler); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(100), CancellationToken.None)); - Exception ex = await fake.TaskHelper.ConfigureAwaitHideErrors(() => fake.Subscriber.StopAsync(CancellationToken.None)); + Exception ex = await fake.TaskHelper.ConfigureAwaitHideErrors(() => fake.Subscriber.StopAsync(shutdownSetting)); Assert.Null(ex); // "1" is success and "3" is permanent failure. Assert.Equal("1", testHandler.Responses.First(j => j.Status == AcknowledgementStatus.Success).MessageId); @@ -1581,8 +1820,8 @@ public void ExactlyOnceDelivery_AckNack_MixedFault([CombinatorialValues(true, fa }); } - [Fact] - public void ExactlyOnceDelivery_Extends_MixedFault() + [Theory, CombinatorialData] + public void ExactlyOnceDelivery_Extends_MixedFault(SubscriberShutdownSetting shutdownSetting) { var msgs = new[] { @@ -1609,7 +1848,7 @@ public void ExactlyOnceDelivery_Extends_MixedFault() return SubscriberClient.Reply.Ack; }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(100), CancellationToken.None)); - Exception ex = await fake.TaskHelper.ConfigureAwaitHideErrors(() => fake.Subscriber.StopAsync(CancellationToken.None)); + Exception ex = await fake.TaskHelper.ConfigureAwaitHideErrors(() => fake.Subscriber.StopAsync(shutdownSetting)); // Permanent exception shouldn't be thrown. // Extends are not user initiated, so we can't get the success and temporary failed status from the client. Assert.Null(ex); @@ -1617,8 +1856,8 @@ public void ExactlyOnceDelivery_Extends_MixedFault() } // All successful receipt ModAcks. - [Fact] - public void ExactlyOnceDelivery_ReceiptModAck() + [Theory, CombinatorialData] + public void ExactlyOnceDelivery_ReceiptModAck(SubscriberShutdownSetting shutdownSetting) { var msgs = new[] { @@ -1645,7 +1884,7 @@ public void ExactlyOnceDelivery_ReceiptModAck() }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(100), CancellationToken.None)); - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(shutdownSetting)); // All 4 messages are handled. Assert.Equal(4, handledMsgs.Count); Assert.Equal(new[] { "1", "2", "3", "4" }, handledMsgs); @@ -1653,7 +1892,7 @@ public void ExactlyOnceDelivery_ReceiptModAck() } [Theory, CombinatorialData] - public void ExactlyOnceDelivery_ReceiptModAck_MixedFault([CombinatorialValues(true, false)] bool succeedOnRetry) + public void ExactlyOnceDelivery_ReceiptModAck_MixedFault([CombinatorialValues(true, false)] bool succeedOnRetry, SubscriberShutdownSetting shutdownSetting) { var msgs = new[] { @@ -1685,15 +1924,15 @@ public void ExactlyOnceDelivery_ReceiptModAck_MixedFault([CombinatorialValues(tr }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(100), CancellationToken.None)); - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(shutdownSetting)); // Permanently failed receipt ModAcks won't be passed to the user handler, so 3 is not handled. // Temporary failed ModAck for message 2 becomes successful or permanent failure based on succeedOnRetry flag. Assert.Equal(succeedOnRetry ? new[] { "1", "2", "4" } : new[] { "1", "4" }, handledMsgs); }); } - [Fact] - public void ExactlyOnceDelivery_ReceiptModAck_PermanentFaults() + [Theory, CombinatorialData] + public void ExactlyOnceDelivery_ReceiptModAck_PermanentFaults(SubscriberShutdownSetting shutdownSetting) { var msgs = new[] { @@ -1736,14 +1975,14 @@ public void ExactlyOnceDelivery_ReceiptModAck_PermanentFaults() }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(100), CancellationToken.None)); - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(shutdownSetting)); // Permanently failed receipt ModAcks won't be passed to the user handler, so all 4 messages are not handled. Assert.Equal(0, handledMsgs.Count); }); } [Theory, CombinatorialData] - public void ExactlyOnceDelivery_ReceiptModAck_TemporaryFaults([CombinatorialValues(true, false)] bool succeedOnRetry) + public void ExactlyOnceDelivery_ReceiptModAck_TemporaryFaults([CombinatorialValues(true, false)] bool succeedOnRetry, SubscriberShutdownSetting shutdownSetting) { var msgs = new[] { @@ -1787,7 +2026,7 @@ public void ExactlyOnceDelivery_ReceiptModAck_TemporaryFaults([CombinatorialValu }); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(100), CancellationToken.None)); - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(shutdownSetting)); // Temporary failed receipt ModAcks can succeed after 1 retry or stay failed, so based on the succeedOnRetry flag, 4 or 0 messages are handled. Assert.Equal(succeedOnRetry ? 4 : 0, handledMsgs.Count); Assert.Equal(succeedOnRetry ? new[] { "1", "2", "3", "4" } : Array.Empty(), handledMsgs); @@ -1914,8 +2153,8 @@ public void StreamingPullRetry_NonRetriableException() }); } - [Fact] - public void StreamingPullRetry_InternalErrorContinuesRetrying() + [Theory, CombinatorialData] + public void StreamingPullRetry_InternalErrorContinuesRetrying(SubscriberShutdownSetting shutdownSetting) { // A regular internal failure that's not due to an auth error. var exception = new RpcException(new Status(StatusCode.Internal, "Bang")); @@ -1927,7 +2166,7 @@ public void StreamingPullRetry_InternalErrorContinuesRetrying() var subscriberTask = fake.Subscriber.StartAsync((msg, ct) => throw new Exception("No messages should be provided")); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(100), CancellationToken.None)); Assert.False(subscriberTask.IsCompleted); - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(shutdownSetting)); await subscriberTask; }); } @@ -1954,8 +2193,8 @@ public void StreamingPullRetry_RetriableErrorEventuallyFails() /// If the streaming pull call fails in MoveNext after a short time (e.g. 10 seconds) /// we should retry with backoff. /// - [Fact] - public void StreamingPullRetry_UnavailableAfterShortDelayTriggersRetryWithBackoff() + [Theory, CombinatorialData] + public void StreamingPullRetry_UnavailableAfterShortDelayTriggersRetryWithBackoff(SubscriberShutdownSetting shutdownSetting) { var exception = new RpcException(new Status(StatusCode.Unavailable, "Stream terminated")); TimeSpan streamDuration = TimeSpan.FromSeconds(30); @@ -1967,7 +2206,7 @@ public void StreamingPullRetry_UnavailableAfterShortDelayTriggersRetryWithBackof var subscriberTask = fake.Subscriber.StartAsync((msg, ct) => throw new Exception("No messages should be provided")); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromMinutes(100), CancellationToken.None)); Assert.False(subscriberTask.IsCompleted); - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(shutdownSetting)); await subscriberTask; // Check the pull times indicate a backoff. @@ -1993,8 +2232,8 @@ public void StreamingPullRetry_UnavailableAfterShortDelayTriggersRetryWithBackof /// We *expect* the streaming pull to fail (in MoveNext) after about a minute... we should /// retry immediately each time. /// - [Fact] - public void StreamingPullRetry_UnavailableAfterLongDelayTriggersRetryWithoutBackoff() + [Theory, CombinatorialData] + public void StreamingPullRetry_UnavailableAfterLongDelayTriggersRetryWithoutBackoff(SubscriberShutdownSetting shutdownSetting) { var exception = new RpcException(new Status(StatusCode.Unavailable, "Stream terminated")); TimeSpan streamDuration = TimeSpan.FromSeconds(60); @@ -2007,7 +2246,7 @@ public void StreamingPullRetry_UnavailableAfterLongDelayTriggersRetryWithoutBack var subscriberTask = fake.Subscriber.StartAsync((msg, ct) => throw new Exception("No messages should be provided")); await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromMinutes(100), CancellationToken.None)); Assert.False(subscriberTask.IsCompleted); - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(shutdownSetting)); await subscriberTask; // Check the pull times indicate no backoff. @@ -2018,48 +2257,271 @@ public void StreamingPullRetry_UnavailableAfterLongDelayTriggersRetryWithoutBack } [Fact] - public void NackMessagesOnShutdown() + public void Shutdown_SoftStop_NacksMessages() { - var msgs = new[] { new[] { + var msgs = new[] { ServerAction.Data(TimeSpan.Zero, ["msg0"]), ServerAction.Data(TimeSpan.Zero, ["msg1", "msg2", "msg3"]), - ServerAction.Data(TimeSpan.Zero, ["msg4", "msg5"]), + ServerAction.Data(TimeSpan.Zero, ["msg4", "msg5", "msg6"]), + ServerAction.Data(TimeSpan.Zero, ["msg7"]), + ServerAction.Inf() + }; + // flowMaxElements=2: msg0 and msg2 block flow control; msg1 is processed quickly. + // msg3-5 wait for flow control and should be Nacked on shutdown. + using var fake = Fake.CreateClientForSingleResponseStream(msgs, flowMaxElements: 2, useMsgAsId: true); + fake.Scheduler.Run(async () => + { + var givenToMessageHandler = new List(); + var startTask = fake.Subscriber.StartAsync(async (msg, ct) => + { + var data = msg.Data.ToStringUtf8(); + givenToMessageHandler.Locked(() => givenToMessageHandler.Add(data)); + if (data == "msg0" || data == "msg4") + { + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromMinutes(1), ct)); + } + return SubscriberClient.Reply.Ack; + }); + + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(1), CancellationToken.None)); + Assert.Equivalent(new[] { "msg0", "msg1", "msg2", "msg3", "msg4" }, givenToMessageHandler, strict: true); +#pragma warning disable CS0618 // allow use of obsolete method + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); +#pragma warning restore CS0618 + + // msg0-2 were handled/acked; msg3-5 were pulled but nacked on shutdown. + Assert.Equivalent(new[] { "msg0", "msg1", "msg2", "msg3", "msg4" }, fake.Subscribers[0].Acks.Select(x => x.Id), strict: true); + Assert.Equivalent(new[] { "msg5", "msg6" }, fake.Subscribers[0].Nacks.Select(x => x.Id), strict: true); + }); + } + + [Fact] + public void Shutdown_NackImmediately_Success() + { + var msgs = new[] { + ServerAction.Data(TimeSpan.Zero, ["msg0"]), + ServerAction.Data(TimeSpan.Zero, ["msg1", "msg2"]), + ServerAction.Data(TimeSpan.Zero, ["msg3", "msg4"]), + ServerAction.Inf() + }; + // flowMaxElements=2: msg1 and msg2 block flow control. + using var fake = Fake.CreateClientForSingleResponseStream(msgs, flowMaxElements: 2, useMsgAsId: true); + fake.Scheduler.Run(async () => + { + var handled = new List(); + var startTask = fake.Subscriber.StartAsync(async (msg, ct) => + { + var data = msg.Data.ToStringUtf8(); + handled.Locked(() => handled.Add(data)); + if (data == "msg1" || data == "msg2") + { + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(30), ct)); + } + return SubscriberClient.Reply.Ack; + }); + + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(1), CancellationToken.None)); + Assert.Equivalent(new[] { "msg0", "msg1", "msg2" }, handled, strict: true); + + // NackImmediately: Nacks pending messages (msg3, msg4) and messages currently being handled (msg1, msg2). + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(SubscriberShutdownSetting.NackImmediately, TimeSpan.FromSeconds(60))); + + Assert.Equivalent(new[] { "msg0" }, fake.Subscribers[0].Acks.Select(x => x.Id), strict: true); + Assert.Equivalent(new[] { "msg1", "msg2", "msg3", "msg4" }, fake.Subscribers[0].Nacks.Select(x => x.Id), strict: true); + }); + } + + [Fact] + public void Shutdown_WaitForProcessing_CompletesBeforeNack() + { + // Ensure WaitForProcessing allows all received messages to finish if time permits. + var msgs = new[] { + ServerAction.Data(TimeSpan.Zero, ["msg0"]), + ServerAction.Data(TimeSpan.Zero, ["msg1", "msg2"]), + ServerAction.Data(TimeSpan.Zero, ["msg3", "msg4"]), + ServerAction.Inf() + }; + using var fake = Fake.CreateClientForSingleResponseStream(msgs, flowMaxElements: 2, useMsgAsId: true); + fake.Scheduler.Run(async () => + { + var handled = new List(); + var startTask = fake.Subscriber.StartAsync(async (msg, ct) => + { + var data = msg.Data.ToStringUtf8(); + handled.Locked(() => handled.Add(data)); + if (data == "msg1" || data == "msg2") + { + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(15), ct)); + } + return SubscriberClient.Reply.Ack; + }); + + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(1), CancellationToken.None)); + Assert.Equivalent(new[] { "msg0", "msg1", "msg2" }, handled, strict: true); + + // Stop and wait for processing with a generous timeout. + // It waits for msg1, msg2 to finish and allows msg3, msg4 to be processed. + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(SubscriberShutdownSetting.WaitForProcessing, TimeSpan.FromSeconds(60))); + + // All messages should be Acked and none Nacked. + Assert.Equivalent(new[] { "msg0", "msg1", "msg2", "msg3", "msg4" }, fake.Subscribers[0].Acks.Select(x => x.Id), strict: true); + Assert.Empty(fake.Subscribers[0].Nacks); + }); + } + + [Fact] + public void Shutdown_WaitForProcessing_NacksOnTimeout() + { + // Verify that remaining messages are Nacked if the WaitForProcessing timeout is reached. + var msgs = new[] { + ServerAction.Data(TimeSpan.Zero, ["msg0"]), + ServerAction.Data(TimeSpan.Zero, ["msg1", "msg2", "msg3", "msg4"]), + ServerAction.Data(TimeSpan.Zero, ["msg5", "msg6"]), + ServerAction.Inf() + }; + using var fake = Fake.CreateClientForSingleResponseStream(msgs, flowMaxElements: 2, useMsgAsId: true); + fake.Scheduler.Run(async () => + { + var givenToMessageHandler = new List(); + var startTask = fake.Subscriber.StartAsync(async (msg, ct) => + { + var data = msg.Data.ToStringUtf8(); + givenToMessageHandler.Locked(() => givenToMessageHandler.Add(data)); + if (data != "msg0") + { + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(15), ct)); + } + return SubscriberClient.Reply.Ack; + }); + + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(5), CancellationToken.None)); + Assert.Equivalent(new[] { "msg0", "msg1", "msg2" }, givenToMessageHandler, strict: true); + + // Stop with a timeout that expires before processing completes. + // Timeout=45s: NackDelay=15s. At T=5+15=20s, NackImmediately is triggered. + // msg1, msg2 finish at T=20s. msg3, msg4 start at T=20s but are Nacked as shutdown is in progress. + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(SubscriberShutdownSetting.WaitForProcessing, TimeSpan.FromSeconds(50))); + + // Verify the switch to Nacking for the remaining messages. + Console.WriteLine($"Acks: {string.Join(", ", fake.Subscribers[0].Acks.Select(x => x.Id))}"); + Console.WriteLine($"Nacks: {string.Join(", ", fake.Subscribers[0].Nacks.Select(x => x.Id))}"); + Assert.Equivalent(new[] { "msg0", "msg1", "msg2" }, fake.Subscribers[0].Acks.Select(x => x.Id), strict: true); + Assert.Equivalent(new[] { "msg3", "msg4" }, fake.Subscribers[0].Nacks.Select(x => x.Id), strict: true); + }); + } + + [Fact] + public void Shutdown_CancellationToken_AbortsWaitForProcessing() + { + // Verify that cancelling the CancellationToken passed to StopAsync + // immediately triggers a Hard Stop, aborting any graceful shutdown. + var msgs = new[] { + ServerAction.Data(TimeSpan.Zero, ["msg0"]), + ServerAction.Inf() + }; + using var fake = Fake.CreateClientForSingleResponseStream(msgs, flowMaxElements: 2, useMsgAsId: true); + fake.Scheduler.Run(async () => + { + var givenToMessageHandler = new List(); + var startTask = fake.Subscriber.StartAsync(async (msg, ct) => + { + givenToMessageHandler.Locked(() => givenToMessageHandler.Add(msg.Data.ToStringUtf8())); + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(60), ct)); + return SubscriberClient.Reply.Ack; + }); + + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(1), CancellationToken.None)); + Assert.Single(givenToMessageHandler); + + // Request graceful shutdown with a long timeout. + var cts = new CancellationTokenSource(); + var stopTask = fake.Subscriber.StopAsync(SubscriberShutdownSetting.WaitForProcessing, TimeSpan.FromHours(1), cts.Token); + Assert.Empty(fake.Subscribers[0].Acks); + Assert.Empty(fake.Subscribers[0].Nacks); + + // Cancel the token after 5 seconds of "graceful" shutdown. + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(5), CancellationToken.None)); + Assert.Empty(fake.Subscribers[0].Acks); + Assert.Empty(fake.Subscribers[0].Nacks); + cts.Cancel(); + + await fake.TaskHelper.ConfigureAwaitHideCancellation(() => stopTask); + + // No ack response will be provided, all work should be dropped as we didn't have a chance to switch to + // NackImmediately + Assert.Empty(fake.Subscribers[0].Acks); + Assert.Empty(fake.Subscribers[0].Nacks); + }); + } + + [Fact] + public void Shutdown_WaitForProcessing_NacksWhenTimeoutLessThanMinimum() + { + // Ensure immediate Nacking if the requested timeout is shorter than the 30s grace period. + var msgs = new[] { + ServerAction.Data(TimeSpan.Zero, ["msg0"]), + ServerAction.Data(TimeSpan.Zero, ["msg1", "msg2"]), + ServerAction.Data(TimeSpan.Zero, ["msg3", "msg4"]), + ServerAction.Inf() + }; + using var fake = Fake.CreateClientForSingleResponseStream(msgs, flowMaxElements: 2, useMsgAsId: true); + fake.Scheduler.Run(async () => + { + var handled = new List(); + var startTask = fake.Subscriber.StartAsync(async (msg, ct) => + { + var data = msg.Data.ToStringUtf8(); + handled.Locked(() => handled.Add(data)); + if (data != "msg0") + { + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(6), ct)); + } + return SubscriberClient.Reply.Ack; + }); + + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(5), CancellationToken.None)); + Assert.Equivalent(new[] { "msg0", "msg1", "msg2" }, handled, strict: true); + + // Request shutdown with a short timeout. + // Timeout=15s: NackDelay=0s (since 15 < GracePeriod=30). NackImmediately triggered at T=5s. + // msg1, msg2 are already being handled and will be Nacked upon completion. + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(SubscriberShutdownSetting.WaitForProcessing, TimeSpan.FromSeconds(15))); + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(15), CancellationToken.None)); + + // All unhandled messages should be Nacked immediately. + Assert.Equivalent(new[] { "msg0", "msg1", "msg2" }, handled, strict: true); + Assert.Equivalent(new[] { "msg0" }, fake.Subscribers[0].Acks.Select(x => x.Id), strict: true); + Assert.Equivalent(new[] { "msg1", "msg2", "msg3", "msg4" }, fake.Subscribers[0].Nacks.Select(x => x.Id), strict: true); + }); + } + + [Fact] + public void Shutdown_NackImmediately_LeaseExtensionStops() + { + // Ensure lease extensions stop and current leases are Nacked during immediate shutdown. + var msgs = new[] { new[] { + ServerAction.Data(TimeSpan.Zero, new[] { "1" }), ServerAction.Inf() } }; - // Set flowMaxElements to 2 to ensure that "msg0" and "msg2" block flow control preventing - // "msg3","msg4","msg5" from being processed. - using (var fake = Fake.Create(msgs, flowMaxElements: 2, useMsgAsId: true, disposeTimeout: TimeSpan.FromSeconds(10))) + using (var fake = Fake.Create(msgs, ackDeadline: TimeSpan.FromSeconds(30), ackExtendWindow: TimeSpan.FromSeconds(10))) { fake.Scheduler.Run(async () => { - var handledMsgs = new List(); var doneTask = fake.Subscriber.StartAsync(async (msg, ct) => { - var data = msg.Data.ToStringUtf8(); - handledMsgs.Locked(() => handledMsgs.Add(data)); - if (data == "msg0" || data == "msg2") - { - // Delay handling so that StopAsync is called while the rest - // are still pulled but waiting for a flow control slot. - await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(5), ct)); - } + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(120), ct)); return SubscriberClient.Reply.Ack; }); + await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(10), CancellationToken.None)); - // Wait for "msg0" and "msg2" to start being handled. "msg1" will also be handled, but not hold onto - // flow control, releasing it so "msg2" can begin being handled. - await fake.TaskHelper.ConfigureAwait(fake.Scheduler.Delay(TimeSpan.FromSeconds(1), CancellationToken.None)); - Assert.Equivalent(new [] {"msg0", "msg1", "msg2"}, handledMsgs); + int numExtensionsBeforeShutdown = fake.Subscribers[0].Extends.Count(); - // Stop the subscriber. This should ensure pulled messages that haven't entered the user handler yet - // will be NAck'ed. Specifically for messages already in flow control, they will have to wait for pending - // messages to be processed before they are NAck'ed. - await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(CancellationToken.None)); + // Request immediate shutdown. + await fake.TaskHelper.ConfigureAwait(fake.Subscriber.StopAsync(SubscriberShutdownSetting.NackImmediately)); + int numExtensionsAfterShutdown = fake.Subscribers[0].Extends.Count(); - // Verify that "msg0", "msg1" and "msg2" completed handling normally, while the rest were - // automatically Nacked during shutdown. - Assert.Equivalent(new [] {"msg0", "msg1", "msg2"}, fake.Subscribers[0].Acks.Select(x => x.Id)); - Assert.Equivalent(new [] {"msg3", "msg4", "msg5"}, fake.Subscribers[0].Nacks.Select(x => x.Id)); + // Verify no more lease extensions occurred after shutdown was initiated. + Assert.Equal(numExtensionsAfterShutdown, numExtensionsBeforeShutdown); }); } } @@ -2073,4 +2535,4 @@ private static IReadOnlyList> CreateBadMoveNextSequenc .Concat(Enumerable.Repeat(ServerAction.Sequence(ServerAction.Inf()), includeTrailing ? 1 : 0)) .ToList(); } -} \ No newline at end of file +} diff --git a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClient.cs b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClient.cs index cf96fa133751..0b185b69cbb0 100644 --- a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClient.cs +++ b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClient.cs @@ -54,6 +54,24 @@ public enum Reply Ack = 1, } + /// + /// Settings available for subscriber shutdown. + /// + public enum SubscriberShutdownSetting + { + /// + /// Stops streaming new upstream messages and then continues processing all received messages. If there are + /// still messages that need to be processed 30s before the timeout is reached it will switch to . + /// + WaitForProcessing = 0, + + /// + /// Stops streaming new upstream messages and then aggressively releases unhandled messages by sending Nack responses. + /// Already handled messages will still be acknowledged. + /// + NackImmediately = 1, + } + /// /// Default for . /// Allows 1,000 outstanding messages; and 100Mb outstanding bytes. @@ -217,6 +235,7 @@ public virtual Task StartAsync(FuncCancel this to abort handlers and acknowledgement. /// A that completes when all handled messages have been acknowledged; /// faults on unrecoverable service errors; or cancels if is cancelled. + [Obsolete("Use StopAsync(SubscriberShutdownSetting, TimeSpan?, CancellationToken) instead.")] public virtual Task StopAsync(CancellationToken hardStopToken) => throw new NotImplementedException(); /// @@ -229,7 +248,19 @@ public virtual Task StartAsync(FuncAfter this period, abort handling and acknowledging messages. /// A that completes when all handled messages have been acknowledged; /// faults on unrecoverable service errors; or cancels if expires. - public virtual Task StopAsync(TimeSpan timeout) => StopAsync(new CancellationTokenSource(timeout).Token); + [Obsolete("Use StopAsync(SubscriberShutdownSetting, TimeSpan?, CancellationToken) instead.")] + public virtual Task StopAsync(TimeSpan timeout) => StopAsync(SubscriberShutdownSetting.WaitForProcessing, timeout); + + /// + /// Stop this . + /// The returned completes when all handled messages have been acknowledged. + /// The returned faults if there is an unrecoverable error with the underlying service. + /// + /// The to use for shutdown. + /// Optional. The timeout for the shutdown process. If not specified, a default 1-hour timeout is used. + /// Optional. A that can be used to abort the graceful shutdown and trigger an immediate hard stop. + /// A that completes when the subscriber is stopped, or if an unrecoverable error occurs. + public virtual Task StopAsync(SubscriberShutdownSetting shutdownSetting, TimeSpan? timeout = null, CancellationToken cancellationToken = default) => throw new NotImplementedException(); /// /// Disposes this asynchronously. diff --git a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.SingleChannel.cs b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.SingleChannel.cs index 280b203de18b..22b71c509f57 100644 --- a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.SingleChannel.cs +++ b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.SingleChannel.cs @@ -149,6 +149,9 @@ internal RetryInfo(DateTime firstTimeOfFailureInUtc, TimeSpan? backoff = null) private readonly CancellationTokenSource _hardStopCts; private readonly CancellationTokenSource _pushStopCts; private readonly CancellationTokenSource _softStopCts; + private readonly CancellationTokenSource _nackImmediatelyCts; + private readonly CancellationTokenSource _waitForProcessingCts; + private readonly CancellationTokenSource _jointNackImmediatelyOrSoftStopCts; private readonly SubscriptionName _subscriptionName; private readonly LeaseTiming _normalLeaseTiming; private readonly LeaseTiming _exactlyOnceDeliveryLeaseTiming; @@ -196,6 +199,9 @@ internal SingleChannel(SubscriberClientImpl subscriber, _hardStopCts = subscriber._globalHardStopCts; _pushStopCts = CancellationTokenSource.CreateLinkedTokenSource(_hardStopCts.Token); _softStopCts = subscriber._globalSoftStopCts; + _nackImmediatelyCts = subscriber._globalNackImmediatelyCts; + _waitForProcessingCts = subscriber._globalWaitForProcessingCts; + _jointNackImmediatelyOrSoftStopCts = CancellationTokenSource.CreateLinkedTokenSource(_nackImmediatelyCts.Token, _softStopCts.Token); _subscriptionName = subscriber.SubscriptionName; _normalLeaseTiming = subscriber._normalLeaseTiming; _exactlyOnceDeliveryLeaseTiming = subscriber._exactlyOnceDeliveryLeaseTiming; @@ -236,7 +242,7 @@ internal async Task StartAsync() } var task = nextContinuation.Task; var next = nextContinuation.NextAction; - if (next.IsPull && (task.IsCanceled || (task.IsFaulted && (task.Exception.IsCancellation() || task.Exception.IsRpcCancellation())))) + if (next.IsPull && (task.IsCanceled || (task.IsFaulted && (task.Exception.IsCancellation() || task.Exception.IsRpcCancellation())) || _waitForProcessingCts.IsCancellationRequested)) { // Pull has been cancelled by user, shutdown pull stream and don't run continuation. // RPC exceptions are dealt with in the relevant handlers. @@ -313,7 +319,7 @@ private void StartStreamingPull() { // Delay, then start the streaming-pull. _logger?.LogDebug("Client {index} delaying for {seconds}s before streaming pull call.", _clientIndex, (int) backoff.TotalSeconds); - Task delayTask = _scheduler.Delay(backoff, _softStopCts.Token); + Task delayTask = _scheduler.Delay(backoff, _waitForProcessingCts.Token); Add(delayTask, Next(true, HandleStartStreamingPullWithoutBackoff)); } else @@ -327,7 +333,7 @@ private void StartStreamingPull() private void HandleStartStreamingPullWithoutBackoff() { _retryState.OnStartAttempt(); - _pull = _client.StreamingPull(CallSettings.FromCancellationToken(_softStopCts.Token)); + _pull = _client.StreamingPull(CallSettings.FromCancellationToken(_waitForProcessingCts.Token)); // Cancellation not needed in this WriteAsync call. The StreamingPull() cancellation // (above) will cause this call to cancel if _softStopCts is cancelled. Task initTask = _pull.WriteAsync(new StreamingPullRequest @@ -391,7 +397,7 @@ private void HandlePullMoveNext(Task initTask) if (throttle) { // Too many queued ack/nack/extend ids. Loop until the queue has drained a bit. - Add(_scheduler.Delay(TimeSpan.FromMilliseconds(100), _softStopCts.Token), Next(true, () => HandlePullMoveNext(null))); + Add(_scheduler.Delay(TimeSpan.FromMilliseconds(100), _waitForProcessingCts.Token), Next(true, () => HandlePullMoveNext(null))); } else { @@ -501,12 +507,15 @@ private async Task ProcessPullMessagesAsync(List msgs, HashSet< // Running async. Common data needs locking for (int msgIndex = 0; msgIndex < msgs.Count; msgIndex++) { + // If NackImmediately has been triggered we stop processing new messages. + _nackImmediatelyCts.Token.ThrowIfCancellationRequested(); + if (_softStopCts.IsCancellationRequested) { // If the subscriber was shutdown we should stop processing and nack remaining messages, releasing // the message for re-delivery. var remainingAckIds = msgs.Skip(msgIndex).Select(x => x.AckId); - Nack(remainingAckIds); + Nack(remainingAckIds, leaseTracking); _softStopCts.Token.ThrowIfCancellationRequested(); } @@ -527,42 +536,57 @@ await _taskHelper.ConfigureAwait(_flow.Process(msg.CalculateSize(), _messageOrde // Call user message handler var reply = await _taskHelper.ConfigureAwaitHideErrors(() => { - // If the subscriber shut down while waiting for flow control, skip the handler. - // Throwing here triggers a Nack, releasing the message for redelivery. + // If the subscriber shut down while waiting for flow control, throw here to return early. _softStopCts.Token.ThrowIfCancellationRequested(); - return _handler.HandleMessage(msg.Message, _hardStopCts.Token); + _nackImmediatelyCts.Token.ThrowIfCancellationRequested(); + return _handler.HandleMessage(msg.Message, _nackImmediatelyCts.Token); }, Reply.Nack); - // Lock msgsIds, this is accessed concurrently here and in HandleExtendLease(). - lock (leaseTracking) + // For Exactly-Once Delivery, we must always try to process the reply even if it may have already + // been NAck'ed during the shutdown flow. + bool shouldEnqueueReply = _exactlyOnceDeliveryEnabled || !_nackImmediatelyCts.IsCancellationRequested; + + if (shouldEnqueueReply) { - leaseTracking.Remove(msg.AckId); + // Lock msgsIds, this is accessed concurrently here and in HandleExtendLease(). + lock (leaseTracking) + { + leaseTracking.Remove(msg.AckId); + } } // Lock ack/nack-queues, this is accessed concurrently here and in "master" thread. lock (_lock) { _userHandlerInFlight -= 1; - var queue = reply == Reply.Ack ? _ackQueue : _nackQueue; - queue.Enqueue(msg.AckId); + if (shouldEnqueueReply) + { + var queue = reply == Reply.Ack ? _ackQueue : _nackQueue; + queue.Enqueue(msg.AckId); + } } - // Ids have been added to ack/nack-queue, so trigger a push. + // Signal the event loop to push any enqueued replies or to re-evaluate the shutdown state. _eventPush.Set(); })); } + } - void Nack(IEnumerable ackIds) + private void Nack(IEnumerable ackIds, HashSet leaseTracking) + { + var idsToNack = ackIds.ToList(); + if (idsToNack.Count == 0) { - lock (_lock) - { - _nackQueue.Enqueue(ackIds); - } - lock (leaseTracking) - { - leaseTracking.ExceptWith(ackIds); - } - // Ids have been added to nack-queue, so trigger a push. - _eventPush.Set(); + return; + } + + lock (_lock) + { + _nackQueue.Enqueue(idsToNack); + } + lock (leaseTracking) + { + leaseTracking.ExceptWith(idsToNack); } + _eventPush.Set(); } private class LeaseCancellation @@ -581,8 +605,8 @@ public CancellationToken Token } } - public LeaseCancellation(CancellationTokenSource softStopCts) => - _cts = CancellationTokenSource.CreateLinkedTokenSource(softStopCts.Token); + public LeaseCancellation(CancellationTokenSource leaseStopCts) => + _cts = CancellationTokenSource.CreateLinkedTokenSource(leaseStopCts.Token); public void Dispose() { @@ -619,6 +643,12 @@ public void Cancel() private void HandleExtendLease(HashSet msgIds, LeaseCancellation cancellation) { + if (_nackImmediatelyCts.IsCancellationRequested) + { + // No further lease extensions once stop is requested. + Nack(ackIds: msgIds, leaseTracking: msgIds); + return; + } if (_softStopCts.IsCancellationRequested) { // No further lease extensions once stop is requested. @@ -630,15 +660,22 @@ private void HandleExtendLease(HashSet msgIds, LeaseCancellation cancell { // Create a task to cancel lease-extension once `_maxExtensionDuration` has been reached. // This set up once for each chunk of received messages, and passed through to each future call to this method. - cancellation = new LeaseCancellation(_softStopCts); + cancellation = new LeaseCancellation(_jointNackImmediatelyOrSoftStopCts); Add(_scheduler.Delay(_maxExtensionDuration, cancellation.Token), Next(false, () => { // This is executed when `_maxExtensionDuration` has expired, or when `cancellation` is cancelled, // Which ensures `cancellation` is aways disposed of. cancellation.Dispose(); - lock (msgIds) + if (_nackImmediatelyCts.IsCancellationRequested) { - msgIds.Clear(); + Nack(ackIds: msgIds, leaseTracking: msgIds); + } + else + { + lock (msgIds) + { + msgIds.Clear(); + } } })); } @@ -663,10 +700,10 @@ private void HandleExtendLease(HashSet msgIds, LeaseCancellation cancell _eventPush.Set(); // Some ids still exist, schedule another extension. // The overall `_maxExtensionDuration` is maintained by passing through the existing `cancellation`. - Add(_scheduler.Delay(EffectiveLeaseTiming.AutoExtendDelay, _softStopCts.Token), Next(false, () => HandleExtendLease(msgIds, cancellation))); + Add(_scheduler.Delay(EffectiveLeaseTiming.AutoExtendDelay, _jointNackImmediatelyOrSoftStopCts.Token), Next(false, () => HandleExtendLease(msgIds, cancellation))); // Increment _extendThrottles. _extendThrottleHigh += 1; - Add(_scheduler.Delay(EffectiveLeaseTiming.ExtendQueueThrottleInterval, _softStopCts.Token), Next(false, () => _extendThrottleLow += 1)); + Add(_scheduler.Delay(EffectiveLeaseTiming.ExtendQueueThrottleInterval, _jointNackImmediatelyOrSoftStopCts.Token), Next(false, () => _extendThrottleLow += 1)); } else { @@ -1092,13 +1129,13 @@ private void HandleStreamPing() { // Need to explicitly check this, as the continuation passed to Add() may be executed // regardless of the fault/cancellation state of the Task. - if (_softStopCts.IsCancellationRequested) + if (_waitForProcessingCts.IsCancellationRequested) { // No more pings when subscriber stopping. return; } // Schedule next ping, this never stops whilst this subscriber as active - Add(_scheduler.Delay(s_streamPingPeriod, _softStopCts.Token), Next(false, HandleStreamPing)); + Add(_scheduler.Delay(s_streamPingPeriod, _waitForProcessingCts.Token), Next(false, HandleStreamPing)); // If messages are currently being processed, then ping the stream periodically; // this ensures the stream isn't closed. // If the stream is closed, then all gRPC-buffered messages have their server-side diff --git a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.cs b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.cs index 9ff2e30d9c8f..f932e97b4ee4 100644 --- a/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.cs +++ b/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.cs @@ -87,10 +87,14 @@ internal SubscriberClientImpl(SubscriptionName subscriptionName, IEnumerable _globalWaitForProcessingCts.IsCancellationRequested; private TaskCompletionSource _mainTcs; private CancellationTokenSource _globalSoftStopCts; // soft-stop is guarenteed to occur before hard-stop. private CancellationTokenSource _globalHardStopCts; + private CancellationTokenSource _globalWaitForProcessingCts; + private CancellationTokenSource _globalNackImmediatelyCts; // This property only exists for testing. // This is the delay between obtaining a lease on a message and then further extending the lease on that message @@ -120,8 +124,10 @@ public override Task StartAsync(SubscriptionHandler handler) { GaxPreconditions.CheckState(_mainTcs == null, "Can only start an instance once."); _mainTcs = new TaskCompletionSource(); - _globalSoftStopCts = new CancellationTokenSource(); _globalHardStopCts = new CancellationTokenSource(); + _globalSoftStopCts = CancellationTokenSource.CreateLinkedTokenSource(_globalHardStopCts.Token); + _globalNackImmediatelyCts = CancellationTokenSource.CreateLinkedTokenSource(_globalHardStopCts.Token); + _globalWaitForProcessingCts = CancellationTokenSource.CreateLinkedTokenSource(_globalNackImmediatelyCts.Token, _globalSoftStopCts.Token); } var registeredTasks = new HashSet(); Action registerTask = task => @@ -194,16 +200,19 @@ public override ValueTask DisposeAsync() return new ValueTask(Task.CompletedTask); } } - return new ValueTask(StopAsync(_disposeTimeout)); +#pragma warning disable CS0618 // allow use of obsolete method + return new ValueTask(StopAsync( _disposeTimeout)); +#pragma warning restore CS0618 } /// + [Obsolete("Use StopAsync(SubscriberShutdownSetting, TimeSpan?, CancellationToken) instead.")] public override Task StopAsync(CancellationToken hardStopToken) { lock (_lock) { // Note: If multiple stop requests are made, only the first cancellation token is observed. - if (_mainTcs is not null && _globalSoftStopCts.IsCancellationRequested) + if (_mainTcs is not null && _isStopInitiated) { // No-op. We don't want to throw exceptions if DisposeAsync or StopAsync is called a second time. return _mainTcs.Task; @@ -212,11 +221,92 @@ public override Task StopAsync(CancellationToken hardStopToken) _globalSoftStopCts.Cancel(); } - var registration = hardStopToken.Register(() => _globalHardStopCts.Cancel()); + CancelTargetOnTrigger(targetSourceToCancel: _globalHardStopCts, triggerToken: hardStopToken); + return _mainTcs.Task; + } + + /// + public override Task StopAsync(SubscriberShutdownSetting shutdownSetting, TimeSpan? timeout = null, CancellationToken cancellationToken = default) + { + lock (_lock) + { + GaxPreconditions.CheckState(_mainTcs != null, "Can only stop a started instance."); + // Only the first call to StopAsync is observed + if (_isStopInitiated) + { + // No-op. We don't want to throw exceptions if DisposeAsync or StopAsync is called a second time. + return _mainTcs.Task; + } + // Signal to stop retrieving new messages immediately. + _globalWaitForProcessingCts.Cancel(); + } + + TimeSpan shutdownTimeout = timeout ?? DefaultMaxTotalAckExtension; + if (shutdownTimeout <= TimeSpan.Zero) + { + Logger?.LogWarning("Shutdown timeout is 0! Stopping {Client} immediately; work may be dropped.", nameof(SubscriberClient)); + } + + CancelAfterDelay(_globalNackImmediatelyCts, CalculateNackDelay()); + CancelAfterDelay(_globalHardStopCts, shutdownTimeout); + CancelTargetOnTrigger(targetSourceToCancel: _globalHardStopCts, triggerToken: cancellationToken); + + return _mainTcs.Task; + + // Triggers cancellation after provided delay. This is used instead of .CancelAfterDelay to integrate with + // IScheduler, allowing this shutdown method to be tested. + void CancelAfterDelay(CancellationTokenSource cts, TimeSpan delay) + { + if (delay == TimeSpan.Zero) + { + cts.Cancel(); + return; + } + _taskHelper.Run(async () => + { + await _taskHelper.ConfigureAwait(_scheduler.Delay(delay, CancellationToken.None)); + cts.Cancel(); + }); + } + + // Calculates the time until we switch to NackImmediately mode. + TimeSpan CalculateNackDelay() + { + switch (shutdownSetting) + { + case SubscriberShutdownSetting.NackImmediately: + return TimeSpan.Zero; + case SubscriberShutdownSetting.WaitForProcessing: + // WaitForProcessing enters a Nack grace period before final shutdown if it was unable to finish + // handling all received messages. + TimeSpan delay = shutdownTimeout > s_nackGracePeriod ? shutdownTimeout - s_nackGracePeriod : TimeSpan.Zero; + if (delay == TimeSpan.Zero && shutdownTimeout > TimeSpan.Zero) + { + Logger?.LogWarning("Shutdown timeout ({Timeout}) <= GracePeriod ({GracePeriod}). Nacking immediately.", + shutdownTimeout, s_nackGracePeriod); + } + return delay; + default: + throw new ArgumentOutOfRangeException(nameof(shutdownSetting), shutdownSetting, null); + } + } + } + + /// + /// Configures a to be cancelled automatically + /// whenever a specific is triggered. + /// The registration is automatically disposed when _mainTcs completes. + /// + /// The destination that should be cancelled. + /// The source that provides the cancellation signal. + private void CancelTargetOnTrigger( + CancellationTokenSource targetSourceToCancel, + CancellationToken triggerToken) + { + var registration = triggerToken.Register(() => targetSourceToCancel.Cancel()); // Do not register this Task to be awaited on at shutdown. // It completes *after* _mainTcs, and all registered tasks must complete before _mainTcs _taskHelper.Run(async () => await _taskHelper.ConfigureAwaitWithFinally(() => _mainTcs.Task, () => registration.Dispose())); - return _mainTcs.Task; } } diff --git a/apis/Google.Cloud.PubSub.V1/docs/index.md b/apis/Google.Cloud.PubSub.V1/docs/index.md index b50e41fb7f69..aab7bded1058 100644 --- a/apis/Google.Cloud.PubSub.V1/docs/index.md +++ b/apis/Google.Cloud.PubSub.V1/docs/index.md @@ -172,6 +172,15 @@ Below is an example implementation of a console application that utilizes the de {{sample:SubscriberClient.UseSubscriberServiceInConsoleApp}} +## Subscriber shutdown + +When shutting down a `SubscriberClient`, two different shutdown flows are available via the `StopAsync(SubscriberShutdownSetting, TimeSpan?, CancellationToken)` method: + +- **NackImmediately**: This immediately stops streaming new messages and then actively sends "Nack" (Negative Acknowledgement) responses for any messages that have been received but have not yet finished being handled. This allows those messages to be quickly redelivered to other active subscribers. +- **WaitForProcessing**: This immediately stops streaming new messages from the server but continues to process all messages that have already been received. If processing does not complete 30s before the specified timeout, the client will switch to NackImmediately to release any remaining messages. + +By default, a 1-hour timeout is used for the shutdown process, which can be customized as needed. When the timeout is reached or if the `CancellationToken` is invoked this will trigger an immediate hard stop, aborting all outstanding tasks. + ## Disposing of the publisher and subscriber clients Both `PublisherClient` and `SubscriberClient` implement the `IAsyncDisposable` interface,