Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ private async Task RunBulkMessagingImpl(
{
// Test finished, so stop subscriber
Console.WriteLine("All msgs received, stopping subscriber.");
Task unused = subscriber.StopAsync(TimeSpan.FromSeconds(15));
Task unused = subscriber.StopAsync(SubscriberClient.SubscriberShutdownSetting.NackImmediately, TimeSpan.FromSeconds(15));
}
}
else
Expand Down Expand Up @@ -194,11 +194,11 @@ private async Task RunBulkMessagingImpl(
{
if (noProgressCount > 60)
{
// Deadlock, shutdown subscriber, and cancel
Console.WriteLine("Deadlock detected. Cancelling test");
subscriber.StopAsync(new CancellationToken(true));
watchdogCts.Cancel();
break;
// Deadlock, shutdown subscriber, and cancel
Console.WriteLine("Deadlock detected. Cancelling test");
subscriber.StopAsync(SubscriberClient.SubscriberShutdownSetting.NackImmediately, cancellationToken: new CancellationToken(true));
watchdogCts.Cancel();
break;
}
noProgressCount += 1;
}
Expand Down Expand Up @@ -431,7 +431,7 @@ public async Task StopStartSubscriber(int totalMessageCount, double publisherFre
});
await Task.Delay(subscriberLifetime);
Console.WriteLine("Stopping subscriber");
Task stopTask = subscriber.StopAsync(TimeSpan.FromSeconds(15));
Task stopTask = subscriber.StopAsync(SubscriberClient.SubscriberShutdownSetting.NackImmediately, TimeSpan.FromSeconds(15));
// If shutdown times-out then stopTask, and also Task.WhenAll will cancel, causing the test to fail.
await Task.WhenAll(subscribeTask, stopTask);
int recvCount = recvedMsgs.Locked(() => recvedMsgs.Count);
Expand Down Expand Up @@ -538,8 +538,8 @@ await subscriberApi.IAMPolicyClient.SetIamPolicyAsync(new SetIamPolicyRequest
{
result.Add((msg.GetDeliveryAttempt(), true));
// Received DLQ message, so stop test.
sub.StopAsync(TimeSpan.FromSeconds(10));
dlqSub.StopAsync(TimeSpan.FromSeconds(10));
sub.StopAsync(SubscriberClient.SubscriberShutdownSetting.NackImmediately, TimeSpan.FromSeconds(10));
dlqSub.StopAsync(SubscriberClient.SubscriberShutdownSetting.NackImmediately, TimeSpan.FromSeconds(10));
return Task.FromResult(SubscriberClient.Reply.Ack);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ Task Subscribe()
if (recvCount == inputLines.Count)
{
Console.WriteLine("Received all messages, shutting down");
var dummyTask = sub.StopAsync(CancellationToken.None);
var dummyTask = sub.StopAsync(SubscriberClient.SubscriberShutdownSetting.NackImmediately);
}
}
if (rnd.Next(3) == 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ await _subscriberClient.StartAsync((msg, token) =>
});

public override async Task StopAsync(CancellationToken stoppingToken) =>
await _subscriberClient.StopAsync(stoppingToken);
await _subscriberClient.StopAsync(SubscriberClient.SubscriberShutdownSetting.NackImmediately, cancellationToken: stoppingToken);
}
// End sample
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ await subscriber.StartAsync((msg, cancellationToken) =>
Console.WriteLine($"Text: '{msg.Data.ToStringUtf8()}'");
// Stop this subscriber after one message is received.
// This is non-blocking, and the returned Task may be awaited.
subscriber.StopAsync(TimeSpan.FromSeconds(15));
subscriber.StopAsync(SubscriberClient.SubscriberShutdownSetting.NackImmediately, TimeSpan.FromSeconds(15));
// Return Reply.Ack to indicate this message has been handled.
return Task.FromResult(SubscriberClient.Reply.Ack);
});
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,24 @@ public enum Reply
Ack = 1,
}

/// <summary>
/// Settings available for subscriber shutdown.
/// </summary>
public enum SubscriberShutdownSetting
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider ShutdownOption, mostly dropping subscription given that this is a type defined on the SubscriptionClient.

{
/// <summary>
/// Stops streaming new upstream messages and then continues processing all received messages. If there are
/// still messages that need to be processed 30s before the timeout is reached it will switch to <see cref="NackImmediately"/>.
/// </summary>
Comment on lines +63 to +65
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

upstream is not a familiar term for Pub/Sub users as far as I'm aware. I think it's better to say, something like "stops the subscriber stream so no new messages are received".

Also, there's no timeout here, so referencing the timeout is unclear. The explanation re switching to nack inmediately is likely better suited for the method itself.

WaitForProcessing = 0,

/// <summary>
/// Stops streaming new upstream messages and then aggressively releases unhandled messages by sending Nack responses.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment re: upstream. And avoid using aggressively. Just say that it nacks all unhandled messages.

/// Already handled messages will still be acknowledged.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The client app response to a handle message may have been nack, so here something like "the ack/nack will be sent".

And are we certain about this?

/// </summary>
NackImmediately = 1,
}

/// <summary>
/// Default <see cref="FlowControlSettings"/> for <see cref="SubscriberClient"/>.
/// Allows 1,000 outstanding messages; and 100Mb outstanding bytes.
Expand Down Expand Up @@ -217,6 +235,7 @@ public virtual Task StartAsync(Func<PubsubMessage, CancellationToken, Task<Reply
/// <param name="hardStopToken">Cancel this <see cref="CancellationToken"/> to abort handlers and acknowledgement.</param>
/// <returns>A <see cref="Task"/> that completes when all handled messages have been acknowledged;
/// faults on unrecoverable service errors; or cancels if <paramref name="hardStopToken"/> is cancelled.</returns>
[Obsolete("Use StopAsync(SubscriberShutdownSetting, TimeSpan?, CancellationToken) instead.")]
public virtual Task StopAsync(CancellationToken hardStopToken) => throw new NotImplementedException();

/// <summary>
Expand All @@ -229,7 +248,19 @@ public virtual Task StartAsync(Func<PubsubMessage, CancellationToken, Task<Reply
/// <param name="timeout">After this period, abort handling and acknowledging messages.</param>
/// <returns>A <see cref="Task"/> that completes when all handled messages have been acknowledged;
/// faults on unrecoverable service errors; or cancels if <paramref name="timeout"/> expires.</returns>
public virtual Task StopAsync(TimeSpan timeout) => StopAsync(new CancellationTokenSource(timeout).Token);
[Obsolete("Use StopAsync(SubscriberShutdownSetting, TimeSpan?, CancellationToken) instead.")]
public virtual Task StopAsync(TimeSpan timeout) => StopAsync(SubscriberShutdownSetting.WaitForProcessing, timeout);

/// <summary>
/// Stop this <see cref="SubscriberClient"/>.
/// The returned <see cref="Task"/> completes when all handled messages have been acknowledged.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, or nacked.

But also, this is not true, it depends on what the settings are.

/// The returned <see cref="Task"/> faults if there is an unrecoverable error with the underlying service.
/// </summary>
/// <param name="shutdownSetting">The <see cref="SubscriberShutdownSetting"/> to use for shutdown.</param>
/// <param name="timeout">Optional. The timeout for the shutdown process. If not specified, a default 1-hour timeout is used.</param>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is this 1 hour coming from?

Also, more than an optional parameter, I think I'd prefer that it accepted null.

/// <param name="cancellationToken">Optional. A <see cref="CancellationToken"/> that can be used to abort the graceful shutdown and trigger an immediate hard stop.</param>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not so sure about this but let's chat.

/// <returns>A <see cref="Task"/> that completes when the subscriber is stopped, or if an unrecoverable error occurs.</returns>
public virtual Task StopAsync(SubscriberShutdownSetting shutdownSetting, TimeSpan? timeout = null, CancellationToken cancellationToken = default) => throw new NotImplementedException();

/// <summary>
/// Disposes this <see cref="SubscriberClient"/> asynchronously.
Expand Down
Loading
Loading