Skip to content

Commit af263af

Browse files
Taiizorclaude
andcommitted
feat(relay): raise MessageExpired from the cleanup sweep
Previously MessageExpired only fired from the delivery path, so messages that expired while waiting in the queue and were removed by the background cleanup sweep were dropped without notification. - Change IRelayQueue.ClearExpiredAsync to return the newly-expired messages (IReadOnlyList<IRelayMessage>) instead of a count. Messages already in the Expired state (already reported via the delivery path) are removed but not returned, so each message is reported exactly once. - Add RelayService.SweepExpiredMessagesAsync, which raises MessageExpired for each returned message; the cleanup loop now calls it. - Update InMemoryRelayQueue, the QueueManagementExample and the relay README. - Add tests for the sweep raising events and the empty-sweep case (12 total). BREAKING CHANGE: IRelayQueue.ClearExpiredAsync now returns IReadOnlyList<IRelayMessage> instead of int. Custom IRelayQueue implementations and callers must be updated. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 92f2fd5 commit af263af

6 files changed

Lines changed: 90 additions & 16 deletions

File tree

examples/Zetian.Relay.Examples/QueueManagementExample.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -401,8 +401,8 @@ private static async Task RescheduleMessage(Zetian.Relay.Abstractions.IRelayQueu
401401

402402
private static async Task ClearExpiredMessages(Zetian.Relay.Abstractions.IRelayQueue queue)
403403
{
404-
int count = await queue.ClearExpiredAsync();
405-
Console.WriteLine($"[INFO] Cleared {count} expired messages");
404+
var expired = await queue.ClearExpiredAsync();
405+
Console.WriteLine($"[INFO] Cleared {expired.Count} expired messages");
406406
}
407407

408408
private static async Task SendMoreMessages(SmtpClient client)

src/Zetian.Relay/Abstractions/IRelayQueue.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,9 +94,12 @@ Task<IReadOnlyList<IRelayMessage>> GetByStatusAsync(
9494
CancellationToken cancellationToken = default);
9595

9696
/// <summary>
97-
/// Clears expired messages from the queue
97+
/// Removes expired messages from the queue and returns the messages that newly
98+
/// transitioned to the expired state during this call. Messages that were already
99+
/// marked as expired (e.g. reported via the delivery path) are removed but not
100+
/// returned, so callers can raise an expiry notification exactly once per message.
98101
/// </summary>
99-
Task<int> ClearExpiredAsync(CancellationToken cancellationToken = default);
102+
Task<IReadOnlyList<IRelayMessage>> ClearExpiredAsync(CancellationToken cancellationToken = default);
100103

101104
/// <summary>
102105
/// Gets queue statistics

src/Zetian.Relay/Queue/InMemoryRelayQueue.cs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ public Task<IReadOnlyList<IRelayMessage>> GetByStatusAsync(
306306
return Task.FromResult<IReadOnlyList<IRelayMessage>>(messages);
307307
}
308308

309-
public async Task<int> ClearExpiredAsync(CancellationToken cancellationToken = default)
309+
public async Task<IReadOnlyList<IRelayMessage>> ClearExpiredAsync(CancellationToken cancellationToken = default)
310310
{
311311
await _queueLock.WaitAsync(cancellationToken).ConfigureAwait(false);
312312
try
@@ -315,15 +315,26 @@ public async Task<int> ClearExpiredAsync(CancellationToken cancellationToken = d
315315
.Where(m => m.IsExpired)
316316
.ToList();
317317

318+
List<IRelayMessage> newlyExpired = [];
319+
318320
foreach (RelayMessage message in expired)
319321
{
322+
// Messages already in the Expired state were reported via the delivery
323+
// path; remove them but do not surface them again to avoid double-firing.
324+
bool alreadyExpired = message.Status == RelayStatus.Expired;
325+
320326
if (_messages.TryRemove(message.QueueId, out _))
321327
{
322328
if (message.Status == RelayStatus.InProgress)
323329
{
324330
Interlocked.Decrement(ref _activeDeliveries);
325331
}
326-
message.MarkExpired();
332+
333+
if (!alreadyExpired)
334+
{
335+
message.MarkExpired();
336+
newlyExpired.Add(message);
337+
}
327338
}
328339
}
329340

@@ -332,7 +343,7 @@ public async Task<int> ClearExpiredAsync(CancellationToken cancellationToken = d
332343
_logger.LogInformation("Cleared {Count} expired messages from queue", expired.Count);
333344
}
334345

335-
return expired.Count;
346+
return newlyExpired;
336347
}
337348
finally
338349
{

src/Zetian.Relay/README.MD

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -236,8 +236,9 @@ var deferredMessages = await relayService.Queue.GetByStatusAsync(RelayStatus.Def
236236
// Remove a message
237237
await relayService.Queue.RemoveAsync(queueId);
238238

239-
// Clear expired messages
240-
var cleared = await relayService.Queue.ClearExpiredAsync();
239+
// Clear expired messages (returns the messages that newly expired)
240+
var expired = await relayService.Queue.ClearExpiredAsync();
241+
Console.WriteLine($"Cleared {expired.Count} expired messages");
241242
```
242243

243244
### Message Priority

src/Zetian.Relay/Services/RelayService.cs

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -384,13 +384,7 @@ private async Task CleanupExpiredMessagesAsync(CancellationToken cancellationTok
384384
await Task.Delay(Configuration.CleanupInterval, cancellationToken)
385385
.ConfigureAwait(false);
386386

387-
int count = await Queue.ClearExpiredAsync(cancellationToken)
388-
.ConfigureAwait(false);
389-
390-
if (count > 0)
391-
{
392-
_logger.LogInformation("Cleaned up {Count} expired messages", count);
393-
}
387+
await SweepExpiredMessagesAsync(cancellationToken).ConfigureAwait(false);
394388
}
395389
catch (OperationCanceledException)
396390
{
@@ -405,6 +399,29 @@ await Task.Delay(Configuration.CleanupInterval, cancellationToken)
405399
_logger.LogInformation("Cleanup task stopped");
406400
}
407401

402+
/// <summary>
403+
/// Performs a single sweep of the queue for expired messages, raising
404+
/// <see cref="MessageExpired"/> for each message that expired while waiting in the
405+
/// queue (i.e. without being picked up by the delivery path, which reports expiry itself).
406+
/// </summary>
407+
internal async Task SweepExpiredMessagesAsync(CancellationToken cancellationToken)
408+
{
409+
IReadOnlyList<IRelayMessage> expiredMessages = await Queue.ClearExpiredAsync(cancellationToken)
410+
.ConfigureAwait(false);
411+
412+
if (expiredMessages.Count == 0)
413+
{
414+
return;
415+
}
416+
417+
_logger.LogInformation("Cleaned up {Count} expired messages", expiredMessages.Count);
418+
419+
foreach (IRelayMessage expiredMessage in expiredMessages)
420+
{
421+
OnMessageExpired(new RelayDeliveryEventArgs(expiredMessage) { Error = "Message expired" });
422+
}
423+
}
424+
408425
private async Task<bool> CanRelayAsync(ISmtpSession session, ISmtpMessage message)
409426
{
410427
// Allow relay if authenticated

tests/Zetian.Relay.Tests/RelayServiceEventTests.cs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.Linq;
34
using System.Net.Mail;
45
using System.Threading;
56
using System.Threading.Tasks;
@@ -275,6 +276,47 @@ public async Task MessageExpired_IsRaised_WhenMessageHasExpired()
275276
client.Verify(c => c.SendAsync(It.IsAny<ISmtpMessage>(), It.IsAny<CancellationToken>()), Times.Never);
276277
}
277278

279+
[Fact]
280+
public async Task MessageExpired_IsRaised_FromCleanupSweep_PerReturnedMessage()
281+
{
282+
Mock<IRelayMessage> first = new();
283+
first.SetupGet(m => m.QueueId).Returns("queue-1");
284+
Mock<IRelayMessage> second = new();
285+
second.SetupGet(m => m.QueueId).Returns("queue-2");
286+
287+
Mock<IRelayQueue> queue = new();
288+
queue.Setup(q => q.ClearExpiredAsync(It.IsAny<CancellationToken>()))
289+
.ReturnsAsync(new List<IRelayMessage> { first.Object, second.Object });
290+
291+
RelayService service = new(CreateConfiguration(), queue.Object, logger: null);
292+
293+
List<RelayDeliveryEventArgs> raised = new();
294+
service.MessageExpired += (_, e) => raised.Add(e);
295+
296+
await service.SweepExpiredMessagesAsync(CancellationToken.None);
297+
298+
Assert.Equal(2, raised.Count);
299+
Assert.Equal(new[] { "queue-1", "queue-2" }, raised.Select(r => r.QueueId).ToArray());
300+
Assert.All(raised, r => Assert.Equal("Message expired", r.Error));
301+
}
302+
303+
[Fact]
304+
public async Task MessageExpired_NotRaised_WhenSweepFindsNothing()
305+
{
306+
Mock<IRelayQueue> queue = new();
307+
queue.Setup(q => q.ClearExpiredAsync(It.IsAny<CancellationToken>()))
308+
.ReturnsAsync(Array.Empty<IRelayMessage>());
309+
310+
RelayService service = new(CreateConfiguration(), queue.Object, logger: null);
311+
312+
bool raised = false;
313+
service.MessageExpired += (_, _) => raised = true;
314+
315+
await service.SweepExpiredMessagesAsync(CancellationToken.None);
316+
317+
Assert.False(raised);
318+
}
319+
278320
[Fact]
279321
public async Task EventHandlerException_DoesNotPropagate()
280322
{

0 commit comments

Comments
 (0)