-
Notifications
You must be signed in to change notification settings - Fork 758
Add Binance announcement WebSocket support #750
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 5 commits
d9bf299
b70b844
395d12e
e0409a0
692bacd
e43820e
ce42051
b234b72
86649a8
f2bd8a7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
package binance | ||
|
||
import ( | ||
"crypto/rand" | ||
"encoding/hex" | ||
"errors" | ||
"fmt" | ||
"time" | ||
|
||
"github.com/adshao/go-binance/v2/common" | ||
) | ||
|
||
// CreateAnnouncementParam creates a new WsAnnouncementParam for use with WsAnnouncementServe. | ||
// | ||
// Currently supports only WithRecvWindow option, which defaults to 6000 milliseconds | ||
// if not specified. | ||
func (c *Client) CreateAnnouncementParam(opts ...RequestOption) (WsAnnouncementParam, error) { | ||
if c.APIKey == "" || c.SecretKey == "" { | ||
return WsAnnouncementParam{}, errors.New("miss apikey or secret key") | ||
0x0001 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
} | ||
kt := c.KeyType | ||
if kt == "" { | ||
kt = common.KeyTypeHmac | ||
} | ||
req := new(request) | ||
for _, opt := range opts { | ||
opt(req) | ||
} | ||
if req.recvWindow == 0 { | ||
req.recvWindow = 6000 | ||
} | ||
|
||
sf, err := common.SignFunc(kt) | ||
if err != nil { | ||
return WsAnnouncementParam{}, err | ||
} | ||
r := make([]byte, 16) | ||
rand.Read(r) | ||
random := hex.EncodeToString(r) | ||
timestamp := time.Now().UnixMilli() | ||
recvWindow := req.recvWindow | ||
|
||
param := WsAnnouncementParam{ | ||
Random: random, | ||
Topic: "com_announcement_en", | ||
RecvWindow: recvWindow, | ||
Timestamp: timestamp, | ||
ApiKey: c.APIKey, | ||
} | ||
signature, err := sf(c.SecretKey, fmt.Sprintf("random=%s&topic=%s&recvWindow=%d×tamp=%d", param.Random, param.Topic, param.RecvWindow, param.Timestamp)) | ||
if err != nil { | ||
return WsAnnouncementParam{}, err | ||
} | ||
param.Signature = *signature | ||
return param, nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
package binance | ||
|
||
import ( | ||
"context" | ||
"net/http" | ||
"net/url" | ||
"time" | ||
|
@@ -17,17 +18,32 @@ type ErrHandler func(err error) | |
// WsConfig webservice configuration | ||
type WsConfig struct { | ||
Endpoint string | ||
Header http.Header | ||
Proxy *string | ||
} | ||
|
||
func newWsConfig(endpoint string) *WsConfig { | ||
return &WsConfig{ | ||
Endpoint: endpoint, | ||
Proxy: getWsProxyUrl(), | ||
Header: make(http.Header), | ||
} | ||
} | ||
|
||
var wsServe = func(cfg *WsConfig, handler WsHandler, errHandler ErrHandler) (doneC, stopC chan struct{}, err error) { | ||
func wsServe(cfg *WsConfig, handler WsHandler, errHandler ErrHandler) (doneC, stopC chan struct{}, err error) { | ||
return wsServeWithConnHandler(cfg, handler, errHandler, func(ctx context.Context, c *websocket.Conn) { | ||
if WebsocketKeepalive { | ||
// This function overwrites the default ping frame handler | ||
// sent by the websocket API server | ||
keepAliveWithPong(ctx, c, WebsocketTimeout) | ||
} | ||
}) | ||
} | ||
|
||
type ConnHandler func(context.Context, *websocket.Conn) | ||
|
||
// WsServeWithConnHandler serves websocket with custom connection handler, useful for custom keepalive | ||
var wsServeWithConnHandler = func(cfg *WsConfig, handler WsHandler, errHandler ErrHandler, connHandler ConnHandler) (doneC, stopC chan struct{}, err error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. From the docs (https://developers.binance.com/docs/cms/general-info), looks like we need to send ping actively. |
||
proxy := http.ProxyFromEnvironment | ||
if cfg.Proxy != nil { | ||
u, err := url.Parse(*cfg.Proxy) | ||
|
@@ -42,7 +58,7 @@ var wsServe = func(cfg *WsConfig, handler WsHandler, errHandler ErrHandler) (don | |
EnableCompression: true, | ||
} | ||
|
||
c, _, err := Dialer.Dial(cfg.Endpoint, nil) | ||
c, _, err := Dialer.Dial(cfg.Endpoint, cfg.Header) | ||
if err != nil { | ||
return nil, nil, err | ||
} | ||
|
@@ -55,10 +71,12 @@ var wsServe = func(cfg *WsConfig, handler WsHandler, errHandler ErrHandler) (don | |
// closed by the client. | ||
|
||
defer close(doneC) | ||
if WebsocketKeepalive { | ||
// This function overwrites the default ping frame handler | ||
// sent by the websocket API server | ||
keepAlive(c, WebsocketTimeout) | ||
|
||
// Custom connection handling, useful in active keepalive scenarios | ||
if connHandler != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do you wrap keepalive in another connHandler? The connHandler isn't used in other place. How do you think we keep in the old way? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The websocket for the Binance announcement requires active ping messages. I want to put passive keepalive and active keepalive into one logic. The active keepalive is used in the |
||
ctx, cancel := context.WithCancel(context.Background()) | ||
defer cancel() | ||
go connHandler(ctx, c) | ||
} | ||
|
||
// Wait for the stopC channel to be closed. We do that in a | ||
|
@@ -87,8 +105,43 @@ var wsServe = func(cfg *WsConfig, handler WsHandler, errHandler ErrHandler) (don | |
return | ||
} | ||
|
||
func keepAlive(c *websocket.Conn, timeout time.Duration) { | ||
// keepAliveWithPing Keepalive by actively sending ping messages | ||
func keepAliveWithPing(interval time.Duration, pongTimeout time.Duration) ConnHandler { | ||
return func(ctx context.Context, c *websocket.Conn) { | ||
ticker := time.NewTicker(interval) | ||
defer ticker.Stop() | ||
|
||
lastResponse := time.Now() | ||
c.SetPongHandler(func(appData string) error { | ||
lastResponse = time.Now() | ||
return nil | ||
}) | ||
|
||
lastPongTicker := time.NewTicker(pongTimeout) | ||
defer lastPongTicker.Stop() | ||
|
||
for { | ||
select { | ||
case <-ctx.Done(): | ||
return | ||
case <-ticker.C: | ||
if err := c.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(WebsocketPongTimeout)); err != nil { | ||
|
||
return | ||
} | ||
case <-lastPongTicker.C: | ||
if time.Since(lastResponse) > pongTimeout { | ||
c.Close() | ||
return | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
// keepAliveWithPong Keepalive by responding to ping messages | ||
func keepAliveWithPong(ctx context.Context, c *websocket.Conn, timeout time.Duration) { | ||
ticker := time.NewTicker(timeout) | ||
defer ticker.Stop() | ||
|
||
lastResponse := time.Now() | ||
|
||
|
@@ -108,16 +161,17 @@ func keepAlive(c *websocket.Conn, timeout time.Duration) { | |
return nil | ||
}) | ||
|
||
go func() { | ||
defer ticker.Stop() | ||
for { | ||
<-ticker.C | ||
for { | ||
select { | ||
case <-ctx.Done(): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we close the connection when context is cancelled? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The go func() {
defer close(doneC) // 1
if connHandler != nil {
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // 2
go connHandler(ctx, c)
}
silent := false
go func() {
select {
case <-stopC:
silent = true
case <-doneC:
}
c.Close() // 3
}()
for {
_, message, err := c.ReadMessage()
if err != nil {
if !silent {
errHandler(err)
}
return
}
handler(message)
}
}() // 2 go routine will only cause context cancel when it exits |
||
return | ||
case <-ticker.C: | ||
if time.Since(lastResponse) > timeout { | ||
c.Close() | ||
return | ||
} | ||
} | ||
}() | ||
} | ||
} | ||
|
||
var WsGetReadWriteConnection = func(cfg *WsConfig) (*websocket.Conn, error) { | ||
|
@@ -136,7 +190,7 @@ var WsGetReadWriteConnection = func(cfg *WsConfig) (*websocket.Conn, error) { | |
EnableCompression: false, | ||
} | ||
|
||
c, _, err := Dialer.Dial(cfg.Endpoint, nil) | ||
c, _, err := Dialer.Dial(cfg.Endpoint, cfg.Header) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,7 @@ package binance | |
|
||
import ( | ||
"encoding/json" | ||
"errors" | ||
"fmt" | ||
"strings" | ||
"time" | ||
|
@@ -17,6 +18,7 @@ var ( | |
BaseCombinedTestnetURL = "wss://stream.testnet.binance.vision/stream?streams=" | ||
BaseWsApiMainURL = "wss://ws-api.binance.com:443/ws-api/v3" | ||
BaseWsApiTestnetURL = "wss://ws-api.testnet.binance.vision/ws-api/v3" | ||
BaseWsAnnouncementURL = "wss://api.binance.com/sapi/wss" | ||
|
||
// WebsocketTimeout is an interval for sending ping/pong messages if WebsocketKeepalive is enabled | ||
WebsocketTimeout = time.Second * 600 | ||
|
@@ -135,20 +137,20 @@ func WsCombinedPartialDepthServe(symbolLevels map[string]string, handler WsParti | |
event.Symbol = strings.ToUpper(symbol) | ||
data := j.Get("data").MustMap() | ||
event.LastUpdateID, _ = data["lastUpdateId"].(json.Number).Int64() | ||
bidsLen := len(data["bids"].([]interface{})) | ||
bidsLen := len(data["bids"].([]any)) | ||
event.Bids = make([]Bid, bidsLen) | ||
for i := 0; i < bidsLen; i++ { | ||
item := data["bids"].([]interface{})[i].([]interface{}) | ||
item := data["bids"].([]any)[i].([]any) | ||
event.Bids[i] = Bid{ | ||
Price: item[0].(string), | ||
Quantity: item[1].(string), | ||
} | ||
} | ||
asksLen := len(data["asks"].([]interface{})) | ||
asksLen := len(data["asks"].([]any)) | ||
event.Asks = make([]Ask, asksLen) | ||
for i := 0; i < asksLen; i++ { | ||
|
||
item := data["asks"].([]interface{})[i].([]interface{}) | ||
item := data["asks"].([]any)[i].([]any) | ||
event.Asks[i] = Ask{ | ||
Price: item[0].(string), | ||
Quantity: item[1].(string), | ||
|
@@ -258,20 +260,20 @@ func wsCombinedDepthServe(endpoint string, handler WsDepthHandler, errHandler Er | |
event.Time, _ = data["E"].(json.Number).Int64() | ||
event.LastUpdateID, _ = data["u"].(json.Number).Int64() | ||
event.FirstUpdateID, _ = data["U"].(json.Number).Int64() | ||
bidsLen := len(data["b"].([]interface{})) | ||
bidsLen := len(data["b"].([]any)) | ||
event.Bids = make([]Bid, bidsLen) | ||
for i := 0; i < bidsLen; i++ { | ||
item := data["b"].([]interface{})[i].([]interface{}) | ||
item := data["b"].([]any)[i].([]any) | ||
event.Bids[i] = Bid{ | ||
Price: item[0].(string), | ||
Quantity: item[1].(string), | ||
} | ||
} | ||
asksLen := len(data["a"].([]interface{})) | ||
asksLen := len(data["a"].([]any)) | ||
event.Asks = make([]Ask, asksLen) | ||
for i := 0; i < asksLen; i++ { | ||
|
||
item := data["a"].([]interface{})[i].([]interface{}) | ||
item := data["a"].([]any)[i].([]any) | ||
event.Asks[i] = Ask{ | ||
Price: item[0].(string), | ||
Quantity: item[1].(string), | ||
|
@@ -872,6 +874,79 @@ func WsApiInitReadWriteConn() (*websocket.Conn, error) { | |
return conn, err | ||
} | ||
|
||
type WsAnnouncementEvent struct { | ||
CatalogID int64 `json:"catalogId"` | ||
CatalogName string `json:"catalogName"` | ||
PublishDate int64 `json:"publishDate"` | ||
Title string `json:"title"` | ||
Body string `json:"body"` | ||
Disclaimer string `json:"disclaimer"` | ||
} | ||
|
||
type WsAnnouncementParam struct { | ||
Random string | ||
Topic string | ||
RecvWindow int64 | ||
Timestamp int64 | ||
Signature string | ||
ApiKey string | ||
} | ||
type WsAnnouncementHandler func(event *WsAnnouncementEvent) | ||
|
||
// WsAnnouncementServe establishes a WebSocket connection to listen for Binance announcements. | ||
// See API documentation: https://developers.binance.com/docs/cms/announcement | ||
// | ||
// Parameters: | ||
// | ||
// params - Should be created using client.CreateAnnouncementParam | ||
// handler - Callback function to handle incoming announcement messages | ||
// errHandler - Error callback function for connection errors | ||
// | ||
// Returns: | ||
// | ||
// doneC - Channel that closes when the connection terminates | ||
// stopC - Channel that can be closed to stop the connection | ||
// err - Any initial connection error | ||
func WsAnnouncementServe(params WsAnnouncementParam, handler WsAnnouncementHandler, errHandler ErrHandler) (doneC, stopC chan struct{}, err error) { | ||
if UseTestnet { | ||
return nil, nil, errors.New("not support testnet") | ||
0x0001 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
} | ||
endpoint := fmt.Sprintf("%s?random=%s&topic=%s&recvWindow=%d×tamp=%d&signature=%s", | ||
BaseWsAnnouncementURL, params.Random, params.Topic, params.RecvWindow, params.Timestamp, params.Signature, | ||
) | ||
|
||
cfg := newWsConfig(endpoint) | ||
cfg.Header.Add("X-MBX-APIKEY", params.ApiKey) | ||
wsHandler := func(message []byte) { | ||
event := struct { | ||
Type string `json:"type"` | ||
Topic string `json:"topic"` | ||
Data string `json:"data"` | ||
}{} | ||
|
||
err := json.Unmarshal(message, &event) | ||
if err != nil { | ||
errHandler(err) | ||
return | ||
} | ||
|
||
if event.Type != "DATA" { | ||
errHandler(errors.New("type is not DATA: " + event.Type)) | ||
return | ||
} | ||
|
||
if event.Topic != "com_announcement_en" { | ||
errHandler(errors.New("topic is not com_announcement_en: " + event.Topic)) | ||
return | ||
} | ||
Comment on lines
+977
to
+980
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hard-coded topic string should be extracted as a constant to avoid duplication and improve maintainability. The same string 'com_announcement_en' is used in announcement_service.go line 45. Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @sc0Vu This only repeats twice, and probably won't repeat more than that. I don't think it's necessary to define it as a constant. What's your take? |
||
|
||
e := new(WsAnnouncementEvent) | ||
json.Unmarshal([]byte(event.Data), &e) | ||
0x0001 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
handler(e) | ||
} | ||
return wsServeWithConnHandler(cfg, wsHandler, errHandler, keepAliveWithPing(30*time.Second, WebsocketTimeout)) | ||
Comment on lines
+987
to
+989
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @sc0Vu here, send ping actively, in the last arguments There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I meant There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My bad, you're right. |
||
} | ||
|
||
// getWsApiEndpoint return the base endpoint of the API WS according the UseTestnet flag | ||
func getWsApiEndpoint() string { | ||
if UseTestnet { | ||
|
Uh oh!
There was an error while loading. Please reload this page.