From 5efa8d5ce91f09d94376045be166780a97bb765f Mon Sep 17 00:00:00 2001
From: M8
Date: Mon, 10 Mar 2025 21:56:54 +0100
Subject: [PATCH 1/2] added bitmart futures data collection

---
 collector/src/bitmart/http.rs                 | 170 +++++++++++++
 collector/src/bitmart/mod.rs                  |  52 ++++
 collector/src/main.rs                         |  12 +
 .../hftbacktest/data/utils/bitmart.py         | 240 ++++++++++++++++++
 4 files changed, 474 insertions(+)
 create mode 100644 collector/src/bitmart/http.rs
 create mode 100644 collector/src/bitmart/mod.rs
 create mode 100644 py-hftbacktest/hftbacktest/data/utils/bitmart.py

diff --git a/collector/src/bitmart/http.rs b/collector/src/bitmart/http.rs
new file mode 100644
index 0000000..541cde8
--- /dev/null
+++ b/collector/src/bitmart/http.rs
@@ -0,0 +1,170 @@
+use std::{
+    io,
+    io::ErrorKind,
+    time::{Duration, Instant},
+};
+
+use anyhow::Error;
+use chrono::{DateTime, Utc};
+use futures_util::{SinkExt, StreamExt};
+use tokio::{
+    select,
+    sync::mpsc::{UnboundedSender, unbounded_channel},
+};
+use tokio_tungstenite::{
+    connect_async,
+    tungstenite::{Bytes, Message, Utf8Bytes, client::IntoClientRequest},
+};
+use tracing::{error, warn};
+
+pub async fn fetch_depth_snapshot(symbol: &str) -> Result<String, reqwest::Error> {
+    reqwest::Client::new()
+        .get(format!(
+            "https://api-cloud-v2.bitmart.com/contract/public/depth?symbol={symbol}"
+        ))
+        .header("Accept", "application/json")
+        .send()
+        .await?
+        .text()
+        .await
+}
+
+pub async fn connect(
+    url: &str,
+    topics: Vec<String>,
+    ws_tx: UnboundedSender<(DateTime<Utc>, Utf8Bytes)>,
+) -> Result<(), anyhow::Error> {
+    let request = url.into_client_request()?;
+    let (ws_stream, _) = connect_async(request).await?;
+    let (mut write, mut read) = ws_stream.split();
+    let (tx, mut rx) = unbounded_channel::<()>();
+
+    write
+        .send(Message::Text(
+            format!(
+                r#"{{"action":"subscribe","args":[{}]}}"#,
+                topics
+                    .iter()
+                    .map(|s| format!("\"{s}\""))
+                    .collect::<Vec<_>>()
+                    .join(",")
+            )
+            .into(),
+        ))
+        .await?;
+
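+    // Keepalive: a JSON ping ({"action":"ping"}) is sent every 30 seconds, and any
+    // protocol-level Ping frame received in the read loop below is relayed through
+    // `tx` so this task answers it with a Pong.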
+    tokio::spawn(async move {
+        let mut ping_interval = tokio::time::interval(Duration::from_secs(30));
+        loop {
+            select! {
+                result = rx.recv() => {
+                    match result {
+                        Some(_) => {
+                            if write.send(Message::Pong(Bytes::default())).await.is_err() {
+                                return;
+                            }
+                        }
+                        None => {
+                            break;
+                        }
+                    }
+                }
+                _ = ping_interval.tick() => {
+                    if write.send(
+                        Message::Text(r#"{"action":"ping"}"#.into())
+                    ).await.is_err() {
+                        return;
+                    }
+                }
+            }
+        }
+    });
+
+    loop {
+        match read.next().await {
+            Some(Ok(Message::Text(text))) => {
+                let recv_time = Utc::now();
+                if ws_tx.send((recv_time, text)).is_err() {
+                    break;
+                }
+            }
+            Some(Ok(Message::Binary(_))) => {}
+            Some(Ok(Message::Ping(_))) => {
+                // Ignore the error if the keepalive task has already exited.
+                let _ = tx.send(());
+            }
+            Some(Ok(Message::Pong(_))) => {}
+            Some(Ok(Message::Close(close_frame))) => {
+                warn!(?close_frame, "closed");
+                return Err(Error::from(io::Error::new(
+                    ErrorKind::ConnectionAborted,
+                    "closed",
+                )));
+            }
+            Some(Ok(Message::Frame(_))) => {}
+            Some(Err(e)) => {
+                return Err(Error::from(e));
+            }
+            None => {
+                break;
+            }
+        }
+    }
+    Ok(())
+}
+
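+// For a single symbol `btcusdt` and the topic templates registered in `main.rs`,
+// the `$symbol` substitution below expands the subscription list to
+// `futures/depthIncrease50:BTCUSDT@100ms` and `futures/trade:BTCUSDT`, which
+// `connect` then sends in one subscribe frame, e.g.:
+//   {"action":"subscribe","args":["futures/depthIncrease50:BTCUSDT@100ms","futures/trade:BTCUSDT"]}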
+pub async fn keep_connection(
+    topics: Vec<String>,
+    symbol_list: Vec<String>,
+    ws_tx: UnboundedSender<(DateTime<Utc>, Utf8Bytes)>,
+) {
+    let mut error_count = 0;
+    loop {
+        let connect_time = Instant::now();
+        let topics_ = symbol_list
+            .iter()
+            .flat_map(|pair| {
+                topics
+                    .iter()
+                    .cloned()
+                    .map(|stream| {
+                        stream
+                            .replace("$symbol", pair.to_uppercase().as_str())
+                            .to_string()
+                    })
+                    .collect::<Vec<_>>()
+            })
+            .collect::<Vec<_>>();
+        if let Err(error) = connect(
+            "wss://openapi-ws-v2.bitmart.com/api?protocol=1.1",
+            topics_,
+            ws_tx.clone(),
+        )
+        .await
+        {
+            error!(?error, "websocket error");
+            error_count += 1;
+            if connect_time.elapsed() > Duration::from_secs(30) {
+                error_count = 0;
+            }
+            // Back off progressively; check the largest threshold first so the
+            // longer sleeps are actually reachable.
+            if error_count > 20 {
+                tokio::time::sleep(Duration::from_secs(10)).await;
+            } else if error_count > 10 {
+                tokio::time::sleep(Duration::from_secs(5)).await;
+            } else if error_count > 3 {
+                tokio::time::sleep(Duration::from_secs(1)).await;
+            }
+        } else {
+            break;
+        }
+    }
+}
diff --git a/collector/src/bitmart/mod.rs b/collector/src/bitmart/mod.rs
new file mode 100644
index 0000000..a4f91e9
--- /dev/null
+++ b/collector/src/bitmart/mod.rs
@@ -0,0 +1,52 @@
+mod http;
+
+use chrono::{DateTime, Utc};
+pub use http::{fetch_depth_snapshot, keep_connection};
+use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
+use tokio_tungstenite::tungstenite::Utf8Bytes;
+use tracing::error;
+
+use crate::error::ConnectorError;
+
+fn handle(
+    writer_tx: &UnboundedSender<(DateTime<Utc>, String, String)>,
+    recv_time: DateTime<Utc>,
+    data: Utf8Bytes,
+) -> Result<(), ConnectorError> {
+    let j: serde_json::Value = serde_json::from_str(data.as_str())?;
+    let group = j
+        .get("group")
+        .ok_or(ConnectorError::FormatError)?
+        .as_str()
+        .ok_or(ConnectorError::FormatError)?;
+    if group.starts_with("futures/trade") {
+        let symbol = group
+            .split("/trade:")
+            .last()
+            .ok_or(ConnectorError::FormatError)?;
+        let _ = writer_tx.send((recv_time, symbol.to_string(), data.to_string()));
+    } else if group.starts_with("futures/depthIncrease50") {
+        if let Some(j_data) = j.get("data") {
+            if let Some(j_symbol) = j_data
+                .as_object()
+                .ok_or(ConnectorError::FormatError)?
+                .get("symbol")
+            {
+                let symbol = j_symbol.as_str().ok_or(ConnectorError::FormatError)?;
+                let _ = writer_tx.send((recv_time, symbol.to_string(), data.to_string()));
+            }
+        }
+    }
+    Ok(())
+}
+
+pub async fn run_collection(
+    topics: Vec<String>,
+    symbols: Vec<String>,
+    writer_tx: UnboundedSender<(DateTime<Utc>, String, String)>,
+) -> Result<(), anyhow::Error> {
+    let (ws_tx, mut ws_rx) = unbounded_channel();
+    let h = tokio::spawn(keep_connection(topics, symbols, ws_tx.clone()));
+    while let Some((recv_time, data)) = ws_rx.recv().await {
+        if let Err(error) = handle(&writer_tx, recv_time, data) {
+            error!(?error, "couldn't handle the received data.");
+        }
+    }
+    let _ = h.await;
+    Ok(())
+}
\ No newline at end of file
diff --git a/collector/src/main.rs b/collector/src/main.rs
index f93f1ed..ba0b866 100644
--- a/collector/src/main.rs
+++ b/collector/src/main.rs
@@ -12,6 +12,7 @@ mod bybit;
 mod error;
 mod file;
 mod throttler;
+mod bitmart;
 
 #[derive(Parser, Debug)]
 #[command(version, about, long_about = None)]
@@ -90,6 +91,17 @@ async fn main() -> Result<(), anyhow::Error> {
 
             tokio::spawn(bybit::run_collection(topics, args.symbols, writer_tx))
         }
+        "bitmart" => {
+            let topics = [
+                "futures/depthIncrease50:$symbol@100ms",
+                "futures/trade:$symbol",
+            ]
+            .iter()
+            .map(|topic| topic.to_string())
+            .collect();
+
+            tokio::spawn(bitmart::run_collection(topics, args.symbols, writer_tx))
+        }
         exchange => {
             return Err(anyhow!("{exchange} is not supported."));
         }
diff --git a/py-hftbacktest/hftbacktest/data/utils/bitmart.py b/py-hftbacktest/hftbacktest/data/utils/bitmart.py
new file mode 100644
index 0000000..2e7940d
--- /dev/null
+++ b/py-hftbacktest/hftbacktest/data/utils/bitmart.py
@@ -0,0 +1,240 @@
+import gzip
+import json
+from typing import Optional, Literal
+
+import numpy as np
+from numpy.typing import NDArray
+
+from ..validation import correct_event_order, correct_local_timestamp, validate_event_order
+from ...types import (
+    DEPTH_EVENT,
+    DEPTH_CLEAR_EVENT,
+    DEPTH_SNAPSHOT_EVENT,
+    TRADE_EVENT,
+    BUY_EVENT,
+    SELL_EVENT,
+    event_dtype
+)
+
+
+def convert(
+    input_filename: str,
+    output_filename: Optional[str] = None,
+    base_latency: float = 0,
+    buffer_size: int = 100_000_000
+) -> NDArray:
+    r"""
+    Converts a raw BitMart futures feed stream file into a format compatible with HftBacktest.
+    If you encounter an ``IndexError`` due to an out-of-bounds index, try increasing the ``buffer_size``.
+
+    **File Format:**
+
+    .. code-block::
+
+        # Depth Snapshot
+        1741630633009878000 {"data":{"symbol":"BTCUSDT","asks":[{"price":"78723.1","vol":"468"},...],"bids":[{"price":"78709","vol":"3437"},...],"ms_t":1741630632927,"version":7822754,"type":"snapshot"},"group":"futures/depthIncrease50:BTCUSDT@100ms"}
+
+        # Depth Update
+        1741630633063800000 {"data":{"symbol":"BTCUSDT","asks":[{"price":"78720.1","vol":"766"},...],"bids":[{"price":"78719.8","vol":"433"},...],"ms_t":1741630632990,"version":7822755,"type":"update"},"group":"futures/depthIncrease50:BTCUSDT@100ms"}
+
+        # Trade
+        1741630634788924000 {"data":[{"trade_id":3000000389425121,"symbol":"BTCUSDT","deal_price":"78727.3","deal_vol":"310","way":2,"m":true,"created_at":"2025-03-10T18:17:14.656686827Z"}],"group":"futures/trade:BTCUSDT"}
+
+    Args:
+        input_filename: Input filename with path.
+        output_filename: If provided, the converted data will be saved to the specified filename in ``npz`` format.
+        base_latency: The value to be added to the feed latency.
+                      See :func:`.correct_local_timestamp`.
+        buffer_size: Sets a preallocated row size for the buffer.
+ + Returns: + Converted data compatible with HftBacktest. + """ + tmp = np.empty(buffer_size, event_dtype) + row_num = 0 + with gzip.open(input_filename, 'r') as f: + while True: + line = f.readline() + if not line: + break + + # Find the first space which separates timestamp from JSON data + space_index = line.find(b' ') + if space_index == -1: + continue # Skip malformed lines + + local_timestamp = int(line[:space_index]) + message = json.loads(line[space_index + 1:]) + + # Check if the message has data field + if 'data' not in message: + continue + + data = message['data'] + group = message.get('group', '') + + # Process depth data (snapshot or update) + if 'symbol' in data and ('bids' in data or 'asks' in data): + ms_t = data.get('ms_t', 0) # BitMart exchange timestamp in milliseconds + exch_timestamp = int(ms_t) * 1000 # Convert to nanoseconds + + # Process bids + if 'bids' in data: + for bid in data['bids']: + price = bid['price'] + qty = bid['vol'] + + # For updates, volume of 0 means to remove the price level + event_type = DEPTH_EVENT + if data.get('type') == 'snapshot': + event_type = DEPTH_SNAPSHOT_EVENT + + tmp[row_num] = ( + event_type | BUY_EVENT, + exch_timestamp, + local_timestamp, + float(price), + float(qty), + 0, + 0, + 0 + ) + row_num += 1 + + # Process asks + if 'asks' in data: + for ask in data['asks']: + price = ask['price'] + qty = ask['vol'] + + # For updates, volume of 0 means to remove the price level + event_type = DEPTH_EVENT + if data.get('type') == 'snapshot': + event_type = DEPTH_SNAPSHOT_EVENT + + tmp[row_num] = ( + event_type | SELL_EVENT, + exch_timestamp, + local_timestamp, + float(price), + float(qty), + 0, + 0, + 0 + ) + row_num += 1 + + # For snapshots, add depth clear events before processing + if data.get('type') == 'snapshot': + # We need to add these *before* the snapshot data, so we'll shift the data + # Find the lowest and highest prices from the snapshot + if 'bids' in data and data['bids']: + bid_prices = [float(bid['price']) for bid in data['bids']] + bid_clear_upto = max(bid_prices) + + # Shift data to make room for clear event + for i in range(row_num - len(data['bids']), row_num): + tmp[i + 1] = tmp[i] + + # Insert the clear event + tmp[row_num - len(data['bids'])] = ( + DEPTH_CLEAR_EVENT | BUY_EVENT, + exch_timestamp, + local_timestamp, + bid_clear_upto, + 0, + 0, + 0, + 0 + ) + row_num += 1 + + if 'asks' in data and data['asks']: + ask_prices = [float(ask['price']) for ask in data['asks']] + ask_clear_upto = max(ask_prices) + + # Calculate how many bid entries we have + bid_count = len(data.get('bids', [])) + + # Shift data to make room for clear event + for i in range(row_num - len(data['asks']), row_num): + tmp[i + 1] = tmp[i] + + # Insert the clear event + tmp[row_num - len(data['asks'])] = ( + DEPTH_CLEAR_EVENT | SELL_EVENT, + exch_timestamp, + local_timestamp, + ask_clear_upto, + 0, + 0, + 0, + 0 + ) + row_num += 1 + + # Process trade data + elif isinstance(data, list) and 'futures/trade' in group: + for trade in data: + if 'deal_price' in trade and 'deal_vol' in trade: + # Parse timestamp from created_at field + # The format is "2025-03-10T18:17:14.656686827Z" + created_at = trade.get('created_at', '') + if '.' 
in created_at: + # Extract nanoseconds part + timestamp_parts = created_at.split('.') + if len(timestamp_parts) > 1: + nanos_str = timestamp_parts[1].rstrip('Z') + # Convert to Unix timestamp in nanoseconds (approximate) + # For simplicity, we'll use local_timestamp as it's close enough + exch_timestamp = local_timestamp + else: + exch_timestamp = local_timestamp + else: + exch_timestamp = local_timestamp + + price = trade['deal_price'] + qty = trade['deal_vol'] + + # Determine trade side + # way=1 for buy, way=2 for sell (m=true means buyer is maker) + is_buyer_maker = trade.get('m', False) + way = trade.get('way', 0) + + # In BitMart, 'way' indicates the taker's direction: + # way=1: taker is buyer, way=2: taker is seller + # We need to convert this to BUY_EVENT or SELL_EVENT + side_event = SELL_EVENT if way == 1 else BUY_EVENT + + tmp[row_num] = ( + TRADE_EVENT | side_event, + exch_timestamp, + local_timestamp, + float(price), + float(qty), + 0, + 0, + 0 + ) + row_num += 1 + + # Truncate the buffer to the actual number of rows used + tmp = tmp[:row_num] + + print('Correcting the latency') + tmp = correct_local_timestamp(tmp, base_latency) + + print('Correcting the event order') + data = correct_event_order( + tmp, + np.argsort(tmp['exch_ts'], kind='mergesort'), + np.argsort(tmp['local_ts'], kind='mergesort') + ) + + validate_event_order(data) + + if output_filename is not None: + print('Saving to %s' % output_filename) + np.savez_compressed(output_filename, data=data) + + return data \ No newline at end of file From 7993e200cf06fdf03c634083be96384ee8e5a294 Mon Sep 17 00:00:00 2001 From: M8 Date: Tue, 11 Mar 2025 08:30:30 +0100 Subject: [PATCH 2/2] starting with a DEPTH_CLEAR_EVENT and timestamp conversion --- .../hftbacktest/data/utils/bitmart.py | 314 +++++++++--------- 1 file changed, 164 insertions(+), 150 deletions(-) diff --git a/py-hftbacktest/hftbacktest/data/utils/bitmart.py b/py-hftbacktest/hftbacktest/data/utils/bitmart.py index 2e7940d..e70886d 100644 --- a/py-hftbacktest/hftbacktest/data/utils/bitmart.py +++ b/py-hftbacktest/hftbacktest/data/utils/bitmart.py @@ -1,5 +1,6 @@ import gzip import json +import datetime from typing import Optional, Literal import numpy as np @@ -50,6 +51,8 @@ def convert( Returns: Converted data compatible with HftBacktest. 
""" + timestamp_mul = 1000000 # Multiplier to convert ms to ns + tmp = np.empty(buffer_size, event_dtype) row_num = 0 with gzip.open(input_filename, 'r') as f: @@ -58,165 +61,176 @@ def convert( if not line: break - # Find the first space which separates timestamp from JSON data - space_index = line.find(b' ') - if space_index == -1: - continue # Skip malformed lines + try: + # Find the first space which separates timestamp from JSON data + space_index = line.find(b' ') + if space_index == -1: + continue # Skip malformed lines - local_timestamp = int(line[:space_index]) - message = json.loads(line[space_index + 1:]) - - # Check if the message has data field - if 'data' not in message: - continue + local_timestamp = int(line[:space_index]) + message = json.loads(line[space_index + 1:]) - data = message['data'] - group = message.get('group', '') - - # Process depth data (snapshot or update) - if 'symbol' in data and ('bids' in data or 'asks' in data): - ms_t = data.get('ms_t', 0) # BitMart exchange timestamp in milliseconds - exch_timestamp = int(ms_t) * 1000 # Convert to nanoseconds + # Check if the message has data field + if 'data' not in message: + continue + + data = message['data'] + group = message.get('group', '') - # Process bids - if 'bids' in data: - for bid in data['bids']: - price = bid['price'] - qty = bid['vol'] + # Process depth data (snapshot or update) + if 'symbol' in data and ('bids' in data or 'asks' in data): + ms_t = data.get('ms_t', 0) # BitMart exchange timestamp in milliseconds + exch_timestamp = int(ms_t) * timestamp_mul # Convert to nanoseconds + + # For snapshots, add depth clear events before processing + if data.get('type') == 'snapshot': + # We need to add these *before* the snapshot data + if 'bids' in data and data['bids']: + bid_prices = [float(bid['price']) for bid in data['bids']] + # For bids, the clear should be up to the lowest price (max price for comparison) + bid_clear_upto = min(bid_prices) + + # Insert the clear event + tmp[row_num] = ( + DEPTH_CLEAR_EVENT | BUY_EVENT, + exch_timestamp, + local_timestamp, + bid_clear_upto, + 0, + 0, + 0, + 0 + ) + row_num += 1 - # For updates, volume of 0 means to remove the price level - event_type = DEPTH_EVENT - if data.get('type') == 'snapshot': - event_type = DEPTH_SNAPSHOT_EVENT + if 'asks' in data and data['asks']: + ask_prices = [float(ask['price']) for ask in data['asks']] + # For asks, the clear should be up to the highest price (min price for comparison) + ask_clear_upto = max(ask_prices) + + # Insert the clear event + tmp[row_num] = ( + DEPTH_CLEAR_EVENT | SELL_EVENT, + exch_timestamp, + local_timestamp, + ask_clear_upto, + 0, + 0, + 0, + 0 + ) + row_num += 1 + + # Process bids + if 'bids' in data: + for bid in data['bids']: + price = bid['price'] + qty = bid['vol'] - tmp[row_num] = ( - event_type | BUY_EVENT, - exch_timestamp, - local_timestamp, - float(price), - float(qty), - 0, - 0, - 0 - ) - row_num += 1 - - # Process asks - if 'asks' in data: - for ask in data['asks']: - price = ask['price'] - qty = ask['vol'] - - # For updates, volume of 0 means to remove the price level - event_type = DEPTH_EVENT - if data.get('type') == 'snapshot': - event_type = DEPTH_SNAPSHOT_EVENT + # For updates, volume of 0 means to remove the price level + event_type = DEPTH_EVENT + if data.get('type') == 'snapshot': + event_type = DEPTH_SNAPSHOT_EVENT + + tmp[row_num] = ( + event_type | BUY_EVENT, + exch_timestamp, + local_timestamp, + float(price), + float(qty), + 0, + 0, + 0 + ) + row_num += 1 + + # Process asks + if 
+
+                    # Process bids
+                    if 'bids' in data:
+                        for bid in data['bids']:
+                            price = bid['price']
+                            qty = bid['vol']
 
-                        tmp[row_num] = (
-                            event_type | BUY_EVENT,
-                            exch_timestamp,
-                            local_timestamp,
-                            float(price),
-                            float(qty),
-                            0,
-                            0,
-                            0
-                        )
-                        row_num += 1
-
-                # Process asks
-                if 'asks' in data:
-                    for ask in data['asks']:
-                        price = ask['price']
-                        qty = ask['vol']
-
-                        # For updates, volume of 0 means to remove the price level
-                        event_type = DEPTH_EVENT
-                        if data.get('type') == 'snapshot':
-                            event_type = DEPTH_SNAPSHOT_EVENT
+                            # For updates, volume of 0 means to remove the price level
+                            event_type = DEPTH_EVENT
+                            if data.get('type') == 'snapshot':
+                                event_type = DEPTH_SNAPSHOT_EVENT
+
+                            tmp[row_num] = (
+                                event_type | BUY_EVENT,
+                                exch_timestamp,
+                                local_timestamp,
+                                float(price),
+                                float(qty),
+                                0,
+                                0,
+                                0
+                            )
+                            row_num += 1
+
+                    # Process asks
+                    if 'asks' in data:
+                        for ask in data['asks']:
+                            price = ask['price']
+                            qty = ask['vol']
 
-                        tmp[row_num] = (
-                            event_type | SELL_EVENT,
-                            exch_timestamp,
-                            local_timestamp,
-                            float(price),
-                            float(qty),
-                            0,
-                            0,
-                            0
-                        )
-                        row_num += 1
+                            # For updates, volume of 0 means to remove the price level
+                            event_type = DEPTH_EVENT
+                            if data.get('type') == 'snapshot':
+                                event_type = DEPTH_SNAPSHOT_EVENT
+
+                            tmp[row_num] = (
+                                event_type | SELL_EVENT,
+                                exch_timestamp,
+                                local_timestamp,
+                                float(price),
+                                float(qty),
+                                0,
+                                0,
+                                0
+                            )
+                            row_num += 1
 
-                # For snapshots, add depth clear events before processing
-                if data.get('type') == 'snapshot':
-                    # We need to add these *before* the snapshot data, so we'll shift the data
-                    # Find the lowest and highest prices from the snapshot
-                    if 'bids' in data and data['bids']:
-                        bid_prices = [float(bid['price']) for bid in data['bids']]
-                        bid_clear_upto = max(bid_prices)
-
-                        # Shift data to make room for clear event
-                        for i in range(row_num - len(data['bids']), row_num):
-                            tmp[i + 1] = tmp[i]
-
-                        # Insert the clear event
-                        tmp[row_num - len(data['bids'])] = (
-                            DEPTH_CLEAR_EVENT | BUY_EVENT,
-                            exch_timestamp,
-                            local_timestamp,
-                            bid_clear_upto,
-                            0,
-                            0,
-                            0,
-                            0
-                        )
-                        row_num += 1
-
-                    if 'asks' in data and data['asks']:
-                        ask_prices = [float(ask['price']) for ask in data['asks']]
-                        ask_clear_upto = max(ask_prices)
-
-                        # Calculate how many bid entries we have
-                        bid_count = len(data.get('bids', []))
-
-                        # Shift data to make room for clear event
-                        for i in range(row_num - len(data['asks']), row_num):
-                            tmp[i + 1] = tmp[i]
-
-                        # Insert the clear event
-                        tmp[row_num - len(data['asks'])] = (
-                            DEPTH_CLEAR_EVENT | SELL_EVENT,
-                            exch_timestamp,
-                            local_timestamp,
-                            ask_clear_upto,
-                            0,
-                            0,
-                            0,
-                            0
-                        )
-                        row_num += 1
+                # Process trade data
+                elif isinstance(data, list) and 'futures/trade' in group:
+                    for trade in data:
+                        if 'deal_price' in trade and 'deal_vol' in trade:
+                            # Parse timestamp from created_at field
+                            created_at = trade.get('created_at', '')
+                            if created_at:
+                                try:
+                                    # Format: "2025-03-10T18:17:14.656686827Z"
+                                    # Convert to nanoseconds
+                                    dt = datetime.datetime.strptime(created_at.split('.')[0], "%Y-%m-%dT%H:%M:%S")
+                                    # Set to UTC
+                                    dt = dt.replace(tzinfo=datetime.timezone.utc)
+                                    nanos_part = created_at.split('.')[1].rstrip('Z')
+                                    nanos = int(nanos_part.ljust(9, '0')[:9])  # Ensure 9 digits for nanos
+
+                                    # Convert to Unix timestamp in nanoseconds
+                                    exch_timestamp = int(dt.timestamp()) * 1000000000 + nanos
+                                except (ValueError, IndexError):
+                                    # Fall back to ms_t (ms) if available; local_timestamp is ns, so divide by timestamp_mul
+                                    exch_timestamp = int(trade.get('ms_t', local_timestamp // timestamp_mul)) * timestamp_mul
+                            else:
+                                # Fall back to ms_t (ms) if available; local_timestamp is ns, so divide by timestamp_mul
+                                exch_timestamp = int(trade.get('ms_t', local_timestamp // timestamp_mul)) * timestamp_mul
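+
+                            # Worked example: for created_at = "2025-03-10T18:17:14.656686827Z"
+                            # (the docstring sample), dt is 2025-03-10 18:17:14 UTC, nanos is
+                            # 656686827, and exch_timestamp = int(dt.timestamp()) * 1000000000 + 656686827.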
-            # Process trade data
-            elif isinstance(data, list) and 'futures/trade' in group:
-                for trade in data:
-                    if 'deal_price' in trade and 'deal_vol' in trade:
-                        # Parse timestamp from created_at field
-                        # The format is "2025-03-10T18:17:14.656686827Z"
-                        created_at = trade.get('created_at', '')
-                        if '.' in created_at:
-                            # Extract nanoseconds part
-                            timestamp_parts = created_at.split('.')
-                            if len(timestamp_parts) > 1:
-                                nanos_str = timestamp_parts[1].rstrip('Z')
-                                # Convert to Unix timestamp in nanoseconds (approximate)
-                                # For simplicity, we'll use local_timestamp as it's close enough
-                                exch_timestamp = local_timestamp
-                            else:
-                                exch_timestamp = local_timestamp
-                        else:
-                            exch_timestamp = local_timestamp
-
-                        price = trade['deal_price']
-                        qty = trade['deal_vol']
-
-                        # Determine trade side
-                        # way=1 for buy, way=2 for sell (m=true means buyer is maker)
-                        is_buyer_maker = trade.get('m', False)
-                        way = trade.get('way', 0)
-
-                        # In BitMart, 'way' indicates the taker's direction:
-                        # way=1: taker is buyer, way=2: taker is seller
-                        # We need to convert this to BUY_EVENT or SELL_EVENT
-                        side_event = SELL_EVENT if way == 1 else BUY_EVENT
-
-                        tmp[row_num] = (
-                            TRADE_EVENT | side_event,
-                            exch_timestamp,
-                            local_timestamp,
-                            float(price),
-                            float(qty),
-                            0,
-                            0,
-                            0
-                        )
-                        row_num += 1
+                            price = trade['deal_price']
+                            qty = trade['deal_vol']
+
+                            # Determine trade side using the 'way' and 'm' fields
+                            way = trade.get('way', 0)
+                            is_buyer_maker = trade.get('m', False)
+
+                            # BitMart way field meanings (buyer action / seller action):
+                            # 1 = buy_open_long / sell_open_short
+                            # 2 = buy_open_long / sell_close_long
+                            # 3 = buy_close_short / sell_open_short
+                            # 4 = buy_close_short / sell_close_long
+                            # 5 = sell_open_short / buy_open_long
+                            # 6 = sell_open_short / buy_close_short
+                            # 7 = sell_close_long / buy_open_long
+                            # 8 = sell_close_long / buy_close_short
+
+                            # The 'm' field: true is "buyer is maker", false is "seller is maker".
+                            # For HftBacktest, the event side must indicate the initiator (the taker).
+
+                            # If the buyer is the maker (m=true), the seller is the taker -> SELL_EVENT
+                            # If the seller is the maker (m=false), the buyer is the taker -> BUY_EVENT
+                            side_event = SELL_EVENT if is_buyer_maker else BUY_EVENT
+
+                            tmp[row_num] = (
+                                TRADE_EVENT | side_event,
+                                exch_timestamp,
+                                local_timestamp,
+                                float(price),
+                                float(qty),
+                                0,
+                                0,
+                                0
+                            )
+                            row_num += 1
+            except (json.JSONDecodeError, ValueError, KeyError, IndexError) as e:
+                print(f"Error processing line: {e}")
+                continue
 
     # Truncate the buffer to the actual number of rows used
     tmp = tmp[:row_num]