diff --git a/collector/src/bitmart/http.rs b/collector/src/bitmart/http.rs
new file mode 100644
index 0000000..541cde8
--- /dev/null
+++ b/collector/src/bitmart/http.rs
@@ -0,0 +1,170 @@
+use std::{
+    io,
+    io::ErrorKind,
+    time::{Duration, Instant},
+};
+
+use anyhow::Error;
+use chrono::{DateTime, Utc};
+use futures_util::{SinkExt, StreamExt};
+use tokio::{
+    select,
+    sync::mpsc::{UnboundedSender, unbounded_channel},
+};
+use tokio_tungstenite::{
+    connect_async,
+    tungstenite::{Bytes, Message, Utf8Bytes, client::IntoClientRequest},
+};
+use tracing::{error, warn};
+
+pub async fn fetch_depth_snapshot(symbol: &str) -> Result<String, reqwest::Error> {
+    reqwest::Client::new()
+        .get(format!(
+            "https://api-cloud-v2.bitmart.com/contract/public/depth?symbol={symbol}"
+        ))
+        .header("Accept", "application/json")
+        .send()
+        .await?
+        .text()
+        .await
+}
+
+pub async fn connect(
+    url: &str,
+    topics: Vec<String>,
+    ws_tx: UnboundedSender<(DateTime<Utc>, Utf8Bytes)>,
+) -> Result<(), anyhow::Error> {
+    let request = url.into_client_request()?;
+    let (ws_stream, _) = connect_async(request).await?;
+    let (mut write, mut read) = ws_stream.split();
+    let (tx, mut rx) = unbounded_channel::<()>();
+
+    // Build the subscription frame once and send it.
+    let subscribe_msg = format!(
+        r#"{{"action":"subscribe","args":[{}]}}"#,
+        topics
+            .iter()
+            .map(|s| format!("\"{s}\""))
+            .collect::<Vec<_>>()
+            .join(",")
+    );
+    write.send(Message::Text(subscribe_msg.into())).await?;
+
+    tokio::spawn(async move {
+        let mut ping_interval = tokio::time::interval(Duration::from_secs(30));
+        loop {
+            select! {
+                result = rx.recv() => {
+                    match result {
+                        Some(_) => {
+                            if write.send(Message::Pong(Bytes::default())).await.is_err() {
+                                return;
+                            }
+                        }
+                        None => {
+                            break;
+                        }
+                    }
+                }
+                _ = ping_interval.tick() => {
+                    if write.send(
+                        Message::Text(r#"{"action":"ping"}"#.into())
+                    ).await.is_err() {
+                        return;
+                    }
+                }
+            }
+        }
+    });
+
+    loop {
+        match read.next().await {
+            Some(Ok(Message::Text(text))) => {
+                let recv_time = Utc::now();
+                if ws_tx.send((recv_time, text)).is_err() {
+                    break;
+                }
+            }
+            Some(Ok(Message::Binary(_))) => {}
+            Some(Ok(Message::Ping(_))) => {
+                // The pong task may already have exited; ignore a send failure
+                // rather than panicking.
+                let _ = tx.send(());
+            }
+            Some(Ok(Message::Pong(_))) => {}
+            Some(Ok(Message::Close(close_frame))) => {
+                warn!(?close_frame, "closed");
+                return Err(Error::from(io::Error::new(
+                    ErrorKind::ConnectionAborted,
+                    "closed",
+                )));
+            }
+            Some(Ok(Message::Frame(_))) => {}
+            Some(Err(e)) => {
+                return Err(Error::from(e));
+            }
+            None => {
+                break;
+            }
+        }
+    }
+    Ok(())
+}
+
+pub async fn keep_connection(
+    topics: Vec<String>,
+    symbol_list: Vec<String>,
+    ws_tx: UnboundedSender<(DateTime<Utc>, Utf8Bytes)>,
+) {
+    let mut error_count = 0;
+    loop {
+        let connect_time = Instant::now();
+        let topics_ = symbol_list
+            .iter()
+            .flat_map(|pair| {
+                topics
+                    .iter()
+                    .cloned()
+                    .map(|stream| {
+                        stream
+                            .replace("$symbol", pair.to_uppercase().as_str())
+                            .to_string()
+                    })
+                    .collect::<Vec<_>>()
+            })
+            .collect::<Vec<_>>();
+        if let Err(error) = connect(
+            "wss://openapi-ws-v2.bitmart.com/api?protocol=1.1",
+            topics_,
+            ws_tx.clone(),
+        )
+        .await
+        {
+            error!(?error, "websocket error");
+            error_count += 1;
+            if connect_time.elapsed() > Duration::from_secs(30) {
+                error_count = 0;
+            }
+            // Back off progressively as consecutive errors accumulate,
+            // checking the largest threshold first.
+            if error_count > 20 {
+                tokio::time::sleep(Duration::from_secs(10)).await;
+            } else if error_count > 10 {
+                tokio::time::sleep(Duration::from_secs(5)).await;
+            } else if error_count > 3 {
+                tokio::time::sleep(Duration::from_secs(1)).await;
+            }
+        } else {
+            break;
+        }
+    }
+}
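
With the topic templates registered in `main.rs` below, `keep_connection` expands `$symbol` once per upper-cased symbol before subscribing, so for `btcusdt` the frame sent by `connect` is `{"action":"subscribe","args":["futures/depthIncrease50:BTCUSDT@100ms","futures/trade:BTCUSDT"]}`. A minimal sketch of that expansion (the standalone helper is hypothetical, shown only to illustrate the behavior):

```rust
/// Hypothetical helper mirroring keep_connection's topic expansion:
/// every template is instantiated once per (upper-cased) symbol.
fn expand_topics(templates: &[String], symbols: &[String]) -> Vec<String> {
    symbols
        .iter()
        .flat_map(|pair| {
            templates
                .iter()
                .map(move |t| t.replace("$symbol", &pair.to_uppercase()))
        })
        .collect()
}

// expand_topics(&["futures/trade:$symbol".into()], &["btcusdt".into()])
//     == vec!["futures/trade:BTCUSDT"]
```
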
diff --git a/collector/src/bitmart/mod.rs b/collector/src/bitmart/mod.rs
new file mode 100644
index 0000000..a4f91e9
--- /dev/null
+++ b/collector/src/bitmart/mod.rs
@@ -0,0 +1,52 @@
+mod http;
+
+use chrono::{DateTime, Utc};
+pub use http::{fetch_depth_snapshot, keep_connection};
+use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
+use tokio_tungstenite::tungstenite::Utf8Bytes;
+use tracing::error;
+
+use crate::error::ConnectorError;
+
+fn handle(
+    writer_tx: &UnboundedSender<(DateTime<Utc>, String, String)>,
+    recv_time: DateTime<Utc>,
+    data: Utf8Bytes,
+) -> Result<(), ConnectorError> {
+    let j: serde_json::Value = serde_json::from_str(data.as_str())?;
+    let group = j
+        .get("group")
+        .ok_or(ConnectorError::FormatError)?
+        .as_str()
+        .ok_or(ConnectorError::FormatError)?;
+    if group.starts_with("futures/trade") {
+        // The symbol is the part after "/trade:", e.g. "futures/trade:BTCUSDT".
+        let symbol = group
+            .split("/trade:")
+            .last()
+            .ok_or(ConnectorError::FormatError)?;
+        let _ = writer_tx.send((recv_time, symbol.to_string(), data.to_string()));
+    } else if group.starts_with("futures/depthIncrease50") {
+        // Depth messages carry the symbol inside the "data" object.
+        if let Some(j_data) = j.get("data") {
+            if let Some(j_symbol) = j_data
+                .as_object()
+                .ok_or(ConnectorError::FormatError)?
+                .get("symbol")
+            {
+                let symbol = j_symbol.as_str().ok_or(ConnectorError::FormatError)?;
+                let _ = writer_tx.send((recv_time, symbol.to_string(), data.to_string()));
+            }
+        }
+    }
+    Ok(())
+}
+
+pub async fn run_collection(
+    topics: Vec<String>,
+    symbols: Vec<String>,
+    writer_tx: UnboundedSender<(DateTime<Utc>, String, String)>,
+) -> Result<(), anyhow::Error> {
+    let (ws_tx, mut ws_rx) = unbounded_channel();
+    let h = tokio::spawn(keep_connection(topics, symbols, ws_tx.clone()));
+    while let Some((recv_time, data)) = ws_rx.recv().await {
+        if let Err(error) = handle(&writer_tx, recv_time, data) {
+            error!(?error, "couldn't handle the received data.");
+        }
+    }
+    let _ = h.await;
+    Ok(())
+}
\ No newline at end of file
diff --git a/collector/src/main.rs b/collector/src/main.rs
index f93f1ed..ba0b866 100644
--- a/collector/src/main.rs
+++ b/collector/src/main.rs
@@ -12,6 +12,7 @@ mod bybit;
 mod error;
 mod file;
 mod throttler;
+mod bitmart;
 
 #[derive(Parser, Debug)]
 #[command(version, about, long_about = None)]
@@ -90,6 +91,17 @@ async fn main() -> Result<(), anyhow::Error> {
             tokio::spawn(bybit::run_collection(topics, args.symbols, writer_tx))
         }
+        "bitmart" => {
+            let topics = [
+                "futures/depthIncrease50:$symbol@100ms",
+                "futures/trade:$symbol",
+            ]
+            .iter()
+            .map(|topic| topic.to_string())
+            .collect();
+
+            tokio::spawn(bitmart::run_collection(topics, args.symbols, writer_tx))
+        }
         exchange => {
             return Err(anyhow!("{exchange} is not supported."));
         }
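
The shared writer channel carries `(recv_time, symbol, raw_json)` tuples, which keeps the downstream file writer exchange-agnostic. A rough wiring sketch, assuming it runs inside the collector crate (the printing loop stands in for the actual writer task):

```rust
use chrono::{DateTime, Utc};
use tokio::sync::mpsc::unbounded_channel;

async fn collect_bitmart() {
    let (writer_tx, mut writer_rx) =
        unbounded_channel::<(DateTime<Utc>, String, String)>();

    let topics = vec![
        "futures/depthIncrease50:$symbol@100ms".to_string(),
        "futures/trade:$symbol".to_string(),
    ];
    tokio::spawn(bitmart::run_collection(
        topics,
        vec!["btcusdt".to_string()],
        writer_tx,
    ));

    // Each record is the local receipt time plus the raw JSON payload,
    // already tagged with the symbol extracted in handle().
    while let Some((recv_time, symbol, raw)) = writer_rx.recv().await {
        println!("{symbol} {} {raw}", recv_time.timestamp_nanos_opt().unwrap_or(0));
    }
}
```
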
diff --git a/py-hftbacktest/hftbacktest/data/utils/bitmart.py b/py-hftbacktest/hftbacktest/data/utils/bitmart.py
new file mode 100644
index 0000000..e70886d
--- /dev/null
+++ b/py-hftbacktest/hftbacktest/data/utils/bitmart.py
@@ -0,0 +1,254 @@
+import gzip
+import json
+import datetime
+from typing import Optional
+
+import numpy as np
+from numpy.typing import NDArray
+
+from ..validation import correct_event_order, correct_local_timestamp, validate_event_order
+from ...types import (
+    DEPTH_EVENT,
+    DEPTH_CLEAR_EVENT,
+    DEPTH_SNAPSHOT_EVENT,
+    TRADE_EVENT,
+    BUY_EVENT,
+    SELL_EVENT,
+    event_dtype
+)
+
+
+def convert(
+        input_filename: str,
+        output_filename: Optional[str] = None,
+        base_latency: float = 0,
+        buffer_size: int = 100_000_000
+) -> NDArray:
+    r"""
+    Converts a raw BitMart feed stream file into a format compatible with HftBacktest.
+    If you encounter an ``IndexError`` due to an out-of-bounds index, try increasing the
+    ``buffer_size``.
+
+    **File Format:**
+
+    .. code-block::
+
+        # Depth Snapshot
+        1741630633009878000 {"data":{"symbol":"BTCUSDT","asks":[{"price":"78723.1","vol":"468"},...],"bids":[{"price":"78709","vol":"3437"},...],"ms_t":1741630632927,"version":7822754,"type":"snapshot"},"group":"futures/depthIncrease50:BTCUSDT@100ms"}
+
+        # Depth Update
+        1741630633063800000 {"data":{"symbol":"BTCUSDT","asks":[{"price":"78720.1","vol":"766"},...],"bids":[{"price":"78719.8","vol":"433"},...],"ms_t":1741630632990,"version":7822755,"type":"update"},"group":"futures/depthIncrease50:BTCUSDT@100ms"}
+
+        # Trade
+        1741630634788924000 {"data":[{"trade_id":3000000389425121,"symbol":"BTCUSDT","deal_price":"78727.3","deal_vol":"310","way":2,"m":true,"created_at":"2025-03-10T18:17:14.656686827Z"}],"group":"futures/trade:BTCUSDT"}
+
+    Args:
+        input_filename: Input filename with path.
+        output_filename: If provided, the converted data will be saved to the specified
+                         filename in ``npz`` format.
+        base_latency: The value to be added to the feed latency.
+                      See :func:`.correct_local_timestamp`.
+        buffer_size: Sets a preallocated row size for the buffer.
+
+    Returns:
+        Converted data compatible with HftBacktest.
+    """
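+    # Mapping from raw BitMart messages to HftBacktest events, for reference:
+    #   depth "snapshot" -> one DEPTH_CLEAR_EVENT per side, followed by a
+    #                       DEPTH_SNAPSHOT_EVENT row for every level
+    #   depth "update"   -> DEPTH_EVENT rows; a volume of 0 removes the level
+    #   trade            -> TRADE_EVENT flagged with the taker's side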
+    timestamp_mul = 1_000_000  # Multiplier to convert ms timestamps to ns.
+
+    tmp = np.empty(buffer_size, event_dtype)
+    row_num = 0
+    with gzip.open(input_filename, 'r') as f:
+        while True:
+            line = f.readline()
+            if not line:
+                break
+
+            try:
+                # The first space separates the local timestamp from the JSON payload.
+                space_index = line.find(b' ')
+                if space_index == -1:
+                    continue  # Skip malformed lines.
+
+                local_timestamp = int(line[:space_index])
+                message = json.loads(line[space_index + 1:])
+
+                if 'data' not in message:
+                    continue
+
+                data = message['data']
+                group = message.get('group', '')
+
+                # Process depth data (snapshot or update).
+                if 'symbol' in data and ('bids' in data or 'asks' in data):
+                    ms_t = data.get('ms_t', 0)  # Exchange timestamp in milliseconds.
+                    exch_timestamp = int(ms_t) * timestamp_mul
+
+                    # For snapshots, insert depth clear events before the snapshot rows.
+                    if data.get('type') == 'snapshot':
+                        if 'bids' in data and data['bids']:
+                            # Clear the bid side down to the lowest bid price in the snapshot.
+                            bid_clear_upto = min(float(bid['price']) for bid in data['bids'])
+                            tmp[row_num] = (
+                                DEPTH_CLEAR_EVENT | BUY_EVENT,
+                                exch_timestamp,
+                                local_timestamp,
+                                bid_clear_upto,
+                                0,
+                                0,
+                                0,
+                                0
+                            )
+                            row_num += 1
+
+                        if 'asks' in data and data['asks']:
+                            # Clear the ask side up to the highest ask price in the snapshot.
+                            ask_clear_upto = max(float(ask['price']) for ask in data['asks'])
+                            tmp[row_num] = (
+                                DEPTH_CLEAR_EVENT | SELL_EVENT,
+                                exch_timestamp,
+                                local_timestamp,
+                                ask_clear_upto,
+                                0,
+                                0,
+                                0,
+                                0
+                            )
+                            row_num += 1
+
+                    event_type = (
+                        DEPTH_SNAPSHOT_EVENT if data.get('type') == 'snapshot' else DEPTH_EVENT
+                    )
+
+                    # For updates, a volume of 0 removes the price level.
+                    for bid in data.get('bids', []):
+                        tmp[row_num] = (
+                            event_type | BUY_EVENT,
+                            exch_timestamp,
+                            local_timestamp,
+                            float(bid['price']),
+                            float(bid['vol']),
+                            0,
+                            0,
+                            0
+                        )
+                        row_num += 1
+
+                    for ask in data.get('asks', []):
+                        tmp[row_num] = (
+                            event_type | SELL_EVENT,
+                            exch_timestamp,
+                            local_timestamp,
+                            float(ask['price']),
+                            float(ask['vol']),
+                            0,
+                            0,
+                            0
+                        )
+                        row_num += 1
+
+                # Process trade data.
+                elif isinstance(data, list) and 'futures/trade' in group:
+                    for trade in data:
+                        if 'deal_price' not in trade or 'deal_vol' not in trade:
+                            continue
+
+                        # Parse the exchange timestamp from the created_at field,
+                        # e.g. "2025-03-10T18:17:14.656686827Z".
+                        created_at = trade.get('created_at', '')
+                        try:
+                            dt = datetime.datetime.strptime(
+                                created_at.split('.')[0],
+                                '%Y-%m-%dT%H:%M:%S'
+                            ).replace(tzinfo=datetime.timezone.utc)
+                            nanos_part = created_at.split('.')[1].rstrip('Z')
+                            nanos = int(nanos_part.ljust(9, '0')[:9])  # Exactly 9 digits.
+                            exch_timestamp = int(dt.timestamp()) * 1_000_000_000 + nanos
+                        except (ValueError, IndexError):
+                            # Fall back to ms_t if available, otherwise to the local
+                            # timestamp converted from nanoseconds to milliseconds.
+                            exch_timestamp = int(
+                                trade.get('ms_t', local_timestamp // 1_000_000)
+                            ) * timestamp_mul
+
+                        # BitMart's 'way' field pairs the buyer's and seller's position
+                        # actions:
+                        #   1 = buy_open_long   / sell_open_short
+                        #   2 = buy_open_long   / sell_close_long
+                        #   3 = buy_close_short / sell_open_short
+                        #   4 = buy_close_short / sell_close_long
+                        #   5 = sell_open_short / buy_open_long
+                        #   6 = sell_open_short / buy_close_short
+                        #   7 = sell_close_long / buy_open_long
+                        #   8 = sell_close_long / buy_close_short
+                        # The taker's side, which HftBacktest records, is derived from
+                        # 'm' instead: m=true means the buyer is the maker, so the
+                        # seller is the taker (SELL_EVENT); m=false means the buyer is
+                        # the taker (BUY_EVENT).
+                        side_event = SELL_EVENT if trade.get('m', False) else BUY_EVENT
+
+                        tmp[row_num] = (
+                            TRADE_EVENT | side_event,
+                            exch_timestamp,
+                            local_timestamp,
+                            float(trade['deal_price']),
+                            float(trade['deal_vol']),
+                            0,
+                            0,
+                            0
+                        )
+                        row_num += 1
+            except (json.JSONDecodeError, ValueError, KeyError, IndexError) as e:
+                print(f'Error processing line: {e}')
+                continue
+
+    # Truncate the buffer to the number of rows actually used.
+    tmp = tmp[:row_num]
+
+    print('Correcting the latency')
+    tmp = correct_local_timestamp(tmp, base_latency)
+
+    print('Correcting the event order')
+    data = correct_event_order(
+        tmp,
+        np.argsort(tmp['exch_ts'], kind='mergesort'),
+        np.argsort(tmp['local_ts'], kind='mergesort')
+    )
+
+    validate_event_order(data)
+
+    if output_filename is not None:
+        print('Saving to %s' % output_filename)
+        np.savez_compressed(output_filename, data=data)
+
+    return data
\ No newline at end of file
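
As a usage note, a minimal sketch of running the converter on a gzipped capture produced by the collector; the file names are hypothetical:

```python
from hftbacktest.data.utils import bitmart

# Normalizes the raw feed into event records and also saves them as .npz
# for direct loading by the backtester.
data = bitmart.convert(
    'btcusdt_20250310.gz',
    output_filename='btcusdt_20250310.npz',
    buffer_size=200_000_000,
)
```
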