Skip to content

Commit f94503e

Browse files
sdesaiLULAjeremydmiller
authored andcommitted
Avoid race conditions and improve performance when polling scheduled messages.
1 parent 3441115 commit f94503e

File tree

2 files changed

+78
-37
lines changed

2 files changed

+78
-37
lines changed

src/Transports/Redis/Wolverine.Redis.Tests/ScheduledMessageTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public async Task should_delay_execution_of_scheduled_message()
7070
tracker.ReceivedMessages.ShouldNotContain(command.Id);
7171

7272
// Wait for the message to be processed after the scheduled time
73-
await Task.Delay(3000);
73+
await Task.Delay(7000);
7474

7575
tracker.ReceivedMessages.ShouldContain(command.Id);
7676
var executionTime = tracker.GetExecutionTime(command.Id);

src/Transports/Redis/Wolverine.Redis/Internal/RedisStreamListener.cs

Lines changed: 77 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -490,54 +490,95 @@ public async Task<long> MoveScheduledToReadyStreamAsync(CancellationToken cancel
490490
{
491491
var database = _transport.GetDatabase(database: _endpoint.DatabaseId);
492492
var scheduledKey = _endpoint.ScheduledMessagesKey;
493-
493+
494494
try
495495
{
496496
// Get current time as Unix timestamp in milliseconds
497497
var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
498498

499499
// Get all messages that are ready to be executed (score <= now)
500-
var readyMessages = await database.SortedSetRangeByScoreAsync(
501-
scheduledKey,
502-
start: double.NegativeInfinity,
503-
stop: now);
504-
505-
if (readyMessages == null || readyMessages.Length == 0)
506-
{
507-
return 0;
508-
}
509-
510-
_logger.LogDebug("Found {Count} scheduled messages ready for execution in {ScheduledKey}",
511-
readyMessages.Length, scheduledKey);
512500

513501
long count = 0;
514-
foreach (var serializedEnvelope in readyMessages)
502+
var limit = 100; // Limit number of messages to move in one call
503+
var hasMore = true;
504+
while (limit-- > 0 && hasMore && !cancellationToken.IsCancellationRequested)
515505
{
516-
try
506+
var setEntries = await database.SortedSetPopAsync(
507+
scheduledKey,
508+
1,
509+
Order.Descending);
510+
var readyMessages = new List<RedisValue>();
511+
foreach (var sortedSetEntry in setEntries)
517512
{
518-
// Deserialize the envelope
519-
var envelope = EnvelopeSerializer.Deserialize(serializedEnvelope);
520-
521-
// Add it to the stream
522-
_endpoint.EnvelopeMapper ??= _endpoint.BuildMapper(_runtime);
523-
var fields = new List<NameValueEntry>();
524-
_endpoint.EnvelopeMapper.MapEnvelopeToOutgoing(envelope, fields);
525-
526-
var messageId = await database.StreamAddAsync(_endpoint.StreamKey, fields.ToArray());
527-
528-
// Remove from scheduled set
529-
await database.SortedSetRemoveAsync(scheduledKey, serializedEnvelope);
530-
531-
count++;
532-
533-
_logger.LogDebug("Moved scheduled message {EnvelopeId} (Attempts={Attempts}) to stream {StreamKey} with message ID {MessageId}",
534-
envelope.Id, envelope.Attempts, _endpoint.StreamKey, messageId);
513+
if (sortedSetEntry.Score > now)
514+
{
515+
// Not ready yet, re-add to the sorted set
516+
await database.SortedSetAddAsync(
517+
scheduledKey,
518+
sortedSetEntry.Element,
519+
sortedSetEntry.Score);
520+
hasMore = false;
521+
}
522+
else
523+
{
524+
readyMessages.Add(sortedSetEntry.Element);
525+
}
535526
}
536-
catch (Exception ex)
527+
528+
if (readyMessages.Count == 0)
537529
{
538-
_logger.LogError(ex, "Error processing scheduled message in {ScheduledKey}", scheduledKey);
539-
// Remove the corrupted message from the scheduled set
540-
await database.SortedSetRemoveAsync(scheduledKey, serializedEnvelope);
530+
return 0;
531+
}
532+
533+
_logger.LogDebug(
534+
"Found {Count} scheduled messages ready for execution in {ScheduledKey}",
535+
readyMessages.Count,
536+
scheduledKey);
537+
538+
539+
foreach (var serializedEnvelope in readyMessages)
540+
{
541+
try
542+
{
543+
// Deserialize the envelope
544+
var envelope =
545+
EnvelopeSerializer.Deserialize(serializedEnvelope);
546+
547+
// Add it to the stream
548+
_endpoint.EnvelopeMapper ??=
549+
_endpoint.BuildMapper(_runtime);
550+
var fields = new List<NameValueEntry>();
551+
_endpoint.EnvelopeMapper.MapEnvelopeToOutgoing(
552+
envelope,
553+
fields);
554+
555+
var messageId = await database.StreamAddAsync(
556+
_endpoint.StreamKey,
557+
fields.ToArray());
558+
559+
// Remove from scheduled set
560+
// await database.SortedSetRemoveAsync(scheduledKey, serializedEnvelope);
561+
562+
count++;
563+
564+
_logger.LogDebug(
565+
"Moved scheduled message {EnvelopeId} (Attempts={Attempts}) to stream {StreamKey} with message ID {MessageId}",
566+
envelope.Id,
567+
envelope.Attempts,
568+
_endpoint.StreamKey,
569+
messageId);
570+
}
571+
catch (Exception ex)
572+
{
573+
_logger.LogError(
574+
ex,
575+
"Error processing scheduled message in {ScheduledKey}",
576+
scheduledKey);
577+
// Remove the corrupted message from the scheduled set
578+
await database.SortedSetRemoveAsync(
579+
scheduledKey,
580+
serializedEnvelope);
581+
}
541582
}
542583
}
543584

0 commit comments

Comments
 (0)