Skip to content

Commit d21f0c8

Browse files
authored
Add Sec-WebSocket-Protocol to the HTTP header (#79)
This pull request adds support for the AMQP WebSocket subprotocol by including the `Sec-WebSocket-Protocol` header in WebSocket connections, and provides a complete example demonstrating AMQP 1.0 over WebSockets. **Changes:** - Added `Sec-WebSocket-Protocol: amqp` header to WebSocket connection initialization - Created a comprehensive WebSocket example with queue declaration, message publishing, and consumption - Added documentation for setting up and running the WebSocket example Complete: #78 --------- Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent 49f214f commit d21f0c8

4 files changed

Lines changed: 114 additions & 2 deletions

File tree

docs/examples/README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,5 @@
1111
- [Advanced Settings](advanced_settings) - An example of how to use the advanced connection settings of the AMQP 1.0 client.
1212
- [Broadcast](broadcast) - An example of how to use fanout to broadcast messages to multiple auto-deleted queues.
1313
- [RPC Echo](rpc_echo_server) - An example of how to implement RPC with the AMQP 1.0 client.
14-
- [SQL stream Filtering](sql_stream_filter) - An example of how to use SQL stream filtering with RabbitMQ Streams.
14+
- [SQL stream Filtering](sql_stream_filter) - An example of how to use SQL stream filtering with RabbitMQ Streams.
15+
- [Web Sockets](web_sockets) - An example of how to use Web Sockets with the AMQP 1.0 client.
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
AMQP 1.0 over WebSocket Example
2+
===============================================================
3+
4+
This example demonstrates how to use AMQP 1.0 over WebSocket. </br>
5+
## RabbitMQ Tanzu
6+
You need [Tanzu RabbitMQ 4.0](https://www.vmware.com/products/app-platform/tanzu-rabbitmq) or later with the AMQP 1.0 and `rabbitmq_web_amqp` plugins enabled.
7+
8+
For more info read the blog post: [AMQP 1.0 over WebSocket](https://www.rabbitmq.com/blog/2025/04/16/amqp-websocket)
9+
10+
To run the example you need to have:
11+
- Tanzu RabbitMQ 4.0 or later with the AMQP 1.0 and `rabbitmq_web_amqp` plugins enabled.
12+
- A vhost called `ws` configured for WebSocket connections.
13+
- A user `rabbit` pwd `rabbit` with access to the `ws` vhost.
14+
15+
## Web Sockify
16+
It is possible to run the example with [websockify](https://github.com/novnc/websockify)
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
// RabbitMQ AMQP 1.0 Go Client: https://github.com/rabbitmq/rabbitmq-amqp-go-client
2+
// RabbitMQ AMQP 1.0 documentation: https://www.rabbitmq.com/docs/amqp
3+
// The example is demonstrating how to connect to RabbitMQ using AMQP 1.0 over WebSocket protocol with SASLTypePlain,
4+
// declare a queue, publish a message to it, and then consume that message.
5+
// AMQP 1.0 over WebSocket documentation: https://www.rabbitmq.com/blog/2025/04/16/amqp-websocket
6+
// example path: https://github.com/rabbitmq/rabbitmq-amqp-go-client/tree/main/docs/examples/web_sockets/web_sockets.go
7+
8+
package main
9+
10+
import (
11+
"context"
12+
13+
"github.com/Azure/go-amqp"
14+
rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
15+
)
16+
17+
func main() {
18+
const amqpConnectionString = "ws://127.0.0.1:15678/ws"
19+
rmq.Info("[Example]", "Starting web socket connection to", amqpConnectionString)
20+
// for anonymous connection use:
21+
// env := rmq.NewEnvironment(amqpConnectionString, &rmq.AmqpConnOptions{
22+
// SASLType: amqp.SASLTypeAnonymous(),
23+
// })
24+
env := rmq.NewEnvironment(amqpConnectionString, &rmq.AmqpConnOptions{
25+
SASLType: amqp.SASLTypePlain("rabbit", "rabbit"),
26+
})
27+
conn, err := env.NewConnection(context.Background())
28+
if err != nil {
29+
panic(err)
30+
}
31+
_, err = conn.Management().DeclareQueue(context.TODO(), &rmq.QuorumQueueSpecification{
32+
Name: "test-ws-queue",
33+
})
34+
if err != nil {
35+
panic(err)
36+
}
37+
// declare new producer
38+
producer, err := conn.NewPublisher(context.TODO(), &rmq.QueueAddress{
39+
Queue: "test-ws-queue",
40+
}, nil)
41+
if err != nil {
42+
panic(err)
43+
}
44+
msg := rmq.NewMessage([]byte("Hello over WebSockets"))
45+
46+
publishResult, err := producer.Publish(context.Background(), msg)
47+
if err != nil {
48+
panic(err)
49+
}
50+
switch publishResult.Outcome.(type) {
51+
case *rmq.StateAccepted:
52+
rmq.Info("[Publisher]", "Message accepted", publishResult.Message.Data[0])
53+
default:
54+
rmq.Warn("[Publisher]", "Message not accepted", publishResult.Message.Data[0])
55+
}
56+
57+
// declare new consumer
58+
consumer, err := conn.NewConsumer(context.TODO(), "test-ws-queue", nil)
59+
if err != nil {
60+
panic(err)
61+
}
62+
deliveryContext, err := consumer.Receive(context.Background())
63+
if err != nil {
64+
panic(err)
65+
}
66+
rmq.Info("[Consumer]", "Message received", string(deliveryContext.Message().GetData()))
67+
err = deliveryContext.Accept(context.Background())
68+
if err != nil {
69+
panic(err)
70+
}
71+
// clean up
72+
err = consumer.Close(context.TODO())
73+
if err != nil {
74+
panic(err)
75+
}
76+
err = producer.Close(context.TODO())
77+
if err != nil {
78+
panic(err)
79+
}
80+
81+
err = conn.Management().DeleteQueue(context.Background(), "test-ws-queue")
82+
if err != nil {
83+
panic(err)
84+
}
85+
86+
err = conn.Close(context.TODO())
87+
if err != nil {
88+
panic(err)
89+
}
90+
91+
}

pkg/rabbitmqamqp/amqp_connection.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -754,14 +754,18 @@ func sanitizeWebSocketURL(rawURL string) (string, http.Header, error) {
754754

755755
// Prepare Headers for Auth
756756
headers := http.Header{}
757+
// add sec-websocket-protocol amqp
758+
// mandatory for AMQP over WebSocket
759+
// https://www.rfc-editor.org/rfc/rfc6455.html
760+
headers.Add("Sec-WebSocket-Protocol", "amqp")
761+
757762
if u.User != nil {
758763
username := u.User.Username()
759764
password, _ := u.User.Password()
760765

761766
// Construct Basic Auth Header manually
762767
auth := base64.StdEncoding.EncodeToString([]byte(username + ":" + password))
763768
headers.Add("Authorization", "Basic "+auth)
764-
765769
u.User = nil
766770
}
767771

0 commit comments

Comments
 (0)