Skip to content

Commit dcdec22

Browse files
ReubenBondCopilot
andauthored
Fix persistent stream subscriber registration race (#9927)
* Fix persistent stream registration race Track the earliest stream token observed while subscriber registration is still in progress and use it to initialize the cursor when handshake has no explicit token. This removes timing dependence from dropped-client producer scenarios without adding sleeps. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * Keep earliest pending stream start token Address PR feedback by updating pending token selection to retain the oldest token observed while registration is incomplete. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --------- Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 63c0c38 commit dcdec22

File tree

2 files changed

+14
-4
lines changed

2 files changed

+14
-4
lines changed

src/Orleans.Streaming/PersistentStreams/PersistentStreamPullingAgent.cs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,7 @@ private async Task AddSubscriber_Impl(
243243

244244
if (await DoHandshakeWithConsumer(data, cacheToken))
245245
{
246+
data.PendingStartToken = null;
246247
data.IsRegistered = true;
247248
if (data.State == StreamConsumerDataState.Inactive)
248249
RunConsumerCursor(data).Ignore(); // Start delivering events if not actively doing so
@@ -268,15 +269,17 @@ private async Task<bool> DoHandshakeWithConsumer(
268269
this.options.MaxEventDeliveryTime,
269270
deliveryBackoffProvider);
270271

271-
if (requestedHandshakeToken != null)
272+
var requestedToken = requestedHandshakeToken?.Token;
273+
if (requestedToken != null)
272274
{
273275
consumerData.SafeDisposeCursor(logger);
274-
consumerData.Cursor = queueCache.GetCacheCursor(consumerData.StreamId, requestedHandshakeToken.Token);
276+
consumerData.Cursor = queueCache.GetCacheCursor(consumerData.StreamId, requestedToken);
275277
}
276278
else
277279
{
280+
var registrationToken = cacheToken ?? consumerData.PendingStartToken;
278281
if (consumerData.Cursor == null) // if the consumer did not ask for a specific token and we already have a cursor, just keep using it.
279-
consumerData.Cursor = queueCache.GetCacheCursor(consumerData.StreamId, cacheToken);
282+
consumerData.Cursor = queueCache.GetCacheCursor(consumerData.StreamId, registrationToken);
280283
}
281284
}
282285
catch (Exception exception)
@@ -298,7 +301,8 @@ private async Task<bool> DoHandshakeWithConsumer(
298301
{
299302
try
300303
{
301-
consumerData.Cursor = queueCache.GetCacheCursor(consumerData.StreamId, cacheToken);
304+
var registrationToken = cacheToken ?? consumerData.PendingStartToken;
305+
consumerData.Cursor = queueCache.GetCacheCursor(consumerData.StreamId, registrationToken);
302306
}
303307
catch (Exception)
304308
{
@@ -513,6 +517,10 @@ private void StartInactiveCursors(StreamConsumerCollection streamData, StreamSeq
513517
}
514518
else
515519
{
520+
if (consumerData.PendingStartToken is null || startToken.Older(consumerData.PendingStartToken))
521+
{
522+
consumerData.PendingStartToken = startToken;
523+
}
516524
LogDebugPulledNewMessages(consumerData.StreamId);
517525
}
518526
}

src/Orleans.Streaming/PersistentStreams/QueueStreamDataStructures.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ internal sealed class StreamConsumerData
3333

3434
[NonSerialized]
3535
public bool IsRegistered = false;
36+
[NonSerialized]
37+
public StreamSequenceToken? PendingStartToken;
3638

3739
public StreamConsumerData(GuidId subscriptionId, QualifiedStreamId streamId, IStreamConsumerExtension streamConsumer, string filterData)
3840
{

0 commit comments

Comments
 (0)