Skip to content

Commit 49f214f

Browse files
authored
feat: Add WebSocket transport support for AMQP 1.0 connections (#78)
* feat: Add WebSocket transport support for AMQP 1.0 * chore: revert changes to ExtractWithoutPassword helper
1 parent 3d1548f commit 49f214f

5 files changed

Lines changed: 156 additions & 10 deletions

File tree

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ require (
88
github.com/Azure/go-amqp v1.5.1
99
github.com/golang-jwt/jwt/v5 v5.2.2
1010
github.com/google/uuid v1.6.0
11+
github.com/gorilla/websocket v1.5.3
1112
github.com/onsi/ginkgo/v2 v2.23.4
1213
github.com/onsi/gomega v1.38.0
1314
)

go.sum

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,3 @@
1-
github.com/Azure/go-amqp v1.4.0 h1:Xj3caqi4comOF/L1Uc5iuBxR/pB6KumejC01YQOqOR4=
2-
github.com/Azure/go-amqp v1.4.0/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE=
3-
github.com/Azure/go-amqp v1.5.0 h1:GRiQK1VhrNFbyx5VlmI6BsA1FCp27W5rb9kxOZScnTo=
4-
github.com/Azure/go-amqp v1.5.0/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE=
51
github.com/Azure/go-amqp v1.5.1 h1:WyiPTz2C3zVvDL7RLAqwWdeoYhMtX62MZzQoP09fzsU=
62
github.com/Azure/go-amqp v1.5.1/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE=
73
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
@@ -20,6 +16,8 @@ github.com/google/pprof v0.0.0-20250403155104-27863c87afa6 h1:BHT72Gu3keYf3ZEu2J
2016
github.com/google/pprof v0.0.0-20250403155104-27863c87afa6/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA=
2117
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
2218
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
19+
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
20+
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
2321
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
2422
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
2523
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=

pkg/rabbitmqamqp/amqp_connection.go

Lines changed: 76 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,19 @@ package rabbitmqamqp
33
import (
44
"context"
55
"crypto/tls"
6+
"encoding/base64"
67
"errors"
78
"fmt"
89
"math/rand"
10+
"net/http"
11+
"net/url"
912
"sync"
1013
"sync/atomic"
1114
"time"
1215

1316
"github.com/Azure/go-amqp"
1417
"github.com/google/uuid"
18+
"github.com/gorilla/websocket"
1519
)
1620

1721
var ErrConnectionClosed = errors.New("connection is closed")
@@ -453,10 +457,45 @@ func (a *AmqpConnection) open(ctx context.Context, address string, connOptions *
453457
TLSConfig: connOptions.TLSConfig,
454458
WriteTimeout: connOptions.WriteTimeout,
455459
}
456-
azureConnection, err = amqp.Dial(ctx, address, amqpLiteConnOptions)
457-
if err != nil && (connOptions.TLSConfig != nil || uri.Scheme == AMQPS) {
458-
Error("Failed to open TLS connection", fmt.Sprintf("%s://%s", uri.Scheme, uri.Host), err, "ID", connOptions.Id)
459-
return fmt.Errorf("failed to open TLS connection: %w", err)
460+
461+
u, err := url.Parse(address)
462+
if err != nil {
463+
return err
464+
}
465+
466+
if u.Scheme == "ws" || u.Scheme == "wss" {
467+
468+
wsAddress, wsHeaders, err := sanitizeWebSocketURL(address)
469+
if err != nil {
470+
Error("Failed to sanitize websocket URL", "url", ExtractWithoutPassword(address), "error", err, "ID", connOptions.Id)
471+
return fmt.Errorf("failed to sanitize websocket URL: %w", err)
472+
}
473+
474+
// Create a WebSocket dialer
475+
dialer := websocket.DefaultDialer
476+
if u.Scheme == "wss" && connOptions.TLSConfig != nil {
477+
dialer.TLSClientConfig = connOptions.TLSConfig
478+
}
479+
480+
// Dial the WebSocket server
481+
wsConn, _, err := dialer.Dial(wsAddress, wsHeaders)
482+
if err != nil {
483+
Error("Failed to open a websocket connection", "url", ExtractWithoutPassword(wsAddress), "error", err, "ID", connOptions.Id)
484+
return fmt.Errorf("failed to open a websocket connection: %w", err)
485+
}
486+
487+
// Wrap the WebSocket connection in a WebSocketConn
488+
neConn := NewWebSocketConn(wsConn)
489+
azureConnection, err = amqp.NewConn(ctx, neConn, amqpLiteConnOptions)
490+
if err != nil {
491+
Error("Failed to open AMQP over WebSocket connection", "url", ExtractWithoutPassword(address), "error", err, "ID", connOptions.Id)
492+
}
493+
} else {
494+
azureConnection, err = amqp.Dial(ctx, address, amqpLiteConnOptions)
495+
if err != nil && (connOptions.TLSConfig != nil || uri.Scheme == AMQPS) {
496+
Error("Failed to open TLS connection", fmt.Sprintf("%s://%s", uri.Scheme, uri.Host), err, "ID", connOptions.Id)
497+
return fmt.Errorf("failed to open TLS connection: %w", err)
498+
}
460499
}
461500
if err != nil {
462501
Error("Failed to open connection", "url", ExtractWithoutPassword(address), "error", err, "ID", connOptions.Id)
@@ -701,3 +740,36 @@ func (a *AmqpConnection) RefreshToken(background context.Context, token string)
701740
}
702741

703742
//*** end management section ***
743+
744+
// sanitizeWebSocketURL ensures the URL is correctly formatted for the Gorilla websocket dialer.
745+
func sanitizeWebSocketURL(rawURL string) (string, http.Header, error) {
746+
u, err := url.Parse(rawURL)
747+
if err != nil {
748+
return "", nil, err
749+
}
750+
751+
if u.Scheme != "ws" && u.Scheme != "wss" {
752+
return "", nil, fmt.Errorf("invalid websocket scheme: %s", u.Scheme)
753+
}
754+
755+
// Prepare Headers for Auth
756+
headers := http.Header{}
757+
if u.User != nil {
758+
username := u.User.Username()
759+
password, _ := u.User.Password()
760+
761+
// Construct Basic Auth Header manually
762+
auth := base64.StdEncoding.EncodeToString([]byte(username + ":" + password))
763+
headers.Add("Authorization", "Basic "+auth)
764+
765+
u.User = nil
766+
}
767+
768+
if u.Path == "" {
769+
u.Path = "/"
770+
} else if u.Path[0] != '/' {
771+
u.Path = "/" + u.Path
772+
}
773+
774+
return u.String(), headers, nil
775+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package rabbitmqamqp
2+
3+
import (
4+
"io"
5+
"net"
6+
"time"
7+
8+
"github.com/gorilla/websocket"
9+
)
10+
11+
type WebSocketConn struct {
12+
conn *websocket.Conn
13+
reader io.Reader
14+
}
15+
16+
func NewWebSocketConn(conn *websocket.Conn) *WebSocketConn {
17+
return &WebSocketConn{conn: conn}
18+
}
19+
20+
func (ws *WebSocketConn) Read(b []byte) (n int, err error) {
21+
if ws.reader == nil {
22+
messageType, reader, err := ws.conn.NextReader()
23+
if err != nil {
24+
return 0, err
25+
}
26+
if messageType != websocket.BinaryMessage {
27+
return 0, io.ErrUnexpectedEOF
28+
}
29+
ws.reader = reader
30+
}
31+
n, err = ws.reader.Read(b)
32+
if err == io.EOF {
33+
ws.reader = nil
34+
err = nil
35+
}
36+
return n, err
37+
}
38+
39+
func (ws *WebSocketConn) Write(b []byte) (n int, err error) {
40+
err = ws.conn.WriteMessage(websocket.BinaryMessage, b)
41+
if err != nil {
42+
return 0, err
43+
}
44+
45+
return len(b), nil
46+
}
47+
48+
func (ws *WebSocketConn) Close() error {
49+
return ws.conn.Close()
50+
}
51+
52+
func (ws *WebSocketConn) LocalAddr() net.Addr {
53+
return ws.conn.LocalAddr()
54+
}
55+
56+
func (ws *WebSocketConn) RemoteAddr() net.Addr {
57+
return ws.conn.RemoteAddr()
58+
}
59+
60+
func (ws *WebSocketConn) SetDeadline(t time.Time) error {
61+
err := ws.conn.SetReadDeadline(t)
62+
if err != nil {
63+
return err
64+
}
65+
return ws.conn.SetWriteDeadline(t)
66+
}
67+
68+
func (ws *WebSocketConn) SetReadDeadline(t time.Time) error {
69+
return ws.conn.SetReadDeadline(t)
70+
}
71+
72+
func (ws *WebSocketConn) SetWriteDeadline(t time.Time) error {
73+
return ws.conn.SetWriteDeadline(t)
74+
}

pkg/rabbitmqamqp/uri.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,15 @@ import (
1313
)
1414

1515
var (
16-
errURIScheme = errors.New("AMQP scheme must be either 'amqp://' or 'amqps://'")
16+
errURIScheme = errors.New("AMQP scheme must be either 'amqp://', 'amqps://', 'ws://' or 'wss://'")
1717
errURIWhitespace = errors.New("URI must not contain whitespace")
1818
)
1919

2020
var schemePorts = map[string]int{
2121
"amqp": 5672,
2222
"amqps": 5671,
23+
"wss": 15675,
24+
"ws": 15675,
2325
}
2426

2527
var defaultURI = URI{
@@ -117,7 +119,6 @@ func ParseURI(uri string) (URI, error) {
117119
}
118120

119121
// Extract the Uri by omitting the password
120-
121122
func ExtractWithoutPassword(addr string) string {
122123
u, err := ParseURI(addr)
123124
if err != nil {

0 commit comments

Comments
 (0)