Skip to content

feat(PubSub): Add NackImmediately and WaitForProcessing shutdown mode#15482

Open
robertvoinescu-work wants to merge 2 commits intogoogleapis:mainfrom
robertvoinescu-work:pubsub/shutdownOptions
Open

feat(PubSub): Add NackImmediately and WaitForProcessing shutdown mode#15482
robertvoinescu-work wants to merge 2 commits intogoogleapis:mainfrom
robertvoinescu-work:pubsub/shutdownOptions

Conversation

@robertvoinescu-work
Copy link
Contributor

b/427314526

@robertvoinescu-work robertvoinescu-work requested a review from a team as a code owner February 24, 2026 16:53
@product-auto-label product-auto-label bot added the api: pubsub Issues related to the Pub/Sub API. label Feb 24, 2026
@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello @robertvoinescu-work, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly improves the SubscriberClient shutdown mechanism by introducing explicit shutdown settings: NackImmediately and WaitForProcessing. These settings provide developers with greater control over how messages are handled when a subscriber is stopped, allowing for either immediate negative acknowledgment of unhandled messages or a graceful period to complete processing before Nacking. This change enhances the reliability and predictability of subscriber termination, ensuring messages are managed appropriately based on application requirements.

Highlights

  • New Subscriber Shutdown Modes: Introduced 'NackImmediately' and 'WaitForProcessing' settings for SubscriberClient shutdown, offering fine-grained control over message handling during termination.
  • Enhanced StopAsync Method: Added a new StopAsync overload that accepts SubscriberShutdownSetting, an optional timeout, and a cancellation token, replacing previous StopAsync methods which are now marked obsolete.
  • Graceful Shutdown Logic: Implemented logic for 'WaitForProcessing' mode, which attempts to process all in-flight messages within a timeout, with a fallback to 'NackImmediately' if the timeout is approached.
  • Updated Test Coverage: Extensive unit and integration tests were added or modified to validate the behavior of the new shutdown settings, including scenarios for immediate Nack, waiting for processing, timeout handling, and cancellation.
  • Documentation Update: The docs/index.md file was updated to explain the new subscriber shutdown options and their usage.
Changelog
  • apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.IntegrationTests/PubSubClientTest.cs
    • Updated StopAsync calls in integration tests to use the new SubscriberClient.SubscriberShutdownSetting.NackImmediately for consistent shutdown behavior.
  • apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.OrderingKeyTester/Program.cs
    • Modified the StopAsync call to explicitly use SubscriberClient.SubscriberShutdownSetting.NackImmediately for shutdown.
  • apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Snippets/SubscriberClientSnippets.cs
    • Updated StopAsync in snippets to demonstrate the use of SubscriberClient.SubscriberShutdownSetting.NackImmediately.
  • apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Snippets/SubscriberServiceApiClientSnippets.cs
    • Changed StopAsync call to use SubscriberClient.SubscriberShutdownSetting.NackImmediately in the sample code.
  • apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Tests/SubscriberClientTest.cs
    • Removed System.Text.RegularExpressions import.
    • Added a using alias for SubscriberShutdownSetting.
    • Renamed several existing test methods to include _Obsolete suffix and introduced new test methods to cover the new SubscriberShutdownSetting enum.
    • Updated StopAsync calls across numerous tests to incorporate the new SubscriberShutdownSetting parameter.
    • Added new dedicated tests for various shutdown scenarios, including Shutdown_SoftStop_NacksMessages, Shutdown_NackImmediately_Success, Shutdown_WaitForProcessing_CompletesBeforeNack, Shutdown_WaitForProcessing_NacksOnTimeout, Shutdown_CancellationToken_AbortsWaitForProcessing, Shutdown_WaitForProcessing_NacksWhenTimeoutLessThanMinimum, and Shutdown_NackImmediately_LeaseExtensionStops.
  • apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClient.cs
    • Defined a new SubscriberShutdownSetting enum with WaitForProcessing and NackImmediately options.
    • Marked the existing StopAsync(CancellationToken) and StopAsync(TimeSpan) methods as obsolete.
    • Introduced a new public virtual Task StopAsync(SubscriberShutdownSetting shutdownSetting, TimeSpan? timeout = null, CancellationToken cancellationToken = default) method.
  • apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.SingleChannel.cs
    • Added new CancellationTokenSource fields (_nackImmediatelyCts, _waitForProcessingCts, _jointNackImmediatelyOrSoftStopCts) to manage different shutdown phases.
    • Modified internal logic in StartAsync, StartStreamingPull, HandlePullMoveNext, ProcessPullMessagesAsync, HandleExtendLease, and HandleStreamPing to respect the new shutdown cancellation tokens.
    • Updated the Nack helper method to accept leaseTracking for more precise message management during shutdown.
    • Changed LeaseCancellation constructor to accept a leaseStopCts.
  • apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.cs
    • Added s_nackGracePeriod constant and _isStopInitiated property.
    • Introduced _globalWaitForProcessingCts and _globalNackImmediatelyCts as new global cancellation token sources.
    • Modified the StartAsync method to initialize these new cancellation token sources.
    • Updated DisposeAsync to call the new StopAsync overload.
    • Refactored the StopAsync implementation to handle the new SubscriberShutdownSetting, including logic for CancelAfterDelay and CalculateNackDelay to manage the shutdown timeline.
    • Added a CancelTargetOnTrigger helper method for managing cancellation token registrations.
  • apis/Google.Cloud.PubSub.V1/docs/index.md
    • Added a new section detailing the Subscriber shutdown process, explaining the NackImmediately and WaitForProcessing settings, their behavior, and configurable timeouts.
Activity
  • The pull request was created by robertvoinescu-work.
  • It introduces a significant feature to the Pub/Sub client library, enhancing control over subscriber shutdown.
  • The changes span across core logic, testing, and documentation, indicating a comprehensive implementation.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces two new shutdown modes for SubscriberClient: NackImmediately and WaitForProcessing, providing users more control over handling outstanding messages during shutdown. A security audit found no security vulnerabilities in the changes related to the graceful shutdown process. However, there is a potential memory leak issue related to orphaned delay tasks during shutdown that should be addressed.

}
_taskHelper.Run(async () =>
{
await _taskHelper.ConfigureAwait(_scheduler.Delay(delay, CancellationToken.None));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The _scheduler.Delay call uses CancellationToken.None. If the SubscriberClient is stopped via the cancellationToken parameter (triggering an immediate hard stop), these background delay tasks will continue to run for their full duration (which can be up to 1 hour by default). These tasks hold references to the CancellationTokenSource objects, which can prevent the SubscriberClient and its associated resources from being garbage collected until the delays expire. It is recommended to pass cts.Token or a combined token to the Delay call to ensure these tasks are cancelled as soon as the target CTS is cancelled or a hard stop occurs.

                await _taskHelper.ConfigureAwait(_scheduler.Delay(delay, cts.Token));

Copy link
Contributor

@amanda-tarafa amanda-tarafa left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've reviewed some of this, but I have some questions. And also a lot of my comments are about rewording, etc. Let's chat tomorrow.

/// <summary>
/// Settings available for subscriber shutdown.
/// </summary>
public enum SubscriberShutdownSetting
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider ShutdownOption, mostly dropping subscription given that this is a type defined on the SubscriptionClient.

Comment on lines +63 to +65
/// Stops streaming new upstream messages and then continues processing all received messages. If there are
/// still messages that need to be processed 30s before the timeout is reached it will switch to <see cref="NackImmediately"/>.
/// </summary>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

upstream is not a familiar term for Pub/Sub users as far as I'm aware. I think it's better to say, something like "stops the subscriber stream so no new messages are received".

Also, there's no timeout here, so referencing the timeout is unclear. The explanation re switching to nack inmediately is likely better suited for the method itself.

WaitForProcessing = 0,

/// <summary>
/// Stops streaming new upstream messages and then aggressively releases unhandled messages by sending Nack responses.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment re: upstream. And avoid using aggressively. Just say that it nacks all unhandled messages.


/// <summary>
/// Stops streaming new upstream messages and then aggressively releases unhandled messages by sending Nack responses.
/// Already handled messages will still be acknowledged.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The client app response to a handle message may have been nack, so here something like "the ack/nack will be sent".

And are we certain about this?


/// <summary>
/// Stop this <see cref="SubscriberClient"/>.
/// The returned <see cref="Task"/> completes when all handled messages have been acknowledged.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, or nacked.

But also, this is not true, it depends on what the settings are.

Comment on lines +129 to +130
_globalNackImmediatelyCts = CancellationTokenSource.CreateLinkedTokenSource(_globalHardStopCts.Token);
_globalWaitForProcessingCts = CancellationTokenSource.CreateLinkedTokenSource(_globalNackImmediatelyCts.Token, _globalSoftStopCts.Token);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we might indeed need a paragraph or two about the token sources, and specifically why we can't reuse the existing ones and what is the relationship between the existing ones and the new ones.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And also, can we start using the new ones which are the ones that will remain after the deprecation and eventual deletion of the current method, and make the old ones depend on the new ones?

{
// Note: If multiple stop requests are made, only the first cancellation token is observed.
if (_mainTcs is not null && _globalSoftStopCts.IsCancellationRequested)
if (_mainTcs is not null && _isStopInitiated)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a change here, IsStopInitiated is looking at the new globalWaitForProcessingCts

_globalWaitForProcessingCts.Cancel();
}

TimeSpan shutdownTimeout = timeout ?? DefaultMaxTotalAckExtension;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be _maxExtensionDuration

// Delay, then start the streaming-pull.
_logger?.LogDebug("Client {index} delaying for {seconds}s before streaming pull call.", _clientIndex, (int) backoff.TotalSeconds);
Task delayTask = _scheduler.Delay(backoff, _softStopCts.Token);
Task delayTask = _scheduler.Delay(backoff, _waitForProcessingCts.Token);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For all of these I still don't fully understand the relationship between the new and old tokens. A paragraph on the design should be enough.

@@ -501,12 +507,15 @@ private async Task ProcessPullMessagesAsync(List<ReceivedMessage> msgs, HashSet<
// Running async. Common data needs locking
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should you have changed the token in line 455?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

api: pubsub Issues related to the Pub/Sub API.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants