Skip to content
This repository was archived by the owner on Mar 24, 2025. It is now read-only.

Commit 289116b

Browse files
authored
feat: OKX Provider (#7)
* init * hook up into provider factory * reconnect timeout
1 parent 450f967 commit 289116b

20 files changed

+1418
-26
lines changed

config/local/oracle.toml

+9
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,15 @@ name = "crypto_dot_com"
4444
path = "config/local/providers/crypto_dot_com.json"
4545
web_socket.enabled = true
4646
web_socket.max_buffer_size = 1000 # Replace "1000" with your desired maximum buffer size.
47+
web_socket.reconnection_timeout = "5s" # Replace "5s" with your desired reconnection timeout duration.
48+
49+
[[providers]]
50+
name = "okx"
51+
path = "config/local/providers/okx.json"
52+
web_socket.enabled = true
53+
web_socket.max_buffer_size = 1000
54+
web_socket.reconnection_timeout = "10s"
55+
4756

4857
[[providers]]
4958
name = "binanceus"

config/local/providers/okx.json

+12
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
{
2+
"markets": {
3+
"BITCOIN/USD": "BTC-USD",
4+
"ETHEREUM/USD": "ETH-USD",
5+
"SOLANA/USD": "SOL-USD",
6+
"ATOM/USD": "ATOM-USD",
7+
"POLKADOT/USD": "DOT-USD",
8+
"DYDX/USD": "DYDX-USD",
9+
"ETHEREUM/BITCOIN": "ETH-BTC"
10+
},
11+
"production": true
12+
}

oracle/config/provider_test.go

+17
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ path = "testpath"
6969
[web_socket]
7070
enabled = true
7171
max_buffer_size = 100
72+
reconnection_timeout = "5s"
7273
`
7374

7475
invalidWebSocketConfigContent = `
@@ -78,6 +79,7 @@ path = "testpath"
7879
[web_socket]
7980
enabled = true
8081
max_buffer_size = -1
82+
reconnection_timeout = "5s"
8183
`
8284

8385
noHandlerSpecificationConfigContent = `
@@ -98,6 +100,16 @@ max_queries = 10
98100
[web_socket]
99101
enabled = true
100102
max_buffer_size = 100
103+
reconnection_timeout = "5s"
104+
`
105+
106+
badReconnectionTimeoutConfigContent = `
107+
name = "testname"
108+
path = "testpath"
109+
[web_socket]
110+
enabled = true
111+
max_buffer_size = 100
112+
reconnection_timeout = -1s
101113
`
102114
)
103115

@@ -152,6 +164,11 @@ func TestReadProviderConfigFromFile(t *testing.T) {
152164
config: duplicateHandlerConfigContent,
153165
expectedErr: true,
154166
},
167+
{
168+
name: "bad reconnection timeout config",
169+
config: badReconnectionTimeoutConfigContent,
170+
expectedErr: true,
171+
},
155172
}
156173

157174
for _, tc := range testCases {

oracle/config/websocket.go

+9
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package config
22

33
import (
44
"fmt"
5+
"time"
56
)
67

78
// WebSocketConfig defines a config for a websocket based data provider.
@@ -13,6 +14,10 @@ type WebSocketConfig struct {
1314
// at any given time. If the provider receives more messages than this, it will
1415
// block receiving messages until the buffer is cleared.
1516
MaxBufferSize int `mapstructure:"max_buffer_size" toml:"max_buffer_size"`
17+
18+
// ReconnectionTimeout is the timeout for the provider to attempt to reconnect
19+
// to the websocket endpoint.
20+
ReconnectionTimeout time.Duration `mapstructure:"reconnection_timeout" toml:"reconnection_timeout"`
1621
}
1722

1823
func (c *WebSocketConfig) ValidateBasic() error {
@@ -24,5 +29,9 @@ func (c *WebSocketConfig) ValidateBasic() error {
2429
return fmt.Errorf("websocket max buffer size must be greater than 0")
2530
}
2631

32+
if c.ReconnectionTimeout <= 0 {
33+
return fmt.Errorf("websocket reconnection timeout must be greater than 0")
34+
}
35+
2736
return nil
2837
}

oracle/oracle.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,8 @@ func (o *Oracle) fetchPrices(provider providertypes.Provider[oracletypes.Currenc
197197
timeFilteredPrices := make(map[oracletypes.CurrencyPair]*big.Int)
198198
for pair, result := range prices {
199199
// If the price is older than the update interval, skip it.
200-
if diff := time.Since(result.Timestamp); diff > o.cfg.UpdateInterval {
200+
diff := time.Now().UTC().Sub(result.Timestamp)
201+
if diff > o.cfg.UpdateInterval {
201202
o.logger.Debug(
202203
"skipping price",
203204
zap.String("provider", provider.Name()),
@@ -212,6 +213,7 @@ func (o *Oracle) fetchPrices(provider providertypes.Provider[oracletypes.Currenc
212213
zap.String("provider", provider.Name()),
213214
zap.String("pair", pair.String()),
214215
zap.String("price", result.Value.String()),
216+
zap.Duration("diff", diff),
215217
)
216218
timeFilteredPrices[pair] = result.Value
217219
}

oracle/oracle_test.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,9 @@ var (
3333
Path: "testpath2",
3434
Name: "websocket1",
3535
WebSocket: config.WebSocketConfig{
36-
MaxBufferSize: 10,
37-
Enabled: true,
36+
MaxBufferSize: 10,
37+
Enabled: true,
38+
ReconnectionTimeout: 250 * time.Millisecond,
3839
},
3940
}
4041
)

providers/base/fetch.go

+16-7
Original file line numberDiff line numberDiff line change
@@ -101,15 +101,24 @@ func (p *BaseProvider[K, V]) attemptDataUpdate(ctx context.Context, responseCh c
101101

102102
// startWebSocket starts a connection to the web socket and handles the incoming messages.
103103
func (p *BaseProvider[K, V]) startWebSocket(ctx context.Context, responseCh chan<- providertypes.GetResponse[K, V]) error {
104-
p.logger.Debug("starting web socket query handler")
104+
// Start the web socket query handler. If the connection fails to start, then the query handler
105+
// will be restarted after a timeout.
106+
for {
107+
select {
108+
case <-ctx.Done():
109+
p.logger.Info("provider stopped via context")
110+
return ctx.Err()
111+
default:
112+
p.logger.Debug("starting web socket query handler")
113+
if err := p.ws.Start(ctx, p.ids, responseCh); err != nil {
114+
p.logger.Error("web socket query handler returned error", zap.Error(err))
115+
}
105116

106-
if err := p.ws.Start(ctx, p.ids, responseCh); err != nil {
107-
p.logger.Error("web socket query handler returned error", zap.Error(err))
108-
return err
117+
// If the web socket query handler returns, then the connection was closed. Wait for
118+
// a bit before trying to reconnect.
119+
time.Sleep(p.cfg.WebSocket.ReconnectionTimeout)
120+
}
109121
}
110-
111-
p.logger.Debug("web socket query handler finished")
112-
return nil
113122
}
114123

115124
// recv receives responses from the response channel and updates the data.

providers/base/provider_test.go

+19-4
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,9 @@ var (
4343
Name: "websocket",
4444
Path: "test",
4545
WebSocket: config.WebSocketConfig{
46-
Enabled: true,
47-
MaxBufferSize: 10,
46+
Enabled: true,
47+
MaxBufferSize: 10,
48+
ReconnectionTimeout: time.Millisecond * 200,
4849
},
4950
}
5051
pairs = []oracletypes.CurrencyPair{
@@ -109,7 +110,7 @@ func TestStart(t *testing.T) {
109110
handler.On("Start", mock.Anything, mock.Anything, mock.Anything).Return(func() error {
110111
<-ctx.Done()
111112
return ctx.Err()
112-
}())
113+
}()).Maybe()
113114

114115
provider, err := base.NewProvider(
115116
wsConfig,
@@ -131,7 +132,7 @@ func TestStart(t *testing.T) {
131132
handler.On("Start", mock.Anything, mock.Anything, mock.Anything).Return(func() error {
132133
<-ctx.Done()
133134
return ctx.Err()
134-
}())
135+
}()).Maybe()
135136

136137
provider, err := base.NewProvider(
137138
wsConfig,
@@ -291,6 +292,20 @@ func TestWebSocketProvider(t *testing.T) {
291292
},
292293
expectedPrices: map[oracletypes.CurrencyPair]*big.Int{},
293294
},
295+
{
296+
name: "continues restarting if the query handler returns",
297+
handler: func() wshandlers.WebSocketQueryHandler[oracletypes.CurrencyPair, *big.Int] {
298+
handler := wshandlermocks.NewWebSocketQueryHandler[oracletypes.CurrencyPair, *big.Int](t)
299+
300+
handler.On("Start", mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("no gib price updates")).Maybe()
301+
302+
return handler
303+
},
304+
pairs: []oracletypes.CurrencyPair{
305+
pairs[0],
306+
},
307+
expectedPrices: map[oracletypes.CurrencyPair]*big.Int{},
308+
},
294309
}
295310

296311
for _, tc := range testCases {

providers/websockets/README.md

+1
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,5 @@ Web sockets are preferred over HTTP APIs for real-time data as they only require
1111
The current set of supported providers are:
1212

1313
* [Crypto.com](./cryptodotcom/README.md) - Crypto.com is a cryptocurrency exchange that provides a free API for fetching cryptocurrency data. Crypto.com is a **primary data source** for the oracle.
14+
* [OKX](./okx/README.md) - OKX is a cryptocurrency exchange that provides a free API for fetching cryptocurrency data. OKX is a **primary data source** for the oracle.
1415

providers/websockets/cryptodotcom/parse.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414

1515
// parseInstrumentMessage is used to parse an instrument message received from the Crypto.com
1616
// web socket API. This message contains the latest price data for a set of instruments.
17-
func (h *CryptoWebSocketDataHandler) parseInstrumentMessage(
17+
func (h *WebSocketDataHandler) parseInstrumentMessage(
1818
msg InstrumentResponseMessage,
1919
) (providertypes.GetResponse[oracletypes.CurrencyPair, *big.Int], error) {
2020
var (

providers/websockets/cryptodotcom/ws_data_handler.go

+11-11
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@ const (
1818
Name = "crypto_dot_com"
1919
)
2020

21-
var _ handlers.WebSocketDataHandler[oracletypes.CurrencyPair, *big.Int] = (*CryptoWebSocketDataHandler)(nil)
21+
var _ handlers.WebSocketDataHandler[oracletypes.CurrencyPair, *big.Int] = (*WebSocketDataHandler)(nil)
2222

23-
// CryptoWebSocketDataHandler implements the WebSocketDataHandler interface. This is used to
23+
// WebSocketDataHandler implements the WebSocketDataHandler interface. This is used to
2424
// handle messages received from the Crypto.com websocket API.
25-
type CryptoWebSocketDataHandler struct {
25+
type WebSocketDataHandler struct {
2626
logger *zap.Logger
2727

2828
// config is the config for the Crypto.com web socket API.
@@ -33,7 +33,7 @@ type CryptoWebSocketDataHandler struct {
3333
func NewWebSocketDataHandlerFromConfig(
3434
logger *zap.Logger,
3535
providerCfg config.ProviderConfig,
36-
) (*CryptoWebSocketDataHandler, error) {
36+
) (handlers.WebSocketDataHandler[oracletypes.CurrencyPair, *big.Int], error) {
3737
if providerCfg.Name != Name {
3838
return nil, fmt.Errorf("invalid provider name %s", providerCfg.Name)
3939
}
@@ -43,7 +43,7 @@ func NewWebSocketDataHandlerFromConfig(
4343
return nil, fmt.Errorf("failed to read config file %s: %s", providerCfg.Path, err)
4444
}
4545

46-
return &CryptoWebSocketDataHandler{
46+
return &WebSocketDataHandler{
4747
config: cfg,
4848
logger: logger.With(zap.String("web_socket_data_handler", Name)),
4949
}, nil
@@ -53,12 +53,12 @@ func NewWebSocketDataHandlerFromConfig(
5353
func NewWebSocketDataHandler(
5454
logger *zap.Logger,
5555
cfg Config,
56-
) (*CryptoWebSocketDataHandler, error) {
56+
) (handlers.WebSocketDataHandler[oracletypes.CurrencyPair, *big.Int], error) {
5757
if err := cfg.ValidateBasic(); err != nil {
5858
return nil, fmt.Errorf("invalid config: %s", err)
5959
}
6060

61-
return &CryptoWebSocketDataHandler{
61+
return &WebSocketDataHandler{
6262
config: cfg,
6363
logger: logger.With(zap.String("web_socket_data_handler", Name)),
6464
}, nil
@@ -69,7 +69,7 @@ func NewWebSocketDataHandler(
6969
// a heartbeat response message must be sent back to the Crypto.com web socket API, otherwise
7070
// the connection will be closed. If a subscribe message is received, the message must be parsed
7171
// and a response must be returned. No update message is required for subscribe messages.
72-
func (h *CryptoWebSocketDataHandler) HandleMessage(
72+
func (h *WebSocketDataHandler) HandleMessage(
7373
message []byte,
7474
) (providertypes.GetResponse[oracletypes.CurrencyPair, *big.Int], []byte, error) {
7575
var (
@@ -117,7 +117,7 @@ func (h *CryptoWebSocketDataHandler) HandleMessage(
117117
// CreateMessage is used to create a message to send to the data provider. This is used to
118118
// subscribe to the given currency pairs. This is called when the connection to the data
119119
// provider is first established.
120-
func (h *CryptoWebSocketDataHandler) CreateMessage(
120+
func (h *WebSocketDataHandler) CreateMessage(
121121
cps []oracletypes.CurrencyPair,
122122
) ([]byte, error) {
123123
instruments := make([]string, 0)
@@ -140,12 +140,12 @@ func (h *CryptoWebSocketDataHandler) CreateMessage(
140140
}
141141

142142
// Name returns the name of the data provider.
143-
func (h *CryptoWebSocketDataHandler) Name() string {
143+
func (h *WebSocketDataHandler) Name() string {
144144
return Name
145145
}
146146

147147
// URL is used to get the URL of the data provider.
148-
func (h *CryptoWebSocketDataHandler) URL() string {
148+
func (h *WebSocketDataHandler) URL() string {
149149
if h.config.Production {
150150
return ProductionURL
151151
}

providers/websockets/okx/README.md

+67
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
# OKX Provider
2+
3+
## Overview
4+
5+
The OKX provider is used to fetch the ticker price from the [OKX web socket API](https://www.okx.com/docs-v5/en/#overview-websocket-overview). The web socket request size for data transmission between the client and server is only 2 bytes. The total number of requests to subscribe to new markets is limited to 3 requests per second. The total number of 'subscribe'/'unsubscribe'/'login' requests per connection is limited to 480 times per hour. WebSocket login and subscription rate limits are based on connection.
6+
7+
Connections will break automatically if the subscription is not established or data has not been pushed for more than 30 seconds. [Per OKX documentation](https://www.okx.com/docs-v5/en/#overview-websocket-overview),
8+
9+
```text
10+
If there’s a network problem, the system will automatically disable the connection.
11+
12+
The connection will break automatically if the subscription is not established or data has not been pushed for more than 30 seconds.
13+
14+
To keep the connection stable:
15+
16+
1. Set a timer of N seconds whenever a response message is received, where N is less than 30.
17+
2. If the timer is triggered, which means that no new message is received within N seconds, send the String 'ping'.
18+
3. Expect a 'pong' as a response. If the response message is not received within N seconds, please raise an error or reconnect.
19+
```
20+
21+
OKX provides [public and private channels](https://www.okx.com/docs-v5/en/?shell#overview-websocket-subscribe).
22+
23+
* Public channels -- No authentication is required, include tickers channel, K-Line channel, limit price channel, order book channel, and mark price channel etc.
24+
* Private channels -- including account channel, order channel, and position channel, etc -- require log in.
25+
26+
Users can choose to subscribe to one or more channels, and the total length of multiple channels cannot exceed 64 KB. This provider is implemented assuming that the user is only subscribing to public channels.
27+
28+
The exact channel that is used to subscribe to the ticker price is the [`Index Tickers Channel`](https://www.okx.com/docs-v5/en/?shell#public-data-websocket-index-tickers-channel). This pushes data every 100ms if there are any price updates, otherwise it will push updates once a minute.
29+
30+
To retrieve all of the supported [spot markets](https://www.okx.com/docs-v5/en/?shell#public-data-rest-api-get-instruments), please run the following command:
31+
32+
```bash
33+
curl https://www.okx.com/api/v5/public/instruments?instType=SPOT
34+
```
35+
36+
## Configuration
37+
38+
The configuration structure for this provider looks like the following:
39+
40+
```golang
41+
type Config struct {
42+
// Markets is the list of markets to subscribe to. The key is the currency pair and the value
43+
// is the instrument ID. The instrument ID must correspond to the spot market. For example,
44+
// the instrument ID for the BITCOIN/USDT market is BTC-USDT.
45+
Markets map[string]string `json:"markets"`
46+
47+
// Production is true if the config is for production.
48+
Production bool `json:"production"`
49+
}
50+
```
51+
52+
Note that if production is set to false, all prices returned by any subscribed markets will be static. A sample configuration is shown below:
53+
54+
```json
55+
{
56+
"markets": {
57+
"BITCOIN/USD": "BTC-USD", // Spot market
58+
"ETHEREUM/USD": "ETH-USD", // Spot market
59+
"SOLANA/USD": "SOL-USD", // Spot market
60+
"ATOM/USD": "ATOM-USD", // Spot market
61+
"POLKADOT/USD": "DOT-USD", // Spot market
62+
"DYDX/USD": "DYDX-USD", // Spot market
63+
"ETHEREUM/BITCOIN": "ETH-BTC" // Spot market
64+
},
65+
"production": true
66+
}
67+
```

0 commit comments

Comments
 (0)