Skip to content
Open
56 changes: 56 additions & 0 deletions v2/announcement_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package binance

import (
"crypto/rand"
"encoding/hex"
"errors"
"fmt"
"time"

"github.com/adshao/go-binance/v2/common"
)

// CreateAnnouncementParam creates a new WsAnnouncementParam for use with WsAnnouncementServe.
//
// Currently supports only WithRecvWindow option, which defaults to 6000 milliseconds
// if not specified.
func (c *Client) CreateAnnouncementParam(opts ...RequestOption) (WsAnnouncementParam, error) {
if c.APIKey == "" || c.SecretKey == "" {
return WsAnnouncementParam{}, errors.New("missing API key or secret key")
}
kt := c.KeyType
if kt == "" {
kt = common.KeyTypeHmac
}
req := new(request)
for _, opt := range opts {
opt(req)
}
if req.recvWindow == 0 {
req.recvWindow = 6000
}

sf, err := common.SignFunc(kt)
if err != nil {
return WsAnnouncementParam{}, err
}
r := make([]byte, 16)
rand.Read(r)
Copy link

Copilot AI Oct 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error from rand.Read should be checked. Although rand.Read from crypto/rand is documented to never return an error in practice, it's a security best practice to handle potential errors when generating cryptographic random values.

Suggested change
rand.Read(r)
if _, err := rand.Read(r); err != nil {
return WsAnnouncementParam{}, fmt.Errorf("failed to generate random bytes: %w", err)
}

Copilot uses AI. Check for mistakes.

random := hex.EncodeToString(r)
timestamp := time.Now().UnixMilli()
recvWindow := req.recvWindow

param := WsAnnouncementParam{
Random: random,
Topic: "com_announcement_en",
RecvWindow: recvWindow,
Timestamp: timestamp,
ApiKey: c.APIKey,
}
signature, err := sf(c.SecretKey, fmt.Sprintf("random=%s&topic=%s&recvWindow=%d&timestamp=%d", param.Random, param.Topic, param.RecvWindow, param.Timestamp))
if err != nil {
return WsAnnouncementParam{}, err
}
param.Signature = *signature
return param, nil
}
80 changes: 67 additions & 13 deletions v2/websocket.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package binance

import (
"context"
"net/http"
"net/url"
"time"
Expand All @@ -17,17 +18,32 @@ type ErrHandler func(err error)
// WsConfig webservice configuration
type WsConfig struct {
Endpoint string
Header http.Header
Proxy *string
}

func newWsConfig(endpoint string) *WsConfig {
return &WsConfig{
Endpoint: endpoint,
Proxy: getWsProxyUrl(),
Header: make(http.Header),
}
}

var wsServe = func(cfg *WsConfig, handler WsHandler, errHandler ErrHandler) (doneC, stopC chan struct{}, err error) {
func wsServe(cfg *WsConfig, handler WsHandler, errHandler ErrHandler) (doneC, stopC chan struct{}, err error) {
return wsServeWithConnHandler(cfg, handler, errHandler, func(ctx context.Context, c *websocket.Conn) {
if WebsocketKeepalive {
// This function overwrites the default ping frame handler
// sent by the websocket API server
keepAliveWithPong(ctx, c, WebsocketTimeout)
}
})
}

type ConnHandler func(context.Context, *websocket.Conn)

// WsServeWithConnHandler serves websocket with custom connection handler, useful for custom keepalive
var wsServeWithConnHandler = func(cfg *WsConfig, handler WsHandler, errHandler ErrHandler, connHandler ConnHandler) (doneC, stopC chan struct{}, err error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the docs (https://developers.binance.com/docs/cms/general-info), looks like we need to send ping actively. wsServe seems to keep connection by replying ping from server. Does that work?

proxy := http.ProxyFromEnvironment
if cfg.Proxy != nil {
u, err := url.Parse(*cfg.Proxy)
Expand All @@ -42,7 +58,7 @@ var wsServe = func(cfg *WsConfig, handler WsHandler, errHandler ErrHandler) (don
EnableCompression: true,
}

c, _, err := Dialer.Dial(cfg.Endpoint, nil)
c, _, err := Dialer.Dial(cfg.Endpoint, cfg.Header)
if err != nil {
return nil, nil, err
}
Expand All @@ -55,10 +71,12 @@ var wsServe = func(cfg *WsConfig, handler WsHandler, errHandler ErrHandler) (don
// closed by the client.

defer close(doneC)
if WebsocketKeepalive {
// This function overwrites the default ping frame handler
// sent by the websocket API server
keepAlive(c, WebsocketTimeout)

// Custom connection handling, useful in active keepalive scenarios
if connHandler != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you wrap keepalive in another connHandler? The connHandler isn't used in other place. How do you think we keep in the old way?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The websocket for the Binance announcement requires active ping messages. I want to put passive keepalive and active keepalive into one logic. The active keepalive is used in the WsAnouncementServe

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go connHandler(ctx, c)
}

// Wait for the stopC channel to be closed. We do that in a
Expand Down Expand Up @@ -87,8 +105,43 @@ var wsServe = func(cfg *WsConfig, handler WsHandler, errHandler ErrHandler) (don
return
}

func keepAlive(c *websocket.Conn, timeout time.Duration) {
// keepAliveWithPing Keepalive by actively sending ping messages
func keepAliveWithPing(interval time.Duration, pongTimeout time.Duration) ConnHandler {
return func(ctx context.Context, c *websocket.Conn) {
ticker := time.NewTicker(interval)
defer ticker.Stop()

lastResponse := time.Now()
c.SetPongHandler(func(appData string) error {
lastResponse = time.Now()
return nil
})

lastPongTicker := time.NewTicker(pongTimeout)
defer lastPongTicker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := c.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(WebsocketPongTimeout)); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do you think to add parameter WebsocketPingTimeout?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you be more specific? I don't get it.

Copy link
Contributor Author

@0x0001 0x0001 Sep 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, i get it. What do you think about combining ping/pong timeout into WebSocketTimeout?

return
}
case <-lastPongTicker.C:
if time.Since(lastResponse) > pongTimeout {
c.Close()
return
}
}
}
}
}

// keepAliveWithPong Keepalive by responding to ping messages
func keepAliveWithPong(ctx context.Context, c *websocket.Conn, timeout time.Duration) {
ticker := time.NewTicker(timeout)
defer ticker.Stop()

lastResponse := time.Now()

Expand All @@ -108,16 +161,17 @@ func keepAlive(c *websocket.Conn, timeout time.Duration) {
return nil
})

go func() {
defer ticker.Stop()
for {
<-ticker.C
for {
select {
case <-ctx.Done():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we close the connection when context is cancelled?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The wsServeWithConnHandler code snippet is as follows

go func() {
		defer close(doneC)  // 1

		if connHandler != nil {
			ctx, cancel := context.WithCancel(context.Background())
			defer cancel()  // 2
			go connHandler(ctx, c)
		}

		silent := false
		go func() {
			select {
			case <-stopC:
				silent = true
			case <-doneC:
			}
			c.Close() // 3
		}()
		for {
			_, message, err := c.ReadMessage()
			if err != nil {
				if !silent {
					errHandler(err)
				}
				return
			}
			handler(message)
		}
	}()

// 2 go routine will only cause context cancel when it exits
// 1 If the go routine exits, it will execute close(doneC).
// 3 If doneC is closed, the websocket will be closed

return
case <-ticker.C:
if time.Since(lastResponse) > timeout {
c.Close()
return
}
}
}()
}
}

var WsGetReadWriteConnection = func(cfg *WsConfig) (*websocket.Conn, error) {
Expand All @@ -136,7 +190,7 @@ var WsGetReadWriteConnection = func(cfg *WsConfig) (*websocket.Conn, error) {
EnableCompression: false,
}

c, _, err := Dialer.Dial(cfg.Endpoint, nil)
c, _, err := Dialer.Dial(cfg.Endpoint, cfg.Header)
if err != nil {
return nil, err
}
Expand Down
94 changes: 86 additions & 8 deletions v2/websocket_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package binance

import (
"encoding/json"
"errors"
"fmt"
"strings"
"time"
Expand All @@ -17,6 +18,7 @@ var (
BaseCombinedTestnetURL = "wss://stream.testnet.binance.vision/stream?streams="
BaseWsApiMainURL = "wss://ws-api.binance.com:443/ws-api/v3"
BaseWsApiTestnetURL = "wss://ws-api.testnet.binance.vision/ws-api/v3"
BaseWsAnnouncementURL = "wss://api.binance.com/sapi/wss"

// WebsocketTimeout is an interval for sending ping/pong messages if WebsocketKeepalive is enabled
WebsocketTimeout = time.Second * 600
Expand Down Expand Up @@ -135,20 +137,20 @@ func WsCombinedPartialDepthServe(symbolLevels map[string]string, handler WsParti
event.Symbol = strings.ToUpper(symbol)
data := j.Get("data").MustMap()
event.LastUpdateID, _ = data["lastUpdateId"].(json.Number).Int64()
bidsLen := len(data["bids"].([]interface{}))
bidsLen := len(data["bids"].([]any))
event.Bids = make([]Bid, bidsLen)
for i := 0; i < bidsLen; i++ {
item := data["bids"].([]interface{})[i].([]interface{})
item := data["bids"].([]any)[i].([]any)
event.Bids[i] = Bid{
Price: item[0].(string),
Quantity: item[1].(string),
}
}
asksLen := len(data["asks"].([]interface{}))
asksLen := len(data["asks"].([]any))
event.Asks = make([]Ask, asksLen)
for i := 0; i < asksLen; i++ {

item := data["asks"].([]interface{})[i].([]interface{})
item := data["asks"].([]any)[i].([]any)
event.Asks[i] = Ask{
Price: item[0].(string),
Quantity: item[1].(string),
Expand Down Expand Up @@ -258,20 +260,20 @@ func wsCombinedDepthServe(endpoint string, handler WsDepthHandler, errHandler Er
event.Time, _ = data["E"].(json.Number).Int64()
event.LastUpdateID, _ = data["u"].(json.Number).Int64()
event.FirstUpdateID, _ = data["U"].(json.Number).Int64()
bidsLen := len(data["b"].([]interface{}))
bidsLen := len(data["b"].([]any))
event.Bids = make([]Bid, bidsLen)
for i := 0; i < bidsLen; i++ {
item := data["b"].([]interface{})[i].([]interface{})
item := data["b"].([]any)[i].([]any)
event.Bids[i] = Bid{
Price: item[0].(string),
Quantity: item[1].(string),
}
}
asksLen := len(data["a"].([]interface{}))
asksLen := len(data["a"].([]any))
event.Asks = make([]Ask, asksLen)
for i := 0; i < asksLen; i++ {

item := data["a"].([]interface{})[i].([]interface{})
item := data["a"].([]any)[i].([]any)
event.Asks[i] = Ask{
Price: item[0].(string),
Quantity: item[1].(string),
Expand Down Expand Up @@ -909,6 +911,82 @@ func WsApiInitReadWriteConn() (*websocket.Conn, error) {
return conn, err
}

type WsAnnouncementEvent struct {
CatalogID int64 `json:"catalogId"`
CatalogName string `json:"catalogName"`
PublishDate int64 `json:"publishDate"`
Title string `json:"title"`
Body string `json:"body"`
Disclaimer string `json:"disclaimer"`
}

type WsAnnouncementParam struct {
Random string
Topic string
RecvWindow int64
Timestamp int64
Signature string
ApiKey string
}
type WsAnnouncementHandler func(event *WsAnnouncementEvent)

// WsAnnouncementServe establishes a WebSocket connection to listen for Binance announcements.
// See API documentation: https://developers.binance.com/docs/cms/announcement
//
// Parameters:
//
// params - Should be created using client.CreateAnnouncementParam
// handler - Callback function to handle incoming announcement messages
// errHandler - Error callback function for connection errors
//
// Returns:
//
// doneC - Channel that closes when the connection terminates
// stopC - Channel that can be closed to stop the connection
// err - Any initial connection error
func WsAnnouncementServe(params WsAnnouncementParam, handler WsAnnouncementHandler, errHandler ErrHandler) (doneC, stopC chan struct{}, err error) {
if UseTestnet {
return nil, nil, errors.New("testnet is not supported")
}
endpoint := fmt.Sprintf("%s?random=%s&topic=%s&recvWindow=%d&timestamp=%d&signature=%s",
BaseWsAnnouncementURL, params.Random, params.Topic, params.RecvWindow, params.Timestamp, params.Signature,
)

cfg := newWsConfig(endpoint)
cfg.Header.Add("X-MBX-APIKEY", params.ApiKey)
wsHandler := func(message []byte) {
event := struct {
Type string `json:"type"`
Topic string `json:"topic"`
Data string `json:"data"`
}{}

err := json.Unmarshal(message, &event)
if err != nil {
errHandler(err)
return
}

if event.Type != "DATA" {
errHandler(errors.New("type is not DATA: " + event.Type))
return
}

if event.Topic != "com_announcement_en" {
errHandler(errors.New("topic is not com_announcement_en: " + event.Topic))
return
}
Comment on lines +977 to +980
Copy link

Copilot AI Oct 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hard-coded topic string should be extracted as a constant to avoid duplication and improve maintainability. The same string 'com_announcement_en' is used in announcement_service.go line 45.

Copilot uses AI. Check for mistakes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sc0Vu This only repeats twice, and probably won't repeat more than that. I don't think it's necessary to define it as a constant. What's your take?


e := new(WsAnnouncementEvent)
if err := json.Unmarshal([]byte(event.Data), &e); err != nil {
errHandler(err)
return
}
handler(e)
}
return wsServeWithConnHandler(cfg, wsHandler, errHandler, keepAliveWithPing(30*time.Second, WebsocketTimeout))
Comment on lines +987 to +989
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sc0Vu here, send ping actively, in the last arguments

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant wsServe seems not used here. Should we remove it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wsServe is used in other websocket services, do we need to change everything?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad, you're right.

}

// getWsApiEndpoint return the base endpoint of the API WS according the UseTestnet flag
func getWsApiEndpoint() string {
if UseTestnet {
Expand Down
Loading