Conversation
- Added `_completed` flag to track completion state. - Used `Interlocked.CompareExchange` to prevent duplicate result setting. - Reset `_completed` during `PingCommand.Reset()`. - Safeguarded cancellation token registration to avoid premature task cancellations.
- Implemented concurrency and cancellation scenarios for `PingCommand`. - Added mock and real server tests to validate `PingAsync` workflows. - Improved `MockServer` with optional `autoPong` behavior.
|
There was a problem hiding this comment.
Pull request overview
This PR aims to make NatsConnection.PingAsync properly cancellable and to harden PingCommand against result/cancel completion races, with new unit/integration tests exercising the behavior.
Changes:
- Register the caller
CancellationTokento cancel an in-flightPingCommandwhile awaiting a PONG. - Add a completion gate to
PingCommandto prevent double-completion races (SetResultvsSetCanceled). - Extend the test
MockServerto optionally suppress auto-PONGs and add new ping cancellation/unit tests.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/NATS.Client.TestUtilities/MockServer.cs | Adds autoPong option to simulate missing PONG responses in tests. |
| tests/NATS.Client.CoreUnit.Tests/PingCommandTest.cs | Adds unit tests validating PingCommand completion/cancellation and race behavior. |
| tests/NATS.Client.Core2.Tests/PingCancellationTest.cs | Adds integration-style tests for PingAsync success, concurrency, and cancellation scenarios. |
| src/NATS.Client.Core/NatsConnection.Ping.cs | Adds CancellationToken registration intended to cancel an outstanding ping wait. |
| src/NATS.Client.Core/Commands/PingCommand.cs | Adds an interlocked _completed gate to avoid result/cancel completion races. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| #if NETSTANDARD | ||
| using var registration = cancellationToken.Register(static state => ((PingCommand)state!).SetCanceled(), pingCommand); | ||
| #else | ||
| await using var registration = cancellationToken.UnsafeRegister(static state => ((PingCommand)state!).SetCanceled(), pingCommand); | ||
| #endif | ||
| return await pingCommand.RunAsync().ConfigureAwait(false); |
There was a problem hiding this comment.
The cancellation callback completes the PingCommand while it may still be enqueued in the reader’s _pingCommands queue. Since PingCommand.GetResult() returns the instance to the object pool, a cancelled ping can be reset/reused while still sitting in the queue; the next PONG will dequeue that same instance and call SetResult(), potentially completing a different (reused) ping. To fix this, ensure a PingCommand is not reset/returned to the pool until it has been removed from the ping queue (e.g., move pooling/Reset out of PingCommand.GetResult and return it from the dequeue site, or introduce a dequeue/removal mechanism on cancellation).
There was a problem hiding this comment.
False positive. When SetCanceled() is called, GetResult() throws OperationCanceledException, so the pool return code is never reached. The cancelled command stays in _pingCommands but the _completed flag makes the eventual SetResult() from PONG a no-op. No fix needed.
| await CommandWriter.PingAsync(pingCommand, cancellationToken).ConfigureAwait(false); | ||
|
|
||
| #if NETSTANDARD | ||
| using var registration = cancellationToken.Register(static state => ((PingCommand)state!).SetCanceled(), pingCommand); | ||
| #else | ||
| await using var registration = cancellationToken.UnsafeRegister(static state => ((PingCommand)state!).SetCanceled(), pingCommand); | ||
| #endif | ||
| return await pingCommand.RunAsync().ConfigureAwait(false); |
There was a problem hiding this comment.
The CancellationToken registration is created after awaiting CommandWriter.PingAsync(). There’s a race where the token can be cancelled after PingAsync returns (ping already enqueued) but before the registration is established, leaving pingCommand.RunAsync() without any cancellation signal. Consider registering before sending/enqueuing the ping (or at least checking cancellationToken.IsCancellationRequested immediately after the await and calling pingCommand.SetCanceled()).
…so the token is monitored from the moment the command exists, with no gap
* Add optional window_size parameter to StreamSnapshotRequest (#1088) * Bump Microsoft.Bcl.Memory from 9.0.0 to 9.0.14 (#1089) * Fix `PingCommand` cancellation (#1086) * rework terminate reason (#1081) * Fix OTel network telemetry tags (#1078) * Add consumer info usage warnings (#1079) * Add TermWithReason support to AckTerminateAsync (#1048) * Fix code analyser warning (#1076) * Add cancelled token handling for consumers (#1068) * Add validation for unsupported PinnedClient calls (#1063) * Add see-also references to Orbit packages (#1077) * Add Synadia.Orbit.Testing.NatsServerProcessManager (#1065) * Add slow consumer docs (#1073) * Fix error logs URI rewritten by OnConnectingAsync (#1067) * Add more tests for JetStream consumer behavior (#1055)
* Add optional window_size parameter to StreamSnapshotRequest (#1088) * Bump Microsoft.Bcl.Memory from 9.0.0 to 9.0.14 (#1089) * Fix `PingCommand` cancellation (#1086) * rework terminate reason (#1081) * Fix OTel network telemetry tags (#1078) * Add consumer info usage warnings (#1079) * Add TermWithReason support to AckTerminateAsync (#1048) * Fix code analyser warning (#1076) * Add cancelled token handling for consumers (#1068) * Add validation for unsupported PinnedClient calls (#1063) * Add see-also references to Orbit packages (#1077) * Add Synadia.Orbit.Testing.NatsServerProcessManager (#1065) * Add slow consumer docs (#1073) * Fix error logs URI rewritten by OnConnectingAsync (#1067) * Add more tests for JetStream consumer behavior (#1055)
Uh oh!
There was an error while loading. Please reload this page.