Skip to content

Commit 89b9bed

Browse files
donowuclaude
andcommitted
fix: resolve 6 critical/high-priority concurrency and performance issues
- Fix WebSocket goroutine leaks by adding per-connection context cancellation - Fix race conditions in connection state management (set conn to nil after close) - Fix subscription panic risks by removing panic recovery and adding grace period - Add context cancellation support to stream functions for proper cleanup - Fix heartbeat goroutine management to prevent accumulation - Optimize rate limiter with timestamp-based implementation (eliminate background goroutine) Test coverage: Added comprehensive tests for goroutine leaks, race conditions, and subscription panics. Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent a9a3cc8 commit 89b9bed

10 files changed

Lines changed: 1083 additions & 65 deletions

File tree

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ require (
3535
github.com/supranational/blst v0.3.16-0.20250831170142-f48500c1fdbe // indirect
3636
github.com/tklauser/go-sysconf v0.3.12 // indirect
3737
github.com/tklauser/numcpus v0.6.1 // indirect
38+
go.uber.org/goleak v1.3.0 // indirect
3839
golang.org/x/crypto v0.45.0 // indirect
3940
golang.org/x/sync v0.12.0 // indirect
4041
golang.org/x/sys v0.38.0 // indirect

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,8 @@ github.com/urfave/cli/v2 v2.27.5 h1:WoHEJLdsXr6dDWoJgMq/CboDmyY/8HMMH1fTECbih+w=
195195
github.com/urfave/cli/v2 v2.27.5/go.mod h1:3Sevf16NykTbInEnD0yKkjDAeZDS0A6bzhBH5hrMvTQ=
196196
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 h1:gEOO8jv9F4OT7lGCjxCBTO/36wtF6j2nSip77qHd4x4=
197197
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1/go.mod h1:Ohn+xnUBiLI6FVj/9LpzZWtj1/D6lUovWYBkxHVV3aM=
198+
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
199+
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
198200
golang.org/x/crypto v0.45.0 h1:jMBrvKuj23MTlT0bQEOBcAE0mjg8mK9RXFhRH6nyF3Q=
199201
golang.org/x/crypto v0.45.0/go.mod h1:XTGrrkGJve7CYK7J8PEww4aY7gM3qMCElcJQ8n8JdX4=
200202
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df h1:UA2aFVmmsIlefxMk29Dp2juaUSth8Pyn3Tq5Y5mJGME=

pkg/clob/impl.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -376,9 +376,18 @@ func (c *clientImpl) startHeartbeats() {
376376
if c.httpClient == nil || c.signer == nil || c.apiKey == nil || c.heartbeat == nil {
377377
return
378378
}
379+
380+
// Stop old heartbeat goroutine if it exists
379381
if c.heartbeatStop != nil {
380-
close(c.heartbeatStop)
382+
oldStop := c.heartbeatStop
383+
c.heartbeatStop = nil
384+
close(oldStop)
385+
// Add a small delay to allow old goroutine to exit
386+
c.heartbeatMu.Unlock()
387+
time.Sleep(50 * time.Millisecond)
388+
c.heartbeatMu.Lock()
381389
}
390+
382391
stop := make(chan struct{})
383392
c.heartbeatStop = stop
384393
interval := c.heartbeatInterval

pkg/clob/stream.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,22 +32,35 @@ func StreamDataWithCursor[T any](ctx context.Context, cursor string, fetch Strea
3232
}
3333

3434
for cursor != clobtypes.EndCursor {
35+
// Check context before each fetch operation
3536
if err := ctx.Err(); err != nil {
3637
out <- StreamResult[T]{Err: err}
3738
return
3839
}
40+
41+
// Make fetch operation cancellable by passing context
3942
items, next, err := fetch(ctx, cursor)
4043
if err != nil {
4144
out <- StreamResult[T]{Err: err}
4245
return
4346
}
47+
4448
for _, item := range items {
49+
// Check context before sending each item
4550
if err := ctx.Err(); err != nil {
4651
out <- StreamResult[T]{Err: err}
4752
return
4853
}
49-
out <- StreamResult[T]{Item: item}
54+
55+
// Use select to make send cancellable
56+
select {
57+
case out <- StreamResult[T]{Item: item}:
58+
case <-ctx.Done():
59+
out <- StreamResult[T]{Err: ctx.Err()}
60+
return
61+
}
5062
}
63+
5164
if next == "" || next == cursor {
5265
return
5366
}

pkg/clob/ws/goroutine_leak_test.go

Lines changed: 243 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,243 @@
1+
package ws
2+
3+
import (
4+
"context"
5+
"net/http"
6+
"net/http/httptest"
7+
"strings"
8+
"testing"
9+
"time"
10+
11+
"github.com/gorilla/websocket"
12+
"go.uber.org/goleak"
13+
)
14+
15+
// TestWebSocketGoroutineLeaks tests that goroutines are properly cleaned up
16+
// during reconnection scenarios to prevent goroutine leaks.
17+
func TestWebSocketGoroutineLeaks_Reconnection(t *testing.T) {
18+
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())
19+
20+
// Create a mock WebSocket server that closes connections immediately
21+
upgrader := websocket.Upgrader{}
22+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
23+
conn, err := upgrader.Upgrade(w, r, nil)
24+
if err != nil {
25+
return
26+
}
27+
// Close immediately to trigger reconnection
28+
conn.Close()
29+
}))
30+
defer server.Close()
31+
32+
wsURL := "ws" + strings.TrimPrefix(server.URL, "http")
33+
34+
client, err := NewClient(wsURL, nil, nil)
35+
if err != nil {
36+
t.Fatalf("failed to create client: %v", err)
37+
}
38+
39+
// Set short timeouts to speed up test
40+
impl := client.(*clientImpl)
41+
impl.reconnectDelay = 10 * time.Millisecond
42+
impl.reconnectMaxDelay = 50 * time.Millisecond
43+
impl.reconnectMax = 2
44+
impl.setReadTimeout(100 * time.Millisecond)
45+
46+
// Wait for reconnection attempts
47+
time.Sleep(200 * time.Millisecond)
48+
49+
// Close the client
50+
if err := client.Close(); err != nil {
51+
t.Fatalf("failed to close client: %v", err)
52+
}
53+
54+
// Give goroutines time to exit
55+
time.Sleep(100 * time.Millisecond)
56+
57+
// goleak.VerifyNone will check for leaked goroutines at test end
58+
}
59+
60+
// TestWebSocketGoroutineLeaks_MultipleReconnections tests that multiple
61+
// reconnection cycles don't accumulate goroutines.
62+
func TestWebSocketGoroutineLeaks_MultipleReconnections(t *testing.T) {
63+
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())
64+
65+
connectionCount := 0
66+
upgrader := websocket.Upgrader{}
67+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
68+
conn, err := upgrader.Upgrade(w, r, nil)
69+
if err != nil {
70+
return
71+
}
72+
connectionCount++
73+
// Close after a short delay
74+
time.AfterFunc(20*time.Millisecond, func() {
75+
conn.Close()
76+
})
77+
}))
78+
defer server.Close()
79+
80+
wsURL := "ws" + strings.TrimPrefix(server.URL, "http")
81+
82+
client, err := NewClient(wsURL, nil, nil)
83+
if err != nil {
84+
t.Fatalf("failed to create client: %v", err)
85+
}
86+
87+
impl := client.(*clientImpl)
88+
impl.reconnectDelay = 10 * time.Millisecond
89+
impl.reconnectMaxDelay = 50 * time.Millisecond
90+
impl.reconnectMax = 5
91+
impl.setReadTimeout(50 * time.Millisecond)
92+
93+
// Wait for multiple reconnection cycles
94+
time.Sleep(500 * time.Millisecond)
95+
96+
if err := client.Close(); err != nil {
97+
t.Fatalf("failed to close client: %v", err)
98+
}
99+
100+
time.Sleep(100 * time.Millisecond)
101+
102+
if connectionCount < 2 {
103+
t.Logf("Warning: only %d connections made, expected multiple reconnections", connectionCount)
104+
}
105+
}
106+
107+
// TestWebSocketGoroutineLeaks_CloseWhileReading tests that closing the client
108+
// while a read is in progress doesn't leak goroutines.
109+
func TestWebSocketGoroutineLeaks_CloseWhileReading(t *testing.T) {
110+
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())
111+
112+
upgrader := websocket.Upgrader{}
113+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
114+
conn, err := upgrader.Upgrade(w, r, nil)
115+
if err != nil {
116+
return
117+
}
118+
// Keep connection open but don't send data
119+
// This simulates a hanging read
120+
time.Sleep(5 * time.Second)
121+
conn.Close()
122+
}))
123+
defer server.Close()
124+
125+
wsURL := "ws" + strings.TrimPrefix(server.URL, "http")
126+
127+
client, err := NewClient(wsURL, nil, nil)
128+
if err != nil {
129+
t.Fatalf("failed to create client: %v", err)
130+
}
131+
132+
impl := client.(*clientImpl)
133+
impl.setReadTimeout(100 * time.Millisecond)
134+
135+
// Give readLoop time to start
136+
time.Sleep(50 * time.Millisecond)
137+
138+
// Close while read is potentially in progress
139+
if err := client.Close(); err != nil {
140+
t.Fatalf("failed to close client: %v", err)
141+
}
142+
143+
time.Sleep(100 * time.Millisecond)
144+
}
145+
146+
// TestWebSocketGoroutineLeaks_PingLoopCleanup tests that pingLoop goroutines
147+
// are properly cleaned up when connections are closed.
148+
func TestWebSocketGoroutineLeaks_PingLoopCleanup(t *testing.T) {
149+
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())
150+
151+
upgrader := websocket.Upgrader{}
152+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
153+
conn, err := upgrader.Upgrade(w, r, nil)
154+
if err != nil {
155+
return
156+
}
157+
// Keep connection alive and respond to pings
158+
for {
159+
_, msg, err := conn.ReadMessage()
160+
if err != nil {
161+
return
162+
}
163+
if string(msg) == "PING" {
164+
conn.WriteMessage(websocket.TextMessage, []byte("PONG"))
165+
}
166+
}
167+
}))
168+
defer server.Close()
169+
170+
wsURL := "ws" + strings.TrimPrefix(server.URL, "http")
171+
172+
client, err := NewClient(wsURL, nil, nil)
173+
if err != nil {
174+
t.Fatalf("failed to create client: %v", err)
175+
}
176+
177+
impl := client.(*clientImpl)
178+
impl.heartbeatInterval = 20 * time.Millisecond
179+
impl.disablePing = false
180+
181+
// Wait for ping loop to start and send some pings
182+
time.Sleep(100 * time.Millisecond)
183+
184+
if err := client.Close(); err != nil {
185+
t.Fatalf("failed to close client: %v", err)
186+
}
187+
188+
time.Sleep(100 * time.Millisecond)
189+
}
190+
191+
// TestWebSocketGoroutineLeaks_SubscriptionCleanup tests that subscription
192+
// goroutines are properly cleaned up.
193+
func TestWebSocketGoroutineLeaks_SubscriptionCleanup(t *testing.T) {
194+
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())
195+
196+
upgrader := websocket.Upgrader{}
197+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
198+
conn, err := upgrader.Upgrade(w, r, nil)
199+
if err != nil {
200+
return
201+
}
202+
defer conn.Close()
203+
204+
// Read subscription requests and keep connection alive
205+
for {
206+
_, _, err := conn.ReadMessage()
207+
if err != nil {
208+
return
209+
}
210+
}
211+
}))
212+
defer server.Close()
213+
214+
wsURL := "ws" + strings.TrimPrefix(server.URL, "http")
215+
216+
client, err := NewClient(wsURL, nil, nil)
217+
if err != nil {
218+
t.Fatalf("failed to create client: %v", err)
219+
}
220+
221+
// Create multiple subscriptions
222+
ctx := context.Background()
223+
stream1, err := client.SubscribeOrderbookStream(ctx, []string{"asset1"})
224+
if err != nil {
225+
t.Fatalf("failed to subscribe: %v", err)
226+
}
227+
228+
stream2, err := client.SubscribePricesStream(ctx, []string{"asset2"})
229+
if err != nil {
230+
t.Fatalf("failed to subscribe: %v", err)
231+
}
232+
233+
// Close streams
234+
stream1.Close()
235+
stream2.Close()
236+
237+
// Close client
238+
if err := client.Close(); err != nil {
239+
t.Fatalf("failed to close client: %v", err)
240+
}
241+
242+
time.Sleep(100 * time.Millisecond)
243+
}

0 commit comments

Comments
 (0)