diff --git a/plugins/binanceExchange_ws.go b/plugins/binanceExchange_ws.go index e44a68c59..e809863f4 100644 --- a/plugins/binanceExchange_ws.go +++ b/plugins/binanceExchange_ws.go @@ -1,8 +1,12 @@ package plugins import ( + "context" + "encoding/json" + "errors" "fmt" "log" + "sort" "strconv" "strings" "sync" @@ -12,12 +16,19 @@ import ( "github.com/adshao/go-binance/v2/common" "github.com/stellar/kelp/api" "github.com/stellar/kelp/model" + "github.com/stellar/kelp/support/sdk" ) const ( STREAM_TICKER_FMT = "%s@ticker" STREAM_BOOK_FMT = "%s@depth" - TTLTIME = time.Second * 3 // ttl time in seconds + + //not from binance docs, just for convetion we use @streamName + STREAM_USER = "@user" + // key used to save last cursor in events, it must be something that couldn't be used in the map + LAST_CURSOR_KEY = STREAM_USER + "||lastCursor" + TTLTIME = time.Second * 3 // ttl time in seconds + EVENT_EXECUTION_REPORT = "executionReport" ) var ( @@ -27,8 +38,12 @@ var ( var ( ErrConversionWsMarketEvent = errConversion{from: "interface", to: "*binance.WsMarketStatEvent"} ErrConversionWsPartialDepthEvent = errConversion{from: "interface", to: "*binance.WsPartialDepthEvent"} + ErrConversionHistory = errConversion{from: "interface", to: "History"} + ErrConversionCursor = errConversion{from: "interface", to: "int64"} ) +type History []*sdk.EventExecutionReport + type Subscriber func(symbol string, state *mapEvents) (*stream, error) type errMissingSymbol struct { symbol string @@ -79,6 +94,7 @@ func (s stream) Close() { //mapData... struct used to data from events and timestamp when they are cached type mapData struct { data interface{} + err error createdAt time.Time } @@ -95,7 +111,7 @@ type mapEvents struct { } //Set ... set value -func (m *mapEvents) Set(key string, data interface{}) { +func (m *mapEvents) Set(key string, data interface{}, err error) { now := time.Now() @@ -105,6 +121,7 @@ func (m *mapEvents) Set(key string, data interface{}) { m.data[key] = mapData{ data: data, createdAt: now, + err: err, } } @@ -138,14 +155,16 @@ func makeMapEvents() *mapEvents { //struct used to keep all cached data type events struct { - SymbolStats *mapEvents - BookStats *mapEvents + SymbolStats *mapEvents + BookStats *mapEvents + TradeHistoryEvents *mapEvents } func createStateEvents() *events { events := &events{ - SymbolStats: makeMapEvents(), - BookStats: makeMapEvents(), + SymbolStats: makeMapEvents(), + BookStats: makeMapEvents(), + TradeHistoryEvents: makeMapEvents(), } return events @@ -157,7 +176,7 @@ func createStateEvents() *events { func subcribeTicker(symbol string, state *mapEvents) (*stream, error) { wsMarketStatHandler := func(ticker *binance.WsMarketStatEvent) { - state.Set(symbol, ticker) + state.Set(symbol, ticker, nil) } errHandler := func(err error) { @@ -180,6 +199,98 @@ func subcribeTicker(symbol string, state *mapEvents) (*stream, error) { } +func subcribeUserStream(listenKey string, state *mapEvents) (*stream, error) { + + userStreamLock := sync.Mutex{} + + wsUserStreamExecutinReportHandler := func(message []byte) { + + event := &sdk.EventExecutionReport{} + err := json.Unmarshal(message, event) + + if err != nil { + log.Printf("Error unmarshal %s to eventExecutionReport\n", string(message)) + return + } + + userStreamLock.Lock() + defer userStreamLock.Unlock() + + history, isHistory := state.Get(event.Symbol) + + if !isHistory { + history.data = make(History, 0) + state.Set(event.Symbol, history, nil) + } + + now := time.Now() + history.createdAt = now + + data, isOk := history.data.(History) + + if !isOk { + log.Printf("Error conversion %v\n", ErrConversionHistory) + state.Set(event.Symbol, history, ErrConversionHistory) + return + } + + history.data = append(data, event) + + lastCursor := event.TransactionTime + + lastCursorData, isCursor := state.Get(LAST_CURSOR_KEY) + + if isCursor { + + cursor, isOk := lastCursorData.data.(int64) + + if isOk { + if cursor > lastCursor { + lastCursor = cursor + } + } else { + log.Printf("Error converting cursor %v\n", ErrConversionCursor) + err = ErrConversionCursor + } + } + + state.Set(LAST_CURSOR_KEY, lastCursor, err) + } + + wsUserStreamHandler := func(message []byte) { + event := &sdk.EventBinance{} + err := json.Unmarshal(message, event) + + if err != nil { + log.Printf("Error unmarshal %s to eventBinance\n", string(message)) + return + } + + switch event.Name { + case EVENT_EXECUTION_REPORT: + wsUserStreamExecutinReportHandler(message) + } + + } + + errHandler := func(err error) { + log.Printf("Error WsUserDataServe for listenKey %s: %v\n", listenKey, err) + } + + doneC, stopC, err := binance.WsUserDataServe(listenKey, wsUserStreamHandler, errHandler) + + if err != nil { + return nil, fmt.Errorf("error creating WsUserDataService:%s", err) + } + + keepConnection(doneC, func() { + subcribeUserStream(listenKey, state) + }) + + return &stream{doneC: doneC, stopC: stopC}, err + +} + //restart Connection with ws // Binance close each connection after 24 hours func keepConnection(doneC chan struct{}, reconnect func()) { @@ -196,7 +307,7 @@ func keepConnection(doneC chan struct{}, reconnect func()) { func subcribeBook(symbol string, state *mapEvents) (*stream, error) { wsPartialDepthHandler := func(event *binance.WsPartialDepthEvent) { - state.Set(symbol, event) + state.Set(symbol, event, nil) } errHandler := func(err error) { @@ -228,26 +339,93 @@ type binanceExchangeWs struct { assetConverter model.AssetConverterInterface delimiter string + + client *binance.Client + listenKey string + + keys api.ExchangeAPIKey + + errUserStream error } // makeBinanceWs is a factory method to make an binance exchange over ws -func makeBinanceWs() (*binanceExchangeWs, error) { +func makeBinanceWs(keys api.ExchangeAPIKey) (*binanceExchangeWs, error) { binance.WebsocketKeepalive = true events := createStateEvents() + streams := make(map[string]*stream) + beWs := &binanceExchangeWs{ events: events, delimiter: "", assetConverter: model.CcxtAssetConverter, streamLock: &sync.Mutex{}, - streams: make(map[string]*stream), + streams: streams, + keys: keys, } return beWs, nil } +//ListenKey expires every 60 minutes +func (beWs *binanceExchangeWs) keepAliveStreamService(client *binance.Client, key string) { + + for { + time.Sleep(time.Minute * 50) + err := client.NewKeepaliveUserStreamService().ListenKey(key).Do(context.Background()) + + if err != nil { + log.Printf("Error keepAliveStreamService %v\n", err) + } + + beWs.errUserStream = err + } +} + +func (beWs *binanceExchangeWs) isSubscribedUserStream() bool { + + _, isStream := beWs.streams[STREAM_USER] + + return isStream +} + +func (beWs *binanceExchangeWs) subscribeUserStream() error { + + beWs.streamLock.Lock() + defer beWs.streamLock.Unlock() + + if beWs.isSubscribedUserStream() { + return nil + } + + binanceClient := binance.NewClient(beWs.keys.Key, beWs.keys.Secret) + + listenKey, err := binanceClient.NewStartUserStreamService().Do(context.Background()) + + if err != nil { + return fmt.Errorf("error when creating listenKey: %s", err) + } + + go beWs.keepAliveStreamService(binanceClient, listenKey) + + streamUser, err := subcribeUserStream(listenKey, beWs.events.TradeHistoryEvents) + + if err != nil { + return fmt.Errorf("error when subscribing to user stream: %s", err) + } + + beWs.streams[STREAM_USER] = streamUser + beWs.client = binanceClient + beWs.listenKey = listenKey + + //Wait for first + time.Sleep(timeWaitForFirstEvent) + + return nil +} + //getPrceision... get precision for float string func getPrecision(floatStr string) int8 { @@ -284,7 +462,7 @@ func (beWs *binanceExchangeWs) subscribeStream(symbol, format string, subscribe //We couldn't subscribe for this pair if !isStream { - return mapData{}, fmt.Errorf("error while subscribing for %s", streamName) + return mapData{}, fmt.Errorf("error data doesn't exist for %s", streamName) } return data, nil @@ -308,7 +486,7 @@ func (beWs *binanceExchangeWs) GetTickerPrice(pairs []model.TradingPair) (map[mo tickerData, err = beWs.subscribeStream(symbol, STREAM_TICKER_FMT, subcribeTicker, beWs.events.SymbolStats) if err != nil { - return nil, err + return nil, fmt.Errorf("error when subscribing to stream %s:%s", fmt.Sprintf(STREAM_TICKER_FMT, symbol), err) } } @@ -380,6 +558,10 @@ func (beWs *binanceExchangeWs) GetOrderBook(pair *model.TradingPair, maxCount in } + if bookData.err != nil { + return nil, fmt.Errorf("error from stream:%v", bookData.err) + } + //Show how old is the orderbook log.Printf("OrderBook for %s is %d milliseconds old!\n", symbol, time.Now().Sub(bookData.createdAt).Milliseconds()) @@ -411,13 +593,13 @@ func (beWs *binanceExchangeWs) GetOrderBook(pair *model.TradingPair, maxCount in asks, err := beWs.readOrders(askCcxtOrders, pair, model.OrderActionSell) if err != nil { - return nil, err + return nil, fmt.Errorf("error when reading ask orders:%s", err) } bids, err := beWs.readOrders(bidCcxtOrders, pair, model.OrderActionBuy) if err != nil { - return nil, err + return nil, fmt.Errorf("error when reading bid orders:%s", err) } return model.MakeOrderBook(pair, asks, bids), nil @@ -450,6 +632,171 @@ func (beWs *binanceExchangeWs) readOrders(orders []common.PriceLevel, pair *mode return result, nil } +// GetTradeHistory impl +func (beWs *binanceExchangeWs) GetTradeHistory(pair model.TradingPair, maybeCursorStart interface{}, maybeCursorEnd interface{}) (*api.TradeHistoryResult, error) { + + if beWs.errUserStream != nil { + return nil, fmt.Errorf("error from update listen key:%v", beWs.errUserStream) + } + + if !beWs.isSubscribedUserStream() { + if err := beWs.subscribeUserStream(); err != nil { + return nil, fmt.Errorf("error subscribing to user stream: %s", err) + } + } + + symbol, err := pair.ToString(beWs.assetConverter, beWs.delimiter) + if err != nil { + return nil, fmt.Errorf("error converting symbol to string: %s", err) + } + + data, isOrders := beWs.events.TradeHistoryEvents.Get(symbol) + + if !isOrders { + return nil, fmt.Errorf("no trade history for trading pair '%s'", symbol) + } + + if data.err != nil { + return nil, fmt.Errorf("error from stream:%v", data.err) + } + + history, isOk := data.data.(History) + + if !isOk { + return nil, ErrConversionHistory + } + + trades := []model.Trade{} + for _, raw := range history { + + t, err := beWs.readTrade(&pair, symbol, raw) + if err != nil { + return nil, fmt.Errorf("error while reading trade: %s", err) + } + + t.OrderID = fmt.Sprintf("%d", raw.OrderID) + + trades = append(trades, *t) + } + + sort.Sort(model.TradesByTsID(trades)) + cursor := maybeCursorStart + + if len(trades) > 0 { + cursor, err = beWs.getCursor(trades) + if err != nil { + return nil, fmt.Errorf("error getting cursor when fetching trades: %s", err) + } + } + + return &api.TradeHistoryResult{ + Cursor: cursor, + Trades: trades, + }, nil +} + +func (beWs *binanceExchangeWs) getCursor(trades []model.Trade) (interface{}, error) { + lastTrade := trades[len(trades)-1] + + lastCursor := lastTrade.Order.Timestamp.AsInt64() + // add 1 to lastCursor so we don't repeat the same cursor on the next run + fetchedCursor := strconv.FormatInt(lastCursor+1, 10) + + // update cursor accordingly + return fetchedCursor, nil +} + +// GetLatestTradeCursor impl. +func (beWs *binanceExchangeWs) GetLatestTradeCursor() (interface{}, error) { + + if beWs.errUserStream != nil { + return nil, fmt.Errorf("error from update listen key:%v", beWs.errUserStream) + } + + if !beWs.isSubscribedUserStream() { + if err := beWs.subscribeUserStream(); err != nil { + return nil, fmt.Errorf("error subscribing to user stream: %s", err) + } + } + + lastTradeCursor, isCursor := beWs.events.TradeHistoryEvents.Get(LAST_CURSOR_KEY) + + if !isCursor { + return nil, errors.New("Missing cursor") + } + + if lastTradeCursor.err != nil { + return nil, fmt.Errorf("error from stream:%v", lastTradeCursor.err) + } + + cursor, isOk := lastTradeCursor.data.(int64) + + if !isOk { + return nil, ErrConversionCursor + } + + return fmt.Sprintf("%d", cursor), nil +} + +func (beWs *binanceExchangeWs) readTrade(pair *model.TradingPair, symbol string, rawTrade *sdk.EventExecutionReport) (*model.Trade, error) { + if rawTrade.Symbol != symbol { + return nil, fmt.Errorf("expected '%s' for 'symbol' field, got: %s", symbol, rawTrade.Symbol) + } + + pricePrecision := getPrecision(rawTrade.OrderPrice) + volumePrecision := getPrecision(rawTrade.OrderQuantity) + // use bigger precision for fee and cost since they are logically derived from amount and price + feecCostPrecision := pricePrecision + if volumePrecision > pricePrecision { + feecCostPrecision = volumePrecision + } + + orderPrice, err := strconv.ParseFloat(rawTrade.OrderPrice, 64) + if err != nil { + return nil, fmt.Errorf("error converting OrderPrice:%v", err) + } + + orderQuantity, err := strconv.ParseFloat(rawTrade.OrderQuantity, 64) + if err != nil { + return nil, fmt.Errorf("error converting OrderQuantity:%v", err) + } + + comissionAmount, err := strconv.ParseFloat(rawTrade.ComissionAmount, 64) + if err != nil { + return nil, fmt.Errorf("error converting ComissionAmount:%v", err) + } + + trade := model.Trade{ + Order: model.Order{ + Pair: pair, + Price: model.NumberFromFloat(orderPrice, pricePrecision), + Volume: model.NumberFromFloat(orderQuantity, volumePrecision), + OrderType: model.OrderTypeLimit, + Timestamp: model.MakeTimestamp(rawTrade.TransactionTime), + }, + TransactionID: model.MakeTransactionID(strconv.FormatInt(rawTrade.OrderID, 10)), + Cost: model.NumberFromFloat(comissionAmount, feecCostPrecision), + // OrderID read by calling function depending on override set for exchange params in "orderId" field of Info object + } + + if rawTrade.Side == "sell" { + trade.OrderAction = model.OrderActionSell + } else if rawTrade.Side == "buy" { + trade.OrderAction = model.OrderActionBuy + } else { + return nil, fmt.Errorf("unrecognized value for 'side' field: %s (rawTrade = %+v)", rawTrade.Side, rawTrade) + } + + if trade.Cost.AsFloat() < 0 { + return nil, fmt.Errorf("trade.Cost was < 0 (%f)", trade.Cost.AsFloat()) + } + if trade.Order.Volume.AsFloat() < 0 { + return nil, fmt.Errorf("trade.Order.Volume was < 0 (%f)", trade.Order.Volume.AsFloat()) + } + + return &trade, nil +} + //Unsubscribe ... unsubscribe from binance streams func (beWs *binanceExchangeWs) Unsubscribe(stream string) { diff --git a/plugins/binanceExchange_ws_test.go b/plugins/binanceExchange_ws_test.go index 4f6133468..5d705e889 100644 --- a/plugins/binanceExchange_ws_test.go +++ b/plugins/binanceExchange_ws_test.go @@ -1,13 +1,19 @@ package plugins import ( + "errors" "fmt" + "strconv" "testing" + "time" + "github.com/stellar/kelp/api" "github.com/stellar/kelp/model" "github.com/stretchr/testify/assert" ) +var emptyAPIKeyBinance = api.ExchangeAPIKey{} + func Test_createStateEvents(t *testing.T) { events := createStateEvents() @@ -16,10 +22,11 @@ func Test_createStateEvents(t *testing.T) { } func Test_binanceExchangeWs_GetTickerPrice(t *testing.T) { + pair := model.TradingPair{Base: model.XLM, Quote: model.BTC} pairs := []model.TradingPair{pair} - testBinanceExchangeWs, err := makeBinanceWs() + testBinanceExchangeWs, err := makeBinanceWs(emptyAPIKeyBinance) if !assert.NoError(t, err) { return @@ -51,7 +58,7 @@ func Test_binanceExchangeWs_GetTickerPrice(t *testing.T) { func Test_binanceExchangeWs_GetOrderBook(t *testing.T) { - testBinanceExchangeWs, e := makeBinanceWs() + testBinanceExchangeWs, e := makeBinanceWs(emptyAPIKeyBinance) if !assert.NoError(t, e) { return } @@ -89,3 +96,94 @@ func Test_binanceExchangeWs_GetOrderBook(t *testing.T) { assert.True(t, ob.Bids()[0].Volume.AsFloat() > 0) } } + +func Test_binanceExchangeWs_GetLatestTradeCursor(t *testing.T) { + + if testing.Short() { + return + } + + startIntervalSecs := time.Now().Unix() + + testBinanceExchangeWs, err := makeBinanceWs(emptyAPIKeyBinance) + + if !assert.NoError(t, err) { + return + } + + cursor, e := testBinanceExchangeWs.GetLatestTradeCursor() + + if !assert.Error(t, e) { + return + } + + testBinanceExchangeWs.events.TradeHistoryEvents.Set(LAST_CURSOR_KEY, "fail", nil) + + cursor, e = testBinanceExchangeWs.GetLatestTradeCursor() + + if !assert.Error(t, e) { + return + } + + testBinanceExchangeWs.events.TradeHistoryEvents.Set(LAST_CURSOR_KEY, time.Now().Unix(), errors.New("test")) + + cursor, e = testBinanceExchangeWs.GetLatestTradeCursor() + if !assert.Error(t, e) { + return + } + + testBinanceExchangeWs.events.TradeHistoryEvents.Set(LAST_CURSOR_KEY, time.Now().Unix(), nil) + + cursor, e = testBinanceExchangeWs.GetLatestTradeCursor() + if !assert.NoError(t, e) { + return + } + + endIntervalSecs := time.Now().Unix() + + if !assert.IsType(t, "string", cursor) { + return + } + + cursorString := cursor.(string) + cursorInt, e := strconv.ParseInt(cursorString, 10, 64) + if !assert.NoError(t, e) { + return + } + + if !assert.True(t, startIntervalSecs <= cursorInt, fmt.Sprintf("returned cursor (%d) should be gte the start time of the function call in millis (%d)", cursorInt, startIntervalSecs)) { + return + } + if !assert.True(t, endIntervalSecs >= cursorInt, fmt.Sprintf("returned cursor (%d) should be lte the end time of the function call in millis (%d)", cursorInt, endIntervalSecs)) { + return + } +} + +func Test_binanceExchangeWs_GetTradeHistory(t *testing.T) { + + if testing.Short() { + return + } + + testBinanceExchangeWs, err := makeBinanceWs(emptyAPIKeyBinance) + + if !assert.NoError(t, err) { + return + } + + pair := model.TradingPair{Base: model.XLM, Quote: model.BTC} + tradeHistoryResult, e := testBinanceExchangeWs.GetTradeHistory(pair, nil, nil) + if !assert.NoError(t, e) { + return + } + + if !assert.True(t, len(tradeHistoryResult.Trades) >= 0) { + return + } + + if !assert.NotNil(t, tradeHistoryResult.Cursor) { + return + } + + assert.Fail(t, "force fail") +} diff --git a/support/sdk/binance_ws.go b/support/sdk/binance_ws.go new file mode 100644 index 000000000..fde672b21 --- /dev/null +++ b/support/sdk/binance_ws.go @@ -0,0 +1,40 @@ +package sdk + +type EventBinance struct { + Name string `json:"e"` +} + +type EventExecutionReport struct { + EventBinance + EventTime int64 `json:"E"` // "E": 1499405658658, //Event time + Symbol string `json:"s"` // "s": "ETHBTC", //Symbol + ClientOrderID string `json:"c"` // "c": "mUvoqJxFIILMdfAW5iGSOW", //Client order ID + Side string `json:"S"` // "S": "BUY", // Side + OrderType string `json:"o"` // "o": "LIMIT", // Order type + TimeInForce string `json:"f"` // "f": "GTC", // Time in force + OrderQuantity string `json:"q"` // "q": "1.00000000", // Order quantity + OrderPrice string `json:"p"` // "p": "0.10264410", // Order price + StopPrice string `json:"P"` // "P": "0.00000000", // Stop price + IceberQuantity string `json:"F"` // "F": "0.00000000", // Iceberg quantity + OrderListID int64 `json:"g"` // "g": -1, // OrderListId + OriginalClientID interface{} `json:"C"` // "C": null, // Original client order ID; This is the ID of the order being canceled + CurrentExecutionType string `json:"x"` // "x": "NEW", // Current execution type + CurrentOrderStatus string `json:"X"` // "X": "NEW", // Current order status + OrderRejectReason string `json:"r"` // "r": "NONE", // Order reject reason; will be an error code. + OrderID int64 `json:"i"` // "i": 4293153, // Order ID + LastExecutedQuantity string `json:"l"` // "l": "0.00000000", // Last executed quantity + CumulativeFillerQuantity string `json:"z"` // "z": "0.00000000", // Cumulative filled quantity + LastExecutedPrice string `json:"L"` // "L": "0.00000000", // Last executed price + ComissionAmount string `json:"n"` // "n": "0", // Commission amount + ComissionAsset interface{} `json:"N"` // "N": null, // Commission asset + TransactionTime int64 `json:"T"` // "T": 1499405658657, // Transaction time + TradeID int64 `json:"t"` // "t": -1, // Trade ID + Ignore int64 `json:"I"` // "I": 8641984, // Ignore + IsTherOrderOnBook bool `json:"w"` // "w": true, // Is the order on the book? + IsTheTradeMarkerSide bool `json:"m"` // "m": false, // Is this trade the maker side? + IsIgnore bool `json:"M"` // "M": false, // Ignore + OrderCreationTime int64 `json:"O"` // "O": 1499405658657, // Order creation time + CumulativeQuoteAssetQuantity string `json:"Z"` // "Z": "0.00000000", // Cumulative quote asset transacted quantity + LastQuoteAssetQuantity string `json:"Y"` // "Y": "0.00000000", // Last quote asset transacted quantity (i.e. lastPrice * lastQty) + QuoteOrderQuantity string `json:"Q"` // "Q": "0.00000000" // Quote Order Qty +}