Skip to content

Commit eeb4520

Browse files
committed
Support for AMQP protocol over WebSockets
1 parent 6b4a2ac commit eeb4520

10 files changed

Lines changed: 94 additions & 13 deletions

File tree

.github/workflows/build.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ jobs:
1616
ARTEMIS_PASSWORD: "artemis"
1717
ARTEMIS_HOST: "localhost"
1818
ARTEMIS_PORT: 5672
19+
ARTEMIS_WS_PORT: 80
1920
steps:
2021
- uses: actions/checkout@v1
2122
- name: Setup .NET Core

src/ArtemisNetClient/ArtemisNetClient.csproj

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@
2222
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All" />
2323
</ItemGroup>
2424
<ItemGroup>
25-
<PackageReference Include="AMQPNetLite" Version="2.4.5" />
25+
<PackageReference Include="AMQPNetLite" Version="2.4.6" />
26+
<PackageReference Include="AMQPNetLite.WebSockets" Version="2.4.6" />
2627
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="2.0.0" />
2728
<PackageReference Include="Nito.AsyncEx.Coordination" Version="5.1.0" />
2829
<PackageReference Include="Polly" Version="7.2.1" />

src/ArtemisNetClient/Builders/ConnectionBuilder.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,10 @@ public async Task<IConnection> CreateAsync(Endpoint endpoint, CancellationToken
3434
cancellationToken.ThrowIfCancellationRequested();
3535
using var _ = cancellationToken.Register(() => _tcs.TrySetCanceled());
3636

37-
var connectionFactory = new Amqp.ConnectionFactory();
37+
var connectionFactory = endpoint.Scheme is Scheme.Ws or Scheme.Wss
38+
? new Amqp.ConnectionFactory(new[] { new WebSocketTransportFactory() })
39+
: new Amqp.ConnectionFactory();
40+
3841
try
3942
{
4043
var open = GetOpenFrame(endpoint);

src/ArtemisNetClient/Configuration/Endpoint.cs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ public sealed class Endpoint
88
{
99
private const string Amqp = "AMQP";
1010
private const string Amqps = "AMQPS";
11+
private const string Ws = "WS";
12+
private const string Wss = "WSS";
1113

1214
private Endpoint(Address address)
1315
{
@@ -41,13 +43,18 @@ private Endpoint(Address address)
4143
/// </summary>
4244
public string Password => Address.Password;
4345

44-
public static Endpoint Create(string host, int port, string user = null, string password = null, Scheme scheme = Scheme.Amqp)
46+
/// <summary>
47+
/// Gets the path of the endpoint.
48+
/// </summary>
49+
public string Path => Address.Path;
50+
51+
public static Endpoint Create(string host, int port, string user = null, string password = null, Scheme scheme = Scheme.Amqp, string path = "/")
4552
{
4653
var protocolScheme = GetScheme(scheme);
4754

4855
try
4956
{
50-
return new Endpoint(new Address(host, port, user, password, "/", protocolScheme))
57+
return new Endpoint(new Address(host, port, user, password, path, protocolScheme))
5158
{
5259
Scheme = scheme
5360
};
@@ -68,13 +75,16 @@ private static string GetScheme(Scheme scheme)
6875
{
6976
Scheme.Amqp => Amqp,
7077
Scheme.Amqps => Amqps,
78+
Scheme.Ws => Ws,
79+
Scheme.Wss => Wss,
7180
_ => throw new CreateEndpointException($"Protocol scheme {scheme.ToString()} is invalid.", ErrorCode.InvalidField)
7281
};
7382
}
7483

84+
/// <inheritdoc />
7585
public override string ToString()
7686
{
77-
return $@"{Scheme.ToString().ToLower()}://{Host}:{Port.ToString()}";
87+
return $@"{Scheme.ToString().ToLower()}://{Host}:{Port.ToString()}{Path}";
7888
}
7989
}
8090
}
Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,28 @@
11
namespace ActiveMQ.Artemis.Client
22
{
3+
/// <summary>
4+
/// Represents the protocol schemes used for AMQP connections.
5+
/// </summary>
36
public enum Scheme
47
{
8+
/// <summary>
9+
/// Represents the standard AMQP protocol without security.
10+
/// </summary>
511
Amqp,
6-
Amqps
12+
13+
/// <summary>
14+
/// Represents the standard AMQP protocol secured with SSL/TLS.
15+
/// </summary>
16+
Amqps,
17+
18+
/// <summary>
19+
/// Represents the AMQP protocol over WebSocket without security.
20+
/// </summary>
21+
Ws,
22+
23+
/// <summary>
24+
/// Represents the AMQP protocol over WebSocket secured with SSL/TLS.
25+
/// </summary>
26+
Wss
727
}
828
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
using Xunit;
4+
using Xunit.Abstractions;
5+
6+
namespace ActiveMQ.Artemis.Client.IntegrationTests;
7+
8+
public class WebSocketsSpec : ActiveMQNetIntegrationSpec
9+
{
10+
public WebSocketsSpec(ITestOutputHelper output) : base(output)
11+
{
12+
}
13+
14+
[Fact]
15+
public async Task Should_send_and_receive_message_with_web_socket_endpoint()
16+
{
17+
string userName = Environment.GetEnvironmentVariable("ARTEMIS_USERNAME") ?? "artemis";
18+
string password = Environment.GetEnvironmentVariable("ARTEMIS_PASSWORD") ?? "artemis";
19+
string host = Environment.GetEnvironmentVariable("ARTEMIS_HOST") ?? "localhost";
20+
int port = int.Parse(Environment.GetEnvironmentVariable("ARTEMIS_WS_PORT") ?? "80");
21+
22+
var endpoint = Endpoint.Create(host: host, port: port, user: userName, password: password, Scheme.Ws);
23+
24+
var address = Guid.NewGuid().ToString();
25+
26+
var connectionFactory = new ConnectionFactory();
27+
await using var connection = await connectionFactory.CreateAsync(endpoint, CancellationToken);
28+
await using var consumer = await connection.CreateConsumerAsync(address, RoutingType.Anycast);
29+
await using var producer = await connection.CreateProducerAsync(address, RoutingType.Anycast);
30+
31+
await producer.SendAsync(new Message("msg"));
32+
33+
var msg = await consumer.ReceiveAsync();
34+
await consumer.AcceptAsync(msg);
35+
36+
Assert.Equal("msg", msg.GetBody<string>());
37+
}
38+
}

test/ArtemisNetClient.UnitTests/EndpointSpec.cs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,16 @@ public static IEnumerable<object[]> SchemaData()
4646
return new[]
4747
{
4848
new object[] { Scheme.Amqp },
49-
new object[] { Scheme.Amqps }
49+
new object[] { Scheme.Amqps },
50+
new object[] { Scheme.Ws },
51+
new object[] { Scheme.Wss },
5052
};
5153
}
5254

5355
[Fact]
5456
public void Throws_when_invalid_scheme_specified()
5557
{
56-
var exception = Assert.Throws<CreateEndpointException>(() => Endpoint.Create("localhost", 5672, "guest", "guest", (Scheme) 999));
58+
var exception = Assert.Throws<CreateEndpointException>(() => Endpoint.Create("localhost", 5672, "guest", "guest", scheme: (Scheme) 999));
5759
Assert.Equal(ErrorCode.InvalidField, exception.ErrorCode);
5860
}
5961

@@ -67,10 +69,11 @@ public static IEnumerable<object[]> EndpointData()
6769
{
6870
return new[]
6971
{
70-
new object[] { Endpoint.Create("localhost", 5762), "amqp://localhost:5762" },
71-
new object[] { Endpoint.Create("localhost", 5762, scheme: Scheme.Amqps), "amqps://localhost:5762" },
72-
new object[] { Endpoint.Create("localhost", 5762, "admin", password: "secret"), "amqp://localhost:5762" },
73-
new object[] { Endpoint.Create("localhost", 5762, "admin", password: "secret", Scheme.Amqps), "amqps://localhost:5762" }
72+
new object[] { Endpoint.Create("localhost", 5762), "amqp://localhost:5762/" },
73+
new object[] { Endpoint.Create("localhost", 5762, scheme: Scheme.Amqps), "amqps://localhost:5762/" },
74+
new object[] { Endpoint.Create("localhost", 5762, "admin", password: "secret"), "amqp://localhost:5762/" },
75+
new object[] { Endpoint.Create("localhost", 5762, "admin", password: "secret", scheme: Scheme.Amqps), "amqps://localhost:5762/" },
76+
new object[] { Endpoint.Create("localhost", 80, "admin", password: "secret", scheme: Scheme.Wss, path: "/redirectMeToBrokerA"), "wss://localhost:80/redirectMeToBrokerA" },
7477
};
7578
}
7679
}

test/artemis/Dockerfile

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ EXPOSE 8161 \
2424
# Port for MQTT
2525
1883 \
2626
#Port for STOMP
27-
61613
27+
61613 \
28+
# Port for WS
29+
80
2830

2931
ENTRYPOINT ["/artemis/amq/bin/artemis", "run"]

test/artemis/broker.xml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,8 @@ under the License.
172172
<!-- MQTT Acceptor -->
173173
<acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>
174174

175+
<acceptor name="amqp-ws-acceptor">tcp://0.0.0.0:80?protocols=AMQP</acceptor>
176+
175177
</acceptors>
176178

177179

test/artemis/docker-compose.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,6 @@ services:
66
ports:
77
- 5672:5672
88
- 8161:8161
9+
- 80:80
910
volumes:
1011
- ./broker.xml:/artemis/amq/etc/broker.xml

0 commit comments

Comments
 (0)