Skip to content

Commit 1fd3997

Browse files
authored
Merge pull request #95 from binance/rc-common-v2.0.0
2 parents 86ae4c5 + c685905 commit 1fd3997

File tree

7 files changed

+387
-60
lines changed

7 files changed

+387
-60
lines changed

.github/workflows/release.yaml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -172,12 +172,12 @@ jobs:
172172
# Split into parts
173173
IFS='.' read -r -a VERSION_PARTS <<< "$CURRENT_VERSION"
174174
MAJOR=${VERSION_PARTS[0]}
175-
MINOR=${VERSION_PARTS[1]}
175+
MINOR=0
176176
PATCH=${VERSION_PARTS[2]}
177177
178178
# Increment patch version
179-
NEW_MINOR=$((MINOR + 1))
180-
NEW_VERSION="v${MAJOR}.${NEW_MINOR}.${PATCH}"
179+
NEW_MAJOR=$((MAJOR + 1))
180+
NEW_VERSION="v${NEW_MAJOR}.${MINOR}.${PATCH}"
181181
fi
182182
183183
# Set the full tag name

common/CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
### Changelog
22

3+
## 2.0.0 - 2026-02-11
4+
5+
### Changed (2)
6+
7+
- Updated `WebSocketCommon` connect method to accept streams parameter for subscribing upon connection.
8+
- Updated `retry` logic in `utils.go` to use `SleepContext` for better handling of context cancellation during retries.
9+
310
## 1.2.0 - 2026-01-23
411

512
### Added (1)

common/common/configuration.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ type ConfigurationWebsocketStreamsOption func(*ConfigurationWebsocketStreams)
111111
// @return A pointer to the newly created ConfigurationRestAPI.
112112
func NewConfigurationRestAPI(opts ...ConfigurationRestAPIOption) *ConfigurationRestAPI {
113113
basePath := "https://api.binance.com"
114-
timeout := 5 * time.Millisecond
114+
timeout := 5000 * time.Millisecond
115115
retries := 3
116116
backoff := 1000
117117
keepAlive := true

common/common/utils.go

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -408,6 +408,38 @@ func ParseRateLimitHeaders(header http.Header) ([]RateLimit, error) {
408408
return rateLimits, nil
409409
}
410410

411+
// SleepContext pauses the execution for the specified duration or until the context is done.
412+
//
413+
// @param ctx The context to observe for cancellation.
414+
// @param duration The duration to sleep.
415+
// @return An error if the context is done before the duration elapses.
416+
func SleepContext(ctx context.Context, duration time.Duration) error {
417+
select {
418+
case <-ctx.Done():
419+
return ctx.Err()
420+
default:
421+
}
422+
423+
if duration <= 0 {
424+
select {
425+
case <-ctx.Done():
426+
return ctx.Err()
427+
default:
428+
return nil
429+
}
430+
}
431+
432+
timer := time.NewTimer(duration)
433+
defer timer.Stop()
434+
435+
select {
436+
case <-ctx.Done():
437+
return ctx.Err()
438+
case <-timer.C:
439+
return nil
440+
}
441+
}
442+
411443
// SendRequest sends an HTTP request and handles retries, response decoding, and error handling.
412444
//
413445
// @param ctx The context for the request.
@@ -439,7 +471,7 @@ func SendRequest[T any](ctx context.Context, path string, method string, queryPa
439471

440472
backoff := cfg.Backoff
441473
if backoff <= 0 {
442-
backoff = 1
474+
backoff = 1000
443475
}
444476
var lastErr error
445477

@@ -449,7 +481,9 @@ func SendRequest[T any](ctx context.Context, path string, method string, queryPa
449481
if err != nil {
450482
lastErr = err
451483
if attempt < retries && ShouldRetryRequest(err, method, retries-attempt, resp) {
452-
time.Sleep(time.Duration(backoff*attempt) * time.Second)
484+
if err := SleepContext(ctx, time.Duration(backoff*(attempt+1))*time.Millisecond); err != nil {
485+
return &RestApiResponse[T]{}, err
486+
}
453487
continue
454488
}
455489
return &RestApiResponse[T]{}, NewNetworkError(fmt.Sprintf("Network error: %v", err))
@@ -490,7 +524,9 @@ func SendRequest[T any](ctx context.Context, path string, method string, queryPa
490524

491525
if resp.StatusCode >= 500 && resp.StatusCode <= 504 {
492526
if attempt < retries {
493-
time.Sleep(time.Duration(backoff*attempt) * time.Second)
527+
if err := SleepContext(ctx, time.Duration(backoff*(attempt+1))*time.Millisecond); err != nil {
528+
return &RestApiResponse[T]{}, err
529+
}
494530
continue
495531
}
496532
return &RestApiResponse[T]{}, fmt.Errorf("request failed after %d retries: received status %d", retries, resp.StatusCode)
@@ -703,11 +739,11 @@ func SetupProxy(cfg *ConfigurationRestAPI) *http.Client {
703739
transport := BuildTransport(cfg.HTTPSAgent, cfg)
704740
return &http.Client{
705741
Transport: transport,
706-
Timeout: time.Duration(cfg.Timeout) * time.Millisecond,
742+
Timeout: cfg.Timeout,
707743
}
708744
}
709745
return &http.Client{
710-
Timeout: time.Duration(cfg.Timeout) * time.Millisecond,
746+
Timeout: cfg.Timeout,
711747
}
712748
}
713749

common/common/websocket.go

Lines changed: 43 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -362,30 +362,44 @@ func (w *WebSocketCommon) initializePool() {
362362
//
363363
// @param config The WebSocketConfig containing configuration details.
364364
// @param userAgent The user agent string to be used for the connection.
365+
// @param streams A slice of stream names to subscribe to upon connection.
365366
// @return An error if the connection fails, otherwise nil.
366-
func (w *WebSocketCommon) Connect(config WebSocketConfig, userAgent string) error {
367+
func (w *WebSocketCommon) Connect(config WebSocketConfig, userAgent string, streams []string) error {
367368
if err := w.setupProxyDialer(config); err != nil {
368369
return fmt.Errorf("proxy setup failed: %v", err)
369370
}
370371

371-
BasePath := w.prepareBasePath(config)
372372
headers := w.prepareHeaders(config, userAgent)
373373
dialer := w.CreateWebSocketDialer(config)
374374

375375
if w.Mode == SINGLE {
376+
BasePath := w.prepareBasePath(config, streams, true)
376377
return w.connectSingleMode(BasePath, headers, dialer, config, userAgent)
377378
}
378-
return w.connectPoolMode(BasePath, headers, dialer, config, userAgent)
379+
return w.connectPoolMode(headers, dialer, config, userAgent, streams)
379380
}
380381

381382
// prepareBasePath constructs the base path for the WebSocket connection.
382383
//
383384
// @param config The WebSocketConfig containing configuration details.
385+
// @param streams The list of streams to include in the base path.
386+
// @param includeStreams A boolean indicating whether to include streams in the base path.
384387
// @return The constructed base path string.
385-
func (w *WebSocketCommon) prepareBasePath(config WebSocketConfig) string {
388+
func (w *WebSocketCommon) prepareBasePath(config WebSocketConfig, streams []string, includeStreams bool) string {
386389
BasePath := config.GetBasePath()
390+
if includeStreams && streams != nil && len(streams) > 0 {
391+
BasePath += "?streams="
392+
for _, stream := range streams {
393+
BasePath += stream + "/"
394+
}
395+
BasePath = strings.TrimSuffix(BasePath, "/")
396+
}
387397
if timeUnit := config.GetTimeUnit(); timeUnit != "" {
388-
BasePath = BasePath + "?timeUnit=" + string(timeUnit)
398+
if streams != nil && len(streams) > 0 {
399+
BasePath += "&timeUnit=" + string(timeUnit)
400+
} else {
401+
BasePath += "?timeUnit=" + string(timeUnit)
402+
}
389403
}
390404
return BasePath
391405
}
@@ -485,13 +499,13 @@ func (w *WebSocketCommon) connectSingleMode(BasePath string, headers http.Header
485499

486500
// connectPoolMode establishes WebSocket connections in pool mode.
487501
//
488-
// @param BasePath The base URL for the WebSocket connection.
489502
// @param headers The HTTP headers to include in the connection request.
490503
// @param dialer The WebSocket dialer to use for the connection.
491504
// @param config The WebSocketConfig containing configuration details.
492505
// @param userAgent The user agent string to use for the connection.
506+
// @param streams A slice of stream names to subscribe to upon connection.
493507
// @return An error if the connection fails, otherwise nil.
494-
func (w *WebSocketCommon) connectPoolMode(BasePath string, headers http.Header, dialer websocket.Dialer, config WebSocketConfig, userAgent string) error {
508+
func (w *WebSocketCommon) connectPoolMode(headers http.Header, dialer websocket.Dialer, config WebSocketConfig, userAgent string, streams []string) error {
495509
var wg sync.WaitGroup
496510
successChan := make(chan bool, len(w.Connections))
497511
errChan := make(chan error, len(w.Connections))
@@ -502,6 +516,7 @@ func (w *WebSocketCommon) connectPoolMode(BasePath string, headers http.Header,
502516
go func(num int, conn *WebSocketConnection) {
503517
defer wg.Done()
504518

519+
BasePath := w.prepareBasePath(config, streams, num == 0)
505520
wsConn, _, err := dialer.Dial(BasePath, headers)
506521
if err != nil {
507522
log.Printf("WebSocket connection error: %v", err)
@@ -601,7 +616,7 @@ func (w *WebSocketCommon) KeepAlive(connection *WebSocketConnection, config WebS
601616
// @param userAgent The user agent string to use for the connection.
602617
// @return An error if the reconnection fails, otherwise nil.
603618
func (w *WebSocketCommon) reconnect(conn *WebSocketConnection, config WebSocketConfig, userAgent string) error {
604-
BasePath := w.prepareBasePath(config)
619+
BasePath := w.prepareBasePath(config, conn.StreamConnectionMap, conn.SessionLogonRequest == nil)
605620
headers := w.prepareHeaders(config, userAgent)
606621
dialer := w.CreateWebSocketDialer(config)
607622

@@ -617,8 +632,6 @@ func (w *WebSocketCommon) reconnect(conn *WebSocketConnection, config WebSocketC
617632
w.restoreSessionIfNeeded(conn)
618633
time.Sleep(1 * time.Second)
619634
w.resubscribeUserDataStreams(conn)
620-
} else if len(conn.StreamConnectionMap) > 0 {
621-
w.resubscribeRegularStreams(conn)
622635
}
623636

624637
return nil
@@ -713,34 +726,6 @@ func (w *WebSocketCommon) resubscribeUserDataStreams(connection *WebSocketConnec
713726
}
714727
}
715728

716-
// resubscribeRegularStreams resubscribes to all regular streams after reconnection.
717-
//
718-
// @param connection The WebSocketConnection to resubscribe streams on.
719-
func (w *WebSocketCommon) resubscribeRegularStreams(connection *WebSocketConnection) {
720-
for _, stream := range connection.StreamConnectionMap {
721-
subscribePayload := map[string]interface{}{
722-
"method": "SUBSCRIBE",
723-
"params": []string{stream},
724-
"id": GenerateUUID(),
725-
}
726-
727-
message, err := json.Marshal(subscribePayload)
728-
if err != nil {
729-
log.Printf("Error during resubscription to stream %s: %v", stream, err)
730-
continue
731-
}
732-
733-
connection.mu.Lock()
734-
err = connection.Websocket.WriteMessage(websocket.TextMessage, message)
735-
connection.mu.Unlock()
736-
if err != nil {
737-
log.Printf("Error sending resubscription message for stream %s: %v", stream, err)
738-
continue
739-
}
740-
log.Printf("Resubscribed to stream %s on reconnection", stream)
741-
}
742-
}
743-
744729
// isConnectionReady checks if the WebSocket connection is open.
745730
//
746731
// @param connection The WebSocketConnection to check.
@@ -867,7 +852,7 @@ func NewWebsocketAPI(cfg *ConfigurationWebsocketApi) (*WebsocketAPI, error) {
867852
// @param userAgent The user agent string to be used for the connection.
868853
// @return An error if the connection fails, otherwise nil.
869854
func (w *WebsocketAPI) Connect(userAgent string) error {
870-
return w.WsCommon.Connect(w.Cfg, userAgent)
855+
return w.WsCommon.Connect(w.Cfg, userAgent, []string{})
871856
}
872857

873858
// SendMessage sends a message over the WebSocket connection and returns channels for the response and error.
@@ -1121,8 +1106,25 @@ func NewWebsocketStreams(cfg *ConfigurationWebsocketStreams) (*WebsocketStreams,
11211106
}, nil
11221107
}
11231108

1124-
func (w *WebsocketStreams) Connect(userAgent string) error {
1125-
return w.WsCommon.Connect(w.Cfg, userAgent)
1109+
// Connect establishes the WebSocket connection using the provided user agent and streams.
1110+
//
1111+
// @param userAgent The user agent string to be used for the connection.
1112+
// @param streams A slice of stream names to subscribe to upon connection.
1113+
// @return An error if the connection fails, otherwise nil.
1114+
func (w *WebsocketStreams) Connect(userAgent string, streams []string) error {
1115+
err := w.WsCommon.Connect(w.Cfg, userAgent, streams)
1116+
if err != nil {
1117+
fmt.Println("WebSocket connection error:", err)
1118+
return err
1119+
}
1120+
if streams != nil && len(streams) > 0 {
1121+
conn := w.WsCommon.Connections[0]
1122+
for _, stream := range streams {
1123+
w.GlobalStreamConnectionMap[stream] = append(w.GlobalStreamConnectionMap[stream], conn)
1124+
conn.StreamConnectionMap = append(conn.StreamConnectionMap, stream)
1125+
}
1126+
}
1127+
return nil
11261128
}
11271129

11281130
// Subscribe subscribes to the specified streams.

common/tests/unit/utils_test.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -513,6 +513,92 @@ func TestSendRequest_ContentEncodingGzip(t *testing.T) {
513513
}
514514
}
515515

516+
func TestSleepContext_CompletesNormally(t *testing.T) {
517+
ctx := context.Background()
518+
duration := 50 * time.Millisecond
519+
520+
start := time.Now()
521+
err := common.SleepContext(ctx, duration)
522+
elapsed := time.Since(start)
523+
524+
if err != nil {
525+
t.Errorf("Expected no error, got %v", err)
526+
}
527+
528+
if elapsed < duration {
529+
t.Errorf("Expected sleep to last at least %v, but it lasted %v", duration, elapsed)
530+
}
531+
}
532+
533+
func TestSleepContext_CanceledContext(t *testing.T) {
534+
ctx, cancel := context.WithCancel(context.Background())
535+
cancel()
536+
537+
err := common.SleepContext(ctx, 1*time.Second)
538+
539+
if err != context.Canceled {
540+
t.Errorf("Expected context.Canceled error, got %v", err)
541+
}
542+
}
543+
544+
func TestSleepContext_CanceledDuringSleep(t *testing.T) {
545+
ctx, cancel := context.WithCancel(context.Background())
546+
547+
go func() {
548+
time.Sleep(50 * time.Millisecond)
549+
cancel()
550+
}()
551+
552+
start := time.Now()
553+
err := common.SleepContext(ctx, 1*time.Second)
554+
elapsed := time.Since(start)
555+
556+
if err != context.Canceled {
557+
t.Errorf("Expected context.Canceled error, got %v", err)
558+
}
559+
560+
if elapsed >= 1*time.Second {
561+
t.Errorf("Expected sleep to be interrupted before 1 second, but it lasted %v", elapsed)
562+
}
563+
}
564+
565+
func TestSleepContext_ZeroDuration(t *testing.T) {
566+
ctx := context.Background()
567+
568+
err := common.SleepContext(ctx, 0)
569+
570+
if err != nil {
571+
t.Errorf("Expected no error with zero duration, got %v", err)
572+
}
573+
}
574+
575+
func TestSleepContext_NegativeDuration(t *testing.T) {
576+
ctx := context.Background()
577+
578+
err := common.SleepContext(ctx, -1*time.Second)
579+
580+
if err != nil {
581+
t.Errorf("Expected no error with negative duration, got %v", err)
582+
}
583+
}
584+
585+
func TestSleepContext_TimeoutContext(t *testing.T) {
586+
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
587+
defer cancel()
588+
589+
start := time.Now()
590+
err := common.SleepContext(ctx, 1*time.Second)
591+
elapsed := time.Since(start)
592+
593+
if err != context.DeadlineExceeded {
594+
t.Errorf("Expected context.DeadlineExceeded error, got %v", err)
595+
}
596+
597+
if elapsed >= 1*time.Second {
598+
t.Errorf("Expected sleep to be interrupted by timeout, but it lasted %v", elapsed)
599+
}
600+
}
601+
516602
func TestPrepareRequest_BasicHeadersAndQuery(t *testing.T) {
517603
cfg := &common.ConfigurationRestAPI{
518604
ApiKey: "apikey123",

0 commit comments

Comments
 (0)