Skip to content

Commit 2873f9b

Browse files
committed
Add option to override client-id
1 parent 778aa83 commit 2873f9b

4 files changed

Lines changed: 76 additions & 6 deletions

File tree

src/ArtemisNetClient/AutoRecovering/AutoRecoveringConnection.cs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ internal class AutoRecoveringConnection : IConnection
2121
private IConnection _connection;
2222
private readonly ILoggerFactory _loggerFactory;
2323
private readonly Func<IMessageIdPolicy> _messageIdPolicyFactory;
24+
private readonly Func<string> _clientIdFactory;
2425
private readonly ILogger<AutoRecoveringConnection> _logger;
2526
private readonly Endpoint[] _endpoints;
2627
private readonly ChannelReader<ConnectCommand> _reader;
@@ -30,11 +31,16 @@ internal class AutoRecoveringConnection : IConnection
3031
private readonly AsyncRetryPolicy<IConnection> _connectionRetryPolicy;
3132
private readonly Task _recoveryLoopTask;
3233

33-
public AutoRecoveringConnection(ILoggerFactory loggerFactory, IEnumerable<Endpoint> endpoints, IRecoveryPolicy recoveryPolicy, Func<IMessageIdPolicy> messageIdPolicyFactory)
34+
public AutoRecoveringConnection(ILoggerFactory loggerFactory,
35+
IEnumerable<Endpoint> endpoints,
36+
IRecoveryPolicy recoveryPolicy,
37+
Func<IMessageIdPolicy> messageIdPolicyFactory,
38+
Func<string> clientIdFactory)
3439
{
3540
_logger = loggerFactory.CreateLogger<AutoRecoveringConnection>();
3641
_loggerFactory = loggerFactory;
3742
_messageIdPolicyFactory = messageIdPolicyFactory;
43+
_clientIdFactory = clientIdFactory;
3844
_endpoints = endpoints.ToArray();
3945

4046
var channel = Channel.CreateUnbounded<ConnectCommand>();
@@ -169,7 +175,7 @@ private Task<IConnection> CreateConnection(CancellationToken cancellationToken)
169175
int retryCount = context.GetRetryCount();
170176
var endpoint = GetNextEndpoint(retryCount);
171177
context.SetEndpoint(endpoint);
172-
var connectionBuilder = new ConnectionBuilder(_loggerFactory, _messageIdPolicyFactory);
178+
var connectionBuilder = new ConnectionBuilder(_loggerFactory, _messageIdPolicyFactory, _clientIdFactory);
173179
var connection = await connectionBuilder.CreateAsync(endpoint, ct);
174180

175181
if (retryCount > 0)

src/ArtemisNetClient/Builders/ConnectionBuilder.cs

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,21 @@ namespace ActiveMQ.Artemis.Client.Builders
1111
{
1212
internal class ConnectionBuilder
1313
{
14+
const uint DefaultMaxFrameSize = 256 * 1024;
15+
const ushort ChannelMax = 255;
16+
1417
private readonly ILoggerFactory _loggerFactory;
1518
private readonly Func<IMessageIdPolicy> _messageIdPolicyFactory;
19+
private readonly Func<string> _clientIdFactory;
1620
private readonly TaskCompletionSource<bool> _tcs;
1721

18-
public ConnectionBuilder(ILoggerFactory loggerFactory, Func<IMessageIdPolicy> messageIdPolicyFactory)
22+
public ConnectionBuilder(ILoggerFactory loggerFactory,
23+
Func<IMessageIdPolicy> messageIdPolicyFactory,
24+
Func<string> clientIdFactory)
1925
{
2026
_loggerFactory = loggerFactory;
2127
_messageIdPolicyFactory = messageIdPolicyFactory;
28+
_clientIdFactory = clientIdFactory;
2229
_tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
2330
}
2431

@@ -30,7 +37,8 @@ public async Task<IConnection> CreateAsync(Endpoint endpoint, CancellationToken
3037
var connectionFactory = new Amqp.ConnectionFactory();
3138
try
3239
{
33-
var connection = await connectionFactory.CreateAsync(endpoint.Address, null, OnOpened).ConfigureAwait(false);
40+
var open = GetOpenFrame(endpoint);
41+
var connection = await connectionFactory.CreateAsync(endpoint.Address, open, OnOpened).ConfigureAwait(false);
3442
connection.AddClosedCallback(OnClosed);
3543
await _tcs.Task.ConfigureAwait(false);
3644
connection.Closed -= OnClosed;
@@ -42,6 +50,22 @@ public async Task<IConnection> CreateAsync(Endpoint endpoint, CancellationToken
4250
}
4351
}
4452

53+
private Open GetOpenFrame(Endpoint endpoint)
54+
{
55+
if (_clientIdFactory != null)
56+
{
57+
return new Open
58+
{
59+
ContainerId = _clientIdFactory(),
60+
HostName = endpoint.Host,
61+
MaxFrameSize = DefaultMaxFrameSize,
62+
ChannelMax = ChannelMax
63+
};
64+
}
65+
66+
return null;
67+
}
68+
4569
private void OnOpened(Amqp.IConnection connection, Open open)
4670
{
4771
if (connection != null)

src/ArtemisNetClient/ConnectionFactory.cs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ public class ConnectionFactory
1717
{
1818
private IRecoveryPolicy _recoveryPolicy;
1919
private Func<IMessageIdPolicy> _messageIdPolicyFactory;
20+
private Func<string> _clientIdFactory;
2021

2122
public async Task<IConnection> CreateAsync(IEnumerable<Endpoint> endpoints, CancellationToken cancellationToken)
2223
{
@@ -29,13 +30,13 @@ public async Task<IConnection> CreateAsync(IEnumerable<Endpoint> endpoints, Canc
2930

3031
if (AutomaticRecoveryEnabled)
3132
{
32-
var autoRecoveringConnection = new AutoRecoveringConnection(LoggerFactory, endpointsList, RecoveryPolicy, MessageIdPolicyFactory);
33+
var autoRecoveringConnection = new AutoRecoveringConnection(LoggerFactory, endpointsList, RecoveryPolicy, MessageIdPolicyFactory, ClientIdFactory);
3334
await autoRecoveringConnection.InitAsync(cancellationToken).ConfigureAwait(false);
3435
return autoRecoveringConnection;
3536
}
3637
else
3738
{
38-
var connectionBuilder = new ConnectionBuilder(LoggerFactory, MessageIdPolicyFactory);
39+
var connectionBuilder = new ConnectionBuilder(LoggerFactory, MessageIdPolicyFactory, ClientIdFactory);
3940
return await connectionBuilder.CreateAsync(endpointsList.First(), cancellationToken).ConfigureAwait(false);
4041
}
4142
}
@@ -53,5 +54,14 @@ public Func<IMessageIdPolicy> MessageIdPolicyFactory
5354
get => _messageIdPolicyFactory ?? ActiveMQ.Artemis.Client.MessageIdPolicy.MessageIdPolicyFactory.DisableMessageIdPolicy;
5455
set => _messageIdPolicyFactory = value ?? throw new ArgumentNullException(nameof(value), "MessageId Policy Factory cannot be null.");
5556
}
57+
58+
/// <summary>
59+
/// Factory function that creates unique identifier for the connection.
60+
/// </summary>
61+
public Func<string> ClientIdFactory
62+
{
63+
get => _clientIdFactory;
64+
set => _clientIdFactory = value ?? throw new ArgumentNullException(nameof(value), "Client Id Factory cannot be null.");
65+
}
5666
}
5767
}

test/ArtemisNetClient.UnitTests/ConnectionFactorySpec.cs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
using ActiveMQ.Artemis.Client.Exceptions;
88
using ActiveMQ.Artemis.Client.MessageIdPolicy;
99
using ActiveMQ.Artemis.Client.UnitTests.Utils;
10+
using Amqp.Framing;
1011
using Amqp.Handler;
1112
using Xunit;
1213
using Xunit.Abstractions;
@@ -108,5 +109,34 @@ public void Should_assign_custom_message_id_policy_factory()
108109
var connectionFactory = CreateConnectionFactory();
109110
connectionFactory.MessageIdPolicyFactory = MessageIdPolicyFactory.GuidMessageIdPolicy;
110111
}
112+
113+
[Fact]
114+
public async Task Should_create_connection_with_specified_client_id()
115+
{
116+
// Arrange
117+
var endpoint = GetUniqueEndpoint();
118+
119+
var tcs = new TaskCompletionSource<Open>();
120+
var handler = new TestHandler(@event =>
121+
{
122+
switch (@event.Id)
123+
{
124+
case EventId.ConnectionRemoteOpen:
125+
tcs.TrySetResult((Open) @event.Context);
126+
break;
127+
}
128+
});
129+
using var host = CreateOpenedContainerHost(endpoint, handler);
130+
131+
var connectionFactory = CreateConnectionFactory();
132+
connectionFactory.ClientIdFactory = () => "foo";
133+
134+
// Act
135+
await using var connection = await connectionFactory.CreateAsync(endpoint);
136+
137+
// Assert
138+
var openFrame = await tcs.Task;
139+
Assert.Equal("foo", openFrame.ContainerId);
140+
}
111141
}
112142
}

0 commit comments

Comments
 (0)