Commit 42dbcc7

feat: implement reader for parquet type files
1 parent: 05244ea

4 files changed: +104 -1 lines changed

hftbacktest/Cargo.toml

Lines changed: 5 additions & 1 deletion

@@ -22,7 +22,7 @@ rust-version = "1.82"
 
 [features]
 default = ["backtest", "live"]
-backtest = ["zip", "uuid", "nom", "hftbacktest-derive"]
+backtest = ["zip", "uuid", "nom", "hftbacktest-derive", "rayon", "parquet", "arrow-array"]
 live = ["chrono", "tokio", "futures-util", "iceoryx2", "rand"]
 unstable_fuse = []
 
@@ -42,6 +42,10 @@ nom = { version = "7.1.3", optional = true }
 iceoryx2 = { version = "0.4.1", optional = true, features = ["logger_tracing"] }
 hftbacktest-derive = { path = "../hftbacktest-derive", optional = true, version = "0.2.0" }
 
+parquet = { version = "53.3.0", optional = true }
+arrow-array = { version = "53.3.0", optional = true }
+rayon = { version = "1.10.0", optional = true }
+
 [dev-dependencies]
 tracing-subscriber = { version = "0.3.18", features = [] }
 clap = { version = "4.5.4", features = ["derive"] }

hftbacktest/src/backtest/data/mod.rs

Lines changed: 1 addition & 0 deletions

@@ -1,4 +1,5 @@
 mod npy;
+mod parquet;
 mod reader;
 
 use std::{

hftbacktest/src/backtest/data/parquet.rs (new file)

Lines changed: 72 additions & 0 deletions

@@ -0,0 +1,72 @@
+use std::fs::File;
+use std::sync::{Arc, Mutex};
+use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
+use parquet::file::reader::{FileReader, SerializedFileReader};
+use tracing::{debug, info};
+use crate::backtest::data::{Data, DataPtr, POD};
+use crate::prelude::Event;
+
+use super::NpyDTyped;
+use arrow_array::{UInt64Array, Int64Array, Float64Array};
+use rayon::prelude::*;
+
+
+pub fn read_parquet_file<D: NpyDTyped + Clone>(filepath: &str) -> std::io::Result<Data<D>> {
+    let batch_size = 1024 * 1024;
+    let events_capacity = 150_000_000;
+
+    let file = File::open(filepath)?;
+    let builder = ParquetRecordBatchReaderBuilder::try_new(file)
+        .unwrap()
+        .with_batch_size(batch_size);
+    let reader = builder.build().unwrap();
+    let events = Arc::new(Mutex::new(Vec::with_capacity(events_capacity)));
+
+    // If we use parallel loading here, we need to re-sort by exch_ts in order.
+    // This is because exch_ts and local_ts are sorted in chronological order.
+    reader.into_iter().par_bridge().for_each(|maybe_batch| {
+        let batch = maybe_batch.unwrap();
+
+        let ev_col = batch.column(0).as_any().downcast_ref::<UInt64Array>().unwrap();
+        let exch_ts_col = batch.column(1).as_any().downcast_ref::<Int64Array>().unwrap();
+        let local_ts_col = batch.column(2).as_any().downcast_ref::<Int64Array>().unwrap();
+        let px_col = batch.column(3).as_any().downcast_ref::<Float64Array>().unwrap();
+        let qty_col = batch.column(4).as_any().downcast_ref::<Float64Array>().unwrap();
+        let order_id_col = batch.column(5).as_any().downcast_ref::<UInt64Array>().unwrap();
+        let ival_col = batch.column(6).as_any().downcast_ref::<Int64Array>().unwrap();
+        let fval_col = batch.column(7).as_any().downcast_ref::<Float64Array>().unwrap();
+
+        let mut local_events: Vec<Event> = Vec::with_capacity(batch.num_rows());
+        for row in 0..batch.num_rows() {
+            local_events.push(Event {
+                ev: ev_col.value(row),
+                exch_ts: exch_ts_col.value(row),
+                local_ts: local_ts_col.value(row),
+                px: px_col.value(row),
+                qty: qty_col.value(row),
+                order_id: order_id_col.value(row),
+                ival: ival_col.value(row),
+                fval: fval_col.value(row),
+            });
+        }
+        debug!("Read {} events", local_events.len());
+        let mut events = events.lock().unwrap();
+        events.extend(local_events);
+    });
+
+    let mut events = events.lock().unwrap();
+    events.par_sort_by_key(|event| event.exch_ts);
+    let data_ptr = DataPtr::new(events.len() * std::mem::size_of::<D>());
+
+    // Copy events to DataPtr
+    unsafe {
+        std::ptr::copy_nonoverlapping(
+            events.as_ptr() as *const u8,
+            data_ptr.ptr as *mut u8,
+            events.len() * std::mem::size_of::<D>()
+        );
+    }
+
+    let data = unsafe { Data::from_data_ptr(data_ptr, 0) };
+    Ok(data)
+}
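
For reference, below is a minimal sketch of producing a Parquet file with the eight-column layout read_parquet_file assumes (ev: UInt64, exch_ts: Int64, local_ts: Int64, px: Float64, qty: Float64, order_id: UInt64, ival: Int64, fval: Float64). It is not part of this commit; the write_sample helper, file path, and row values are illustrative only, and it relies solely on the parquet and arrow-array crates added above.

use std::{fs::File, sync::Arc};

use arrow_array::{ArrayRef, Float64Array, Int64Array, RecordBatch, UInt64Array};
use parquet::arrow::ArrowWriter;

// Hypothetical helper: writes a single-row Parquet file in the column order
// that read_parquet_file downcasts (ev, exch_ts, local_ts, px, qty, order_id, ival, fval).
fn write_sample(path: &str) -> Result<(), Box<dyn std::error::Error>> {
    let batch = RecordBatch::try_from_iter([
        ("ev", Arc::new(UInt64Array::from(vec![1_u64])) as ArrayRef),
        ("exch_ts", Arc::new(Int64Array::from(vec![1_700_000_000_000_000_000_i64])) as ArrayRef),
        ("local_ts", Arc::new(Int64Array::from(vec![1_700_000_000_000_100_000_i64])) as ArrayRef),
        ("px", Arc::new(Float64Array::from(vec![100.0_f64])) as ArrayRef),
        ("qty", Arc::new(Float64Array::from(vec![0.5_f64])) as ArrayRef),
        ("order_id", Arc::new(UInt64Array::from(vec![0_u64])) as ArrayRef),
        ("ival", Arc::new(Int64Array::from(vec![0_i64])) as ArrayRef),
        ("fval", Arc::new(Float64Array::from(vec![0.0_f64])) as ArrayRef),
    ])?;

    let file = File::create(path)?;
    // None selects the default writer properties; the schema is taken from the batch.
    let mut writer = ArrowWriter::try_new(file, batch.schema(), None)?;
    writer.write(&batch)?;
    writer.close()?;
    Ok(())
}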

hftbacktest/src/backtest/data/reader.rs

Lines changed: 26 additions & 0 deletions

@@ -24,6 +24,8 @@ use crate::{
     types::Event,
 };
 
+use super::parquet;
+
 /// Data source for the [`Reader`].
 #[derive(Clone, Debug)]
 pub enum DataSource<D>
@@ -432,6 +434,30 @@ where
                     }
                 }
             });
+        } else if key.ends_with(".parquet") {
+            let tx = self.tx.clone();
+            let filepath = key.to_string();
+            let preprocessor = self.preprocessor.clone();
+
+            let _ = thread::spawn(move || {
+                let load_data = |filepath: &str| {
+                    let mut data = parquet::read_parquet_file::<D>(filepath)?;
+                    if let Some(preprocessor) = &preprocessor {
+                        preprocessor.preprocess(&mut data)?;
+                    }
+                    Ok(data)
+                };
+                // SendError occurs only if Reader is already destroyed. Since no data is needed
+                // once the Reader is destroyed, SendError is safely suppressed.
+                match load_data(&filepath) {
+                    Ok(data) => {
+                        let _ = tx.send(LoadDataResult::ok(filepath, data));
+                    }
+                    Err(err) => {
+                        let _ = tx.send(LoadDataResult::err(filepath, err));
+                    }
+                }
+            });
         } else {
             return Err(BacktestError::DataError(IoError::new(
                 ErrorKind::InvalidData,
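
As a quick way to exercise the new path, here is a hypothetical in-crate smoke test (e.g. placed in backtest/data/mod.rs, since the parquet module is private to that module). The test name, file path, and #[ignore] marker are illustrative and not part of this commit; it relies only on the read_parquet_file signature shown above.

#[cfg(test)]
mod tests {
    use crate::prelude::Event;

    use super::parquet::read_parquet_file;

    #[test]
    #[ignore] // requires a local Parquet file with the ev..fval column layout described above
    fn reads_parquet_events() {
        // Placeholder path; point this at a real capture before un-ignoring the test.
        let _data = read_parquet_file::<Event>("data/sample.parquet")
            .expect("failed to read the sample Parquet file");
    }
}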
