1
1
import gzip
2
2
import json
3
+ import datetime
3
4
from typing import Optional , Literal
4
5
5
6
import numpy as np
@@ -50,6 +51,8 @@ def convert(
50
51
Returns:
51
52
Converted data compatible with HftBacktest.
52
53
"""
54
+ timestamp_mul = 1000000 # Multiplier to convert ms to ns
55
+
53
56
tmp = np .empty (buffer_size , event_dtype )
54
57
row_num = 0
55
58
with gzip .open (input_filename , 'r' ) as f :
@@ -58,165 +61,176 @@ def convert(
58
61
if not line :
59
62
break
60
63
61
- # Find the first space which separates timestamp from JSON data
62
- space_index = line .find (b' ' )
63
- if space_index == - 1 :
64
- continue # Skip malformed lines
64
+ try :
65
+ # Find the first space which separates timestamp from JSON data
66
+ space_index = line .find (b' ' )
67
+ if space_index == - 1 :
68
+ continue # Skip malformed lines
65
69
66
- local_timestamp = int (line [:space_index ])
67
- message = json .loads (line [space_index + 1 :])
68
-
69
- # Check if the message has data field
70
- if 'data' not in message :
71
- continue
70
+ local_timestamp = int (line [:space_index ])
71
+ message = json .loads (line [space_index + 1 :])
72
72
73
- data = message ['data' ]
74
- group = message .get ('group' , '' )
75
-
76
- # Process depth data (snapshot or update)
77
- if 'symbol' in data and ('bids' in data or 'asks' in data ):
78
- ms_t = data .get ('ms_t' , 0 ) # BitMart exchange timestamp in milliseconds
79
- exch_timestamp = int (ms_t ) * 1000 # Convert to nanoseconds
73
+ # Check if the message has data field
74
+ if 'data' not in message :
75
+ continue
76
+
77
+ data = message ['data' ]
78
+ group = message .get ('group' , '' )
80
79
81
- # Process bids
82
- if 'bids' in data :
83
- for bid in data ['bids' ]:
84
- price = bid ['price' ]
85
- qty = bid ['vol' ]
80
+ # Process depth data (snapshot or update)
81
+ if 'symbol' in data and ('bids' in data or 'asks' in data ):
82
+ ms_t = data .get ('ms_t' , 0 ) # BitMart exchange timestamp in milliseconds
83
+ exch_timestamp = int (ms_t ) * timestamp_mul # Convert to nanoseconds
84
+
85
+ # For snapshots, add depth clear events before processing
86
+ if data .get ('type' ) == 'snapshot' :
87
+ # We need to add these *before* the snapshot data
88
+ if 'bids' in data and data ['bids' ]:
89
+ bid_prices = [float (bid ['price' ]) for bid in data ['bids' ]]
90
+ # For bids, the clear price is the lowest bid price present in the snapshot (min of bid_prices)
91
+ bid_clear_upto = min (bid_prices )
92
+
93
+ # Insert the clear event
94
+ tmp [row_num ] = (
95
+ DEPTH_CLEAR_EVENT | BUY_EVENT ,
96
+ exch_timestamp ,
97
+ local_timestamp ,
98
+ bid_clear_upto ,
99
+ 0 ,
100
+ 0 ,
101
+ 0 ,
102
+ 0
103
+ )
104
+ row_num += 1
86
105
87
- # For updates, volume of 0 means to remove the price level
88
- event_type = DEPTH_EVENT
89
- if data .get ('type' ) == 'snapshot' :
90
- event_type = DEPTH_SNAPSHOT_EVENT
106
+ if 'asks' in data and data ['asks' ]:
107
+ ask_prices = [float (ask ['price' ]) for ask in data ['asks' ]]
108
+ # For asks, the clear price is the highest ask price present in the snapshot (max of ask_prices)
109
+ ask_clear_upto = max (ask_prices )
110
+
111
+ # Insert the clear event
112
+ tmp [row_num ] = (
113
+ DEPTH_CLEAR_EVENT | SELL_EVENT ,
114
+ exch_timestamp ,
115
+ local_timestamp ,
116
+ ask_clear_upto ,
117
+ 0 ,
118
+ 0 ,
119
+ 0 ,
120
+ 0
121
+ )
122
+ row_num += 1
123
+
124
+ # Process bids
125
+ if 'bids' in data :
126
+ for bid in data ['bids' ]:
127
+ price = bid ['price' ]
128
+ qty = bid ['vol' ]
91
129
92
- tmp [ row_num ] = (
93
- event_type | BUY_EVENT ,
94
- exch_timestamp ,
95
- local_timestamp ,
96
- float ( price ),
97
- float ( qty ),
98
- 0 ,
99
- 0 ,
100
- 0
101
- )
102
- row_num += 1
103
-
104
- # Process asks
105
- if 'asks' in data :
106
- for ask in data [ 'asks' ]:
107
- price = ask [ 'price' ]
108
- qty = ask [ 'vol' ]
109
-
110
- # For updates, volume of 0 means to remove the price level
111
- event_type = DEPTH_EVENT
112
- if data . get ( 'type' ) == 'snapshot' :
113
- event_type = DEPTH_SNAPSHOT_EVENT
130
+ # For updates, volume of 0 means to remove the price level
131
+ event_type = DEPTH_EVENT
132
+ if data . get ( 'type' ) == 'snapshot' :
133
+ event_type = DEPTH_SNAPSHOT_EVENT
134
+
135
+ tmp [ row_num ] = (
136
+ event_type | BUY_EVENT ,
137
+ exch_timestamp ,
138
+ local_timestamp ,
139
+ float ( price ),
140
+ float ( qty ),
141
+ 0 ,
142
+ 0 ,
143
+ 0
144
+ )
145
+ row_num += 1
146
+
147
+ # Process asks
148
+ if 'asks' in data :
149
+ for ask in data [ 'asks' ]:
150
+ price = ask [ 'price' ]
151
+ qty = ask [ 'vol' ]
114
152
115
- tmp [row_num ] = (
116
- event_type | SELL_EVENT ,
117
- exch_timestamp ,
118
- local_timestamp ,
119
- float (price ),
120
- float (qty ),
121
- 0 ,
122
- 0 ,
123
- 0
124
- )
125
- row_num += 1
153
+ # For updates, volume of 0 means to remove the price level
154
+ event_type = DEPTH_EVENT
155
+ if data .get ('type' ) == 'snapshot' :
156
+ event_type = DEPTH_SNAPSHOT_EVENT
157
+
158
+ tmp [row_num ] = (
159
+ event_type | SELL_EVENT ,
160
+ exch_timestamp ,
161
+ local_timestamp ,
162
+ float (price ),
163
+ float (qty ),
164
+ 0 ,
165
+ 0 ,
166
+ 0
167
+ )
168
+ row_num += 1
126
169
127
- # For snapshots, add depth clear events before processing
128
- if data .get ('type' ) == 'snapshot' :
129
- # We need to add these *before* the snapshot data, so we'll shift the data
130
- # Find the lowest and highest prices from the snapshot
131
- if 'bids' in data and data ['bids' ]:
132
- bid_prices = [float (bid ['price' ]) for bid in data ['bids' ]]
133
- bid_clear_upto = max (bid_prices )
134
-
135
- # Shift data to make room for clear event
136
- for i in range (row_num - len (data ['bids' ]), row_num ):
137
- tmp [i + 1 ] = tmp [i ]
170
+ # Process trade data
171
+ elif isinstance (data , list ) and 'futures/trade' in group :
172
+ for trade in data :
173
+ if 'deal_price' in trade and 'deal_vol' in trade :
174
+ # Parse timestamp from created_at field
175
+ created_at = trade .get ('created_at' , '' )
176
+ if created_at :
177
+ try :
178
+ # Format: "2025-03-10T18:17:14.656686827Z"
179
+ # Convert to nanoseconds
180
+ dt = datetime .datetime .strptime (created_at .split ('.' )[0 ], "%Y-%m-%dT%H:%M:%S" )
181
+ # Set to UTC
182
+ dt = dt .replace (tzinfo = datetime .timezone .utc )
183
+ nanos_part = created_at .split ('.' )[1 ].rstrip ('Z' )
184
+ nanos = int (nanos_part .ljust (9 , '0' )[:9 ]) # Ensure 9 digits for nanos
185
+
186
+ # Convert to Unix timestamp in nanoseconds
187
+ exch_timestamp = int (dt .timestamp ()) * 1000000000 + nanos
188
+ except (ValueError , IndexError ):
189
+ # Fallback to ms_t if available, otherwise use local_timestamp
190
+ exch_timestamp = int (trade .get ('ms_t' , local_timestamp // 1000 )) * timestamp_mul
191
+ else :
192
+ # Fallback to ms_t if available, otherwise use local_timestamp
193
+ exch_timestamp = int (trade .get ('ms_t' , local_timestamp // 1000 )) * timestamp_mul
138
194
139
- # Insert the clear event
140
- tmp [row_num - len (data ['bids' ])] = (
141
- DEPTH_CLEAR_EVENT | BUY_EVENT ,
142
- exch_timestamp ,
143
- local_timestamp ,
144
- bid_clear_upto ,
145
- 0 ,
146
- 0 ,
147
- 0 ,
148
- 0
149
- )
150
- row_num += 1
151
-
152
- if 'asks' in data and data ['asks' ]:
153
- ask_prices = [float (ask ['price' ]) for ask in data ['asks' ]]
154
- ask_clear_upto = max (ask_prices )
155
-
156
- # Calculate how many bid entries we have
157
- bid_count = len (data .get ('bids' , []))
158
-
159
- # Shift data to make room for clear event
160
- for i in range (row_num - len (data ['asks' ]), row_num ):
161
- tmp [i + 1 ] = tmp [i ]
195
+ price = trade ['deal_price' ]
196
+ qty = trade ['deal_vol' ]
162
197
163
- # Insert the clear event
164
- tmp [row_num - len (data ['asks' ])] = (
165
- DEPTH_CLEAR_EVENT | SELL_EVENT ,
166
- exch_timestamp ,
167
- local_timestamp ,
168
- ask_clear_upto ,
169
- 0 ,
170
- 0 ,
171
- 0 ,
172
- 0
173
- )
174
- row_num += 1
175
-
176
- # Process trade data
177
- elif isinstance (data , list ) and 'futures/trade' in group :
178
- for trade in data :
179
- if 'deal_price' in trade and 'deal_vol' in trade :
180
- # Parse timestamp from created_at field
181
- # The format is "2025-03-10T18:17:14.656686827Z"
182
- created_at = trade .get ('created_at' , '' )
183
- if '.' in created_at :
184
- # Extract nanoseconds part
185
- timestamp_parts = created_at .split ('.' )
186
- if len (timestamp_parts ) > 1 :
187
- nanos_str = timestamp_parts [1 ].rstrip ('Z' )
188
- # Convert to Unix timestamp in nanoseconds (approximate)
189
- # For simplicity, we'll use local_timestamp as it's close enough
190
- exch_timestamp = local_timestamp
191
- else :
192
- exch_timestamp = local_timestamp
193
- else :
194
- exch_timestamp = local_timestamp
195
-
196
- price = trade ['deal_price' ]
197
- qty = trade ['deal_vol' ]
198
-
199
- # Determine trade side
200
- # way=1 for buy, way=2 for sell (m=true means buyer is maker)
201
- is_buyer_maker = trade .get ('m' , False )
202
- way = trade .get ('way' , 0 )
203
-
204
- # In BitMart, 'way' indicates the taker's direction:
205
- # way=1: taker is buyer, way=2: taker is seller
206
- # We need to convert this to BUY_EVENT or SELL_EVENT
207
- side_event = SELL_EVENT if way == 1 else BUY_EVENT
208
-
209
- tmp [row_num ] = (
210
- TRADE_EVENT | side_event ,
211
- exch_timestamp ,
212
- local_timestamp ,
213
- float (price ),
214
- float (qty ),
215
- 0 ,
216
- 0 ,
217
- 0
218
- )
219
- row_num += 1
198
+ # Determine trade side using the 'way' and 'm' fields
199
+ way = trade .get ('way' , 0 )
200
+ is_buyer_maker = trade .get ('m' , False )
201
+
202
+ # BitMart way field meanings:
203
+ # 1 = buy_open_long sell_open_short
204
+ # 2 = buy_open_long sell_close_long
205
+ # 3 = buy_close_short sell_open_short
206
+ # 4 = buy_close_short sell_close_long
207
+ # 5 = sell_open_short buy_open_long
208
+ # 6 = sell_open_short buy_close_short
209
+ # 7 = sell_close_long buy_open_long
210
+ # 8 = sell_close_long buy_close_short
211
+
212
+ # The 'm' field: true is "buyer is maker", false is "seller is maker"
213
+ # For HftBacktest, we need to indicate the initiator's side (the taker)
214
+
215
+ # Determine the taker side based on is_buyer_maker
216
+ # If buyer is maker (m=true), then seller is taker -> SELL_EVENT
217
+ # If seller is maker (m=false), then buyer is taker -> BUY_EVENT
218
+ side_event = SELL_EVENT if is_buyer_maker else BUY_EVENT
219
+
220
+ tmp [row_num ] = (
221
+ TRADE_EVENT | side_event ,
222
+ exch_timestamp ,
223
+ local_timestamp ,
224
+ float (price ),
225
+ float (qty ),
226
+ 0 ,
227
+ 0 ,
228
+ 0
229
+ )
230
+ row_num += 1
231
+ except (json .JSONDecodeError , ValueError , KeyError , IndexError ) as e :
232
+ print (f"Error processing line: { e } " )
233
+ continue
220
234
221
235
# Truncate the buffer to the actual number of rows used
222
236
tmp = tmp [:row_num ]
0 commit comments