Skip to content

Commit de35e5d

Browse files
committed
Remove message source when link detached
1 parent adf524a commit de35e5d

3 files changed

Lines changed: 45 additions & 2 deletions

File tree

src/ArtemisNetClient.Testing/SharedMessageSource.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,25 @@ public void AddMessageSource(MessageSource messageSource)
2323
_messageSources.Add(messageSource);
2424
}
2525

26+
public void RemoveMessageSource(MessageSource messageSource)
27+
{
28+
_messageSources.Remove(messageSource);
29+
}
30+
31+
public int MessageSourcesCount => _messageSources.Count;
32+
2633
public void Enqueue(Message message)
2734
{
2835
if (Info.FilterExpression != null && FilterExpressionMatches(message) == false)
2936
{
3037
return;
3138
}
3239

40+
if (_messageSources.Count == 0)
41+
{
42+
return;
43+
}
44+
3345
if (message.GroupId is { Length: > 0 } groupId)
3446
{
3547
var cursor = ArtemisBucketHelper.GetBucket(groupId, _messageSources.Count);

src/ArtemisNetClient.Testing/TestKit.cs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ public TestKit(Endpoint endpoint)
1515
_host = new ContainerHost(endpoint.Address);
1616
_host.AddressResolver = AddressResolver;
1717
_host.Listeners[0].HandlerFactory = _ => new Handler();
18-
_host.RegisterLinkProcessor(new TestLinkProcessor(OnMessage, OnMessageSource));
18+
_host.RegisterLinkProcessor(new TestLinkProcessor(OnMessage, OnMessageSource, OnMessageSourceClosed));
1919
_host.Open();
2020
}
2121

@@ -91,6 +91,29 @@ private void OnMessageSource(MessageSourceInfo info, MessageSource messageSource
9191
}
9292
}
9393

94+
private void OnMessageSourceClosed(MessageSourceInfo info, MessageSource messageSource)
95+
{
96+
if (!_messageSources.TryGetValue(info.Address, out var messageSources))
97+
{
98+
return;
99+
}
100+
101+
lock (messageSources)
102+
{
103+
var existingMessageSource = messageSources.FirstOrDefault(x => x.Info == info);
104+
if (existingMessageSource == null)
105+
{
106+
return;
107+
}
108+
109+
existingMessageSource.RemoveMessageSource(messageSource);
110+
if (messageSources.Count == 0)
111+
{
112+
messageSources.Remove(existingMessageSource);
113+
}
114+
}
115+
}
116+
94117
public ISubscription Subscribe(string address)
95118
{
96119
var subscription = new Subscription();

src/ArtemisNetClient.Testing/TestLinkProcessor.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,15 @@ internal class TestLinkProcessor : ILinkProcessor
1212
{
1313
private readonly Action<Message> _onMessage;
1414
private readonly Action<MessageSourceInfo, MessageSource> _onMessageSource;
15+
private readonly Action<MessageSourceInfo, MessageSource> _onMessageSourceClosed;
1516

16-
public TestLinkProcessor(Action<Message> onMessage, Action<MessageSourceInfo, MessageSource> onMessageSource)
17+
public TestLinkProcessor(Action<Message> onMessage,
18+
Action<MessageSourceInfo, MessageSource> onMessageSource,
19+
Action<MessageSourceInfo, MessageSource> onMessageSourceClosed)
1720
{
1821
_onMessage = onMessage;
1922
_onMessageSource = onMessageSource;
23+
_onMessageSourceClosed = onMessageSourceClosed;
2024
}
2125

2226
public void Process(AttachContext attachContext)
@@ -35,6 +39,10 @@ public void Process(AttachContext attachContext)
3539

3640
// override OnDispose so it won't throw NRE when message is null
3741
attachContext.Link.SetOnDispose((_, _, _, _) => { });
42+
attachContext.Link.AddSafeClosed((_, _) =>
43+
{
44+
_onMessageSourceClosed(messageSourceInfo, messageSource);
45+
});
3846

3947
attachContext.Link.CompleteAttach(attachContext.Attach, null);
4048
}

0 commit comments

Comments
 (0)