Skip to content

Commit 608fb0c

Browse files
committed
Continue Binance adapter in Rust
1 parent c66887b commit 608fb0c

File tree

7 files changed

+137
-22
lines changed

7 files changed

+137
-22
lines changed

crates/adapters/binance/src/common/consts.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,10 @@
1515

1616
//! Binance venue constants and API endpoints.
1717
18-
use std::sync::LazyLock;
18+
use std::{num::NonZeroU32, sync::LazyLock};
1919

2020
use nautilus_model::identifiers::Venue;
21+
use nautilus_network::ratelimiter::quota::Quota;
2122

2223
use super::enums::{BinanceRateLimitInterval, BinanceRateLimitType};
2324

@@ -204,3 +205,19 @@ pub const BINANCE_EAPI_RATE_LIMITS: &[BinanceRateLimitQuota] = &[
204205
limit: 200,
205206
},
206207
];
208+
209+
/// WebSocket subscription rate limit: 5 messages per second.
210+
///
211+
/// Binance limits incoming WebSocket messages (subscribe/unsubscribe) to 5 per second.
212+
pub static BINANCE_WS_SUBSCRIPTION_QUOTA: LazyLock<Quota> =
213+
LazyLock::new(|| Quota::per_second(NonZeroU32::new(5).expect("5 > 0")));
214+
215+
/// WebSocket connection rate limit: 1 per second (conservative).
216+
///
217+
/// Binance limits connections to 300 per 5 minutes per IP. This conservative quota
218+
/// of 1 per second helps avoid hitting the connection limit during reconnection storms.
219+
pub static BINANCE_WS_CONNECTION_QUOTA: LazyLock<Quota> =
220+
LazyLock::new(|| Quota::per_second(NonZeroU32::new(1).expect("1 > 0")));
221+
222+
/// Rate limit key for WebSocket subscription operations.
223+
pub const BINANCE_RATE_LIMIT_KEY_SUBSCRIPTION: &str = "subscription";

crates/adapters/binance/src/futures/execution.rs

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use std::{
2626

2727
use anyhow::Context;
2828
use async_trait::async_trait;
29+
use dashmap::DashMap;
2930
use futures_util::{StreamExt, pin_mut};
3031
use nautilus_common::{
3132
clients::ExecutionClient,
@@ -65,10 +66,11 @@ use nautilus_model::{
6566
use rust_decimal::Decimal;
6667
use tokio::task::JoinHandle;
6768
use tokio_util::sync::CancellationToken;
69+
use ustr::Ustr;
6870

6971
use super::{
7072
http::{
71-
client::BinanceFuturesHttpClient,
73+
client::{BinanceFuturesHttpClient, BinanceFuturesInstrument},
7274
models::{BatchOrderResult, BinancePositionRisk},
7375
query::{
7476
BatchCancelItem, BinanceAllOrdersParamsBuilder, BinanceOpenOrdersParamsBuilder,
@@ -295,6 +297,7 @@ impl BinanceFuturesExecutionClient {
295297
}
296298

297299
/// Handles WebSocket messages from the user data stream.
300+
#[allow(clippy::too_many_arguments)]
298301
fn handle_ws_message(
299302
message: BinanceFuturesWsMessage,
300303
exec_sender: &tokio::sync::mpsc::UnboundedSender<ExecutionEvent>,
@@ -303,6 +306,7 @@ impl BinanceFuturesExecutionClient {
303306
account_type: AccountType,
304307
product_type: BinanceProductType,
305308
clock: &'static AtomicTime,
309+
instruments: &DashMap<Ustr, BinanceFuturesInstrument>,
306310
) {
307311
match message {
308312
BinanceFuturesWsMessage::Exec(exec_msg) => {
@@ -314,6 +318,7 @@ impl BinanceFuturesExecutionClient {
314318
account_type,
315319
product_type,
316320
clock,
321+
instruments,
317322
);
318323
}
319324
BinanceFuturesWsMessage::Data(_) => {
@@ -333,6 +338,7 @@ impl BinanceFuturesExecutionClient {
333338
}
334339

335340
/// Handles execution messages from the user data stream.
341+
#[allow(clippy::too_many_arguments)]
336342
fn handle_exec_message(
337343
message: NautilusFuturesExecWsMessage,
338344
exec_sender: &tokio::sync::mpsc::UnboundedSender<ExecutionEvent>,
@@ -341,6 +347,7 @@ impl BinanceFuturesExecutionClient {
341347
account_type: AccountType,
342348
product_type: BinanceProductType,
343349
clock: &'static AtomicTime,
350+
instruments: &DashMap<Ustr, BinanceFuturesInstrument>,
344351
) {
345352
match message {
346353
NautilusFuturesExecWsMessage::OrderUpdate(update) => {
@@ -351,6 +358,7 @@ impl BinanceFuturesExecutionClient {
351358
account_id,
352359
product_type,
353360
clock,
361+
instruments,
354362
);
355363
}
356364
NautilusFuturesExecWsMessage::AccountUpdate(update) => {
@@ -386,6 +394,7 @@ impl BinanceFuturesExecutionClient {
386394
account_id: AccountId,
387395
product_type: BinanceProductType,
388396
clock: &'static AtomicTime,
397+
instruments: &DashMap<Ustr, BinanceFuturesInstrument>,
389398
) {
390399
let order_data = &msg.order;
391400
let ts_event = UnixNanos::from((msg.event_time * 1_000_000) as u64);
@@ -453,13 +462,29 @@ impl BinanceFuturesExecutionClient {
453462
strategy_id,
454463
ts_event,
455464
ts_init,
465+
instruments,
456466
);
457467
}
458468
BinanceExecutionType::Amendment => {
459-
// Order modified - use default precision since we don't have cache access
469+
// Look up precision from instrument cache
470+
let symbol_key = Ustr::from(&order_data.symbol);
471+
let (price_precision, size_precision) = if let Some(inst) =
472+
instruments.get(&symbol_key)
473+
{
474+
(
475+
inst.price_precision() as u8,
476+
inst.quantity_precision() as u8,
477+
)
478+
} else {
479+
log::warn!(
480+
"Instrument not found in cache for amendment: symbol={}, using default precision",
481+
order_data.symbol
482+
);
483+
(8_u8, 8_u8)
484+
};
485+
460486
let quantity: f64 = order_data.original_qty.parse().unwrap_or(0.0);
461487
let price: f64 = order_data.original_price.parse().unwrap_or(0.0);
462-
let (price_precision, size_precision) = (8_u8, 8_u8);
463488

464489
let event = OrderUpdated::new(
465490
trader_id,
@@ -508,9 +533,23 @@ impl BinanceFuturesExecutionClient {
508533
strategy_id: StrategyId,
509534
ts_event: UnixNanos,
510535
ts_init: UnixNanos,
536+
instruments: &DashMap<Ustr, BinanceFuturesInstrument>,
511537
) {
512538
let order_data = &msg.order;
513539

540+
// Look up precision from instrument cache
541+
let symbol_key = Ustr::from(&order_data.symbol);
542+
let Some(inst) = instruments.get(&symbol_key) else {
543+
log::error!(
544+
"Instrument not found in cache for fill: symbol={}, skipping fill event to avoid precision mismatch",
545+
order_data.symbol
546+
);
547+
return;
548+
};
549+
let price_precision = inst.price_precision() as u8;
550+
let size_precision = inst.quantity_precision() as u8;
551+
drop(inst); // Release the DashMap lock
552+
514553
let last_qty: f64 = order_data.last_filled_qty.parse().unwrap_or(0.0);
515554
let last_px: f64 = order_data.last_filled_price.parse().unwrap_or(0.0);
516555
let cum_qty: f64 = order_data.cumulative_filled_qty.parse().unwrap_or(0.0);
@@ -522,9 +561,6 @@ impl BinanceFuturesExecutionClient {
522561
.parse()
523562
.unwrap_or(0.0);
524563

525-
// Use default precision since we don't have cache access in spawned task
526-
let (price_precision, size_precision) = (8_u8, 8_u8);
527-
528564
let commission_currency = order_data
529565
.commission_asset
530566
.as_ref()
@@ -977,6 +1013,7 @@ impl ExecutionClient for BinanceFuturesExecutionClient {
9771013
let product_type = self.product_type;
9781014
let clock = self.clock;
9791015
let cancel = self.cancellation_token.clone();
1016+
let instruments = self.http_client.instruments_cache();
9801017

9811018
let ws_task = get_runtime().spawn(async move {
9821019
pin_mut!(stream);
@@ -991,6 +1028,7 @@ impl ExecutionClient for BinanceFuturesExecutionClient {
9911028
account_type,
9921029
product_type,
9931030
clock,
1031+
&instruments,
9941032
);
9951033
}
9961034
() = cancel.cancelled() => {

crates/adapters/binance/src/futures/http/client.rs

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -942,6 +942,26 @@ pub enum BinanceFuturesInstrument {
942942
CoinM(BinanceFuturesCoinSymbol),
943943
}
944944

945+
impl BinanceFuturesInstrument {
946+
/// Returns the price precision for the instrument.
947+
#[must_use]
948+
pub const fn price_precision(&self) -> i32 {
949+
match self {
950+
Self::UsdM(s) => s.price_precision,
951+
Self::CoinM(s) => s.price_precision,
952+
}
953+
}
954+
955+
/// Returns the quantity precision for the instrument.
956+
#[must_use]
957+
pub const fn quantity_precision(&self) -> i32 {
958+
match self {
959+
Self::UsdM(s) => s.quantity_precision,
960+
Self::CoinM(s) => s.quantity_precision,
961+
}
962+
}
963+
}
964+
945965
/// Binance Futures HTTP client for USD-M and COIN-M perpetuals.
946966
#[derive(Debug, Clone)]
947967
#[cfg_attr(
@@ -1010,10 +1030,10 @@ impl BinanceFuturesHttpClient {
10101030
&self.raw
10111031
}
10121032

1013-
/// Returns a reference to the instruments cache.
1033+
/// Returns a clone of the instruments cache Arc.
10141034
#[must_use]
1015-
pub fn instruments_cache(&self) -> &DashMap<Ustr, BinanceFuturesInstrument> {
1016-
&self.instruments
1035+
pub fn instruments_cache(&self) -> Arc<DashMap<Ustr, BinanceFuturesInstrument>> {
1036+
Arc::clone(&self.instruments)
10171037
}
10181038

10191039
/// Returns server time.

crates/adapters/binance/src/futures/websocket/client.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@ use super::{
5252
messages::{BinanceFuturesHandlerCommand, BinanceFuturesWsMessage},
5353
};
5454
use crate::common::{
55+
consts::{
56+
BINANCE_RATE_LIMIT_KEY_SUBSCRIPTION, BINANCE_WS_CONNECTION_QUOTA,
57+
BINANCE_WS_SUBSCRIPTION_QUOTA,
58+
},
5559
credential::Credential,
5660
enums::{BinanceEnvironment, BinanceProductType},
5761
urls::get_ws_base_url,
@@ -214,13 +218,19 @@ impl BinanceFuturesWebSocketClient {
214218
reconnect_max_attempts: None,
215219
};
216220

221+
// Configure rate limits for subscription operations
222+
let keyed_quotas = vec![(
223+
BINANCE_RATE_LIMIT_KEY_SUBSCRIPTION.to_string(),
224+
*BINANCE_WS_SUBSCRIPTION_QUOTA,
225+
)];
226+
217227
let client = WebSocketClient::connect(
218228
config,
219229
Some(raw_handler),
220230
Some(ping_handler),
221231
None,
222-
vec![],
223-
None,
232+
keyed_quotas,
233+
Some(*BINANCE_WS_CONNECTION_QUOTA),
224234
)
225235
.await
226236
.map_err(|e| BinanceWsError::NetworkError(e.to_string()))?;

crates/adapters/binance/src/futures/websocket/handler.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,10 @@ use super::{
4949
parse_kline, parse_mark_price, parse_trade,
5050
},
5151
};
52-
use crate::common::enums::{BinanceWsEventType, BinanceWsMethod};
52+
use crate::common::{
53+
consts::BINANCE_RATE_LIMIT_KEY_SUBSCRIPTION,
54+
enums::{BinanceWsEventType, BinanceWsMethod},
55+
};
5356

5457
/// Handler for Binance Futures WebSocket JSON streams.
5558
pub struct BinanceFuturesWsFeedHandler {
@@ -182,7 +185,8 @@ impl BinanceFuturesWsFeedHandler {
182185
}
183186
};
184187

185-
if let Err(e) = client.send_text(json, None).await {
188+
let rate_limit_keys = Some(vec![BINANCE_RATE_LIMIT_KEY_SUBSCRIPTION.to_string()]);
189+
if let Err(e) = client.send_text(json, rate_limit_keys).await {
186190
log::error!("Failed to send subscribe request: {e}");
187191
}
188192
}
@@ -209,7 +213,8 @@ impl BinanceFuturesWsFeedHandler {
209213
}
210214
};
211215

212-
if let Err(e) = client.send_text(json, None).await {
216+
let rate_limit_keys = Some(vec![BINANCE_RATE_LIMIT_KEY_SUBSCRIPTION.to_string()]);
217+
if let Err(e) = client.send_text(json, rate_limit_keys).await {
213218
log::error!("Failed to send unsubscribe request: {e}");
214219
}
215220

crates/adapters/binance/src/spot/websocket/streams/client.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,13 @@ use super::{
5151
messages::{BinanceSpotWsMessage, HandlerCommand},
5252
subscription::MAX_STREAMS_PER_CONNECTION,
5353
};
54-
use crate::common::{consts::BINANCE_SPOT_SBE_WS_URL, credential::Ed25519Credential};
54+
use crate::common::{
55+
consts::{
56+
BINANCE_RATE_LIMIT_KEY_SUBSCRIPTION, BINANCE_SPOT_SBE_WS_URL, BINANCE_WS_CONNECTION_QUOTA,
57+
BINANCE_WS_SUBSCRIPTION_QUOTA,
58+
},
59+
credential::Ed25519Credential,
60+
};
5561

5662
/// Binance Spot WebSocket client for SBE market data streams.
5763
#[derive(Clone)]
@@ -195,13 +201,19 @@ impl BinanceSpotWebSocketClient {
195201
reconnect_max_attempts: None,
196202
};
197203

204+
// Configure rate limits for subscription operations
205+
let keyed_quotas = vec![(
206+
BINANCE_RATE_LIMIT_KEY_SUBSCRIPTION.to_string(),
207+
*BINANCE_WS_SUBSCRIPTION_QUOTA,
208+
)];
209+
198210
let client = WebSocketClient::connect(
199211
config,
200212
Some(raw_handler),
201213
Some(ping_handler),
202214
None,
203-
vec![],
204-
None,
215+
keyed_quotas,
216+
Some(*BINANCE_WS_CONNECTION_QUOTA),
205217
)
206218
.await
207219
.map_err(|e| {

crates/adapters/binance/src/spot/websocket/streams/handler.rs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ use super::{
5757
parse_trades_event,
5858
},
5959
};
60+
use crate::common::consts::BINANCE_RATE_LIMIT_KEY_SUBSCRIPTION;
6061

6162
/// Binance Spot WebSocket feed handler.
6263
///
@@ -358,7 +359,11 @@ impl BinanceSpotWsFeedHandler {
358359
self.subscriptions.mark_subscribe(stream);
359360
}
360361

361-
self.send_text(payload).await?;
362+
self.send_text(
363+
payload,
364+
Some(vec![BINANCE_RATE_LIMIT_KEY_SUBSCRIPTION.to_string()]),
365+
)
366+
.await?;
362367
Ok(())
363368
}
364369

@@ -368,7 +373,11 @@ impl BinanceSpotWsFeedHandler {
368373
let request = BinanceWsSubscription::unsubscribe(streams.clone(), request_id);
369374
let payload = serde_json::to_string(&request)?;
370375

371-
self.send_text(payload).await?;
376+
self.send_text(
377+
payload,
378+
Some(vec![BINANCE_RATE_LIMIT_KEY_SUBSCRIPTION.to_string()]),
379+
)
380+
.await?;
372381

373382
// Immediately confirm unsubscribe (don't wait for response)
374383
// We don't track unsubscribe failures - the stream will simply stop
@@ -381,12 +390,16 @@ impl BinanceSpotWsFeedHandler {
381390
}
382391

383392
/// Send text message via WebSocket.
384-
async fn send_text(&self, payload: String) -> anyhow::Result<()> {
393+
async fn send_text(
394+
&self,
395+
payload: String,
396+
rate_limit_keys: Option<Vec<String>>,
397+
) -> anyhow::Result<()> {
385398
let Some(client) = &self.inner else {
386399
anyhow::bail!("No active WebSocket client");
387400
};
388401
client
389-
.send_text(payload, None)
402+
.send_text(payload, rate_limit_keys)
390403
.await
391404
.map_err(|e| anyhow::anyhow!("Failed to send message: {e}"))?;
392405
Ok(())

0 commit comments

Comments
 (0)