Skip to content

Commit 88a9a62

Browse files
add readme and correctness test
1 parent 39f4728 commit 88a9a62

3 files changed

Lines changed: 131 additions & 1623 deletions

File tree

README.md

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,26 @@
1-
# simdcsv
1+
# arrow-csv2
22

3-
simdcsv is a CSV parser that evaluates 64 bytes at a time. There are many kinds of CSV files; this project adheres to the format described
4-
in [RFC 4180](https://www.rfc-editor.org/rfc/rfc4180.html).
3+
Vectorized CSV parsing for Apache Arrow.
54

6-
## Reading
5+
This project aims to be a faster drop-in replacement for `arrow-csv`, the csv-to-arrow decoder in the `arrow-rs` ecosystem. The parser employs techniques highlighted in the `simdjson` paper like vectorized classification and prefix xor.
76

7+
A secondary goal is to demonstrate a performant parallel object store reader that uses speculative quote-state reconciliation to enable byte-range splitting for files with quoted newlines, something Datafusion currently disables.
8+
9+
# Status
10+
11+
Currently, `arrow-csv2` decodes **2.4x faster** than `arrow-csv` (69ms vs 168ms). This is measured on a 100MB slice (~130K rows) of the [ClickBench](https://github.com/ClickHouse/ClickBench) `hits.csv` dataset.
12+
13+
```sh
14+
# run benchmarks
15+
./download_clickbench.sh
16+
cargo r --bin slice_clickbench
17+
cargo bench
18+
```
19+
20+
The goal is not full feature parity with `arrow-csv`, but a proof of concept that explores how far we can push CSV-to-Arrow performance, from single threaded decoding to parallel ingestion from object store.
21+
22+
# Reading
23+
24+
https://branchfree.org/2019/03/06/code-fragment-finding-quote-pairs-with-carry-less-multiply-pclmulqdq/<br>
825
https://www.rfc-editor.org/rfc/rfc4180.html<br>
926
https://arxiv.org/pdf/1902.08318<br>
10-
https://branchfree.org/2019/03/06/code-fragment-finding-quote-pairs-with-carry-less-multiply-pclmulqdq/<br>

tests/correctness.rs

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
use std::sync::Arc;
2+
3+
use arrow_csv2::ReaderBuilder;
4+
use arrow_schema::{DataType, Field, Schema};
5+
6+
fn taxi_zone_schema() -> Arc<Schema> {
7+
Arc::new(Schema::new(vec![
8+
Field::new("LocationID", DataType::Utf8, true),
9+
Field::new("Borough", DataType::Utf8, true),
10+
Field::new("Zone", DataType::Utf8, true),
11+
Field::new("service_zone", DataType::Utf8, true),
12+
]))
13+
}
14+
15+
fn decode_all(
16+
raw: &[u8],
17+
schema: Arc<Schema>,
18+
build: impl Fn(Arc<Schema>) -> arrow_csv2::Decoder,
19+
) -> Vec<arrow_array::RecordBatch> {
20+
let mut decoder = build(schema);
21+
let mut offset = 0;
22+
let mut batches = Vec::new();
23+
loop {
24+
let consumed = decoder.decode(&raw[offset..]).unwrap();
25+
offset += consumed;
26+
if consumed == 0 || decoder.capacity() == 0 {
27+
if let Some(batch) = decoder.flush().unwrap() {
28+
batches.push(batch);
29+
}
30+
if consumed == 0 && decoder.capacity() > 0 {
31+
break;
32+
}
33+
}
34+
}
35+
batches
36+
}
37+
38+
fn decode_all_arrow_csv(
39+
raw: &[u8],
40+
schema: Arc<Schema>,
41+
) -> Vec<arrow_array::RecordBatch> {
42+
let mut decoder = arrow_csv::ReaderBuilder::new(schema)
43+
.with_header(true)
44+
.with_batch_size(8192)
45+
.build_decoder();
46+
47+
let mut offset = 0;
48+
let mut batches = Vec::new();
49+
loop {
50+
let consumed = decoder.decode(&raw[offset..]).unwrap();
51+
offset += consumed;
52+
if consumed == 0 || decoder.capacity() == 0 {
53+
if let Some(batch) = decoder.flush().unwrap() {
54+
batches.push(batch);
55+
}
56+
if consumed == 0 {
57+
break;
58+
}
59+
}
60+
}
61+
batches
62+
}
63+
64+
#[test]
65+
fn arrow_csv2_matches_arrow_csv() {
66+
let raw = std::fs::read("taxi_zone_lookup.csv").expect("missing csv");
67+
let schema = taxi_zone_schema();
68+
69+
let ours = decode_all(&raw, schema.clone(), |s| {
70+
ReaderBuilder::new(s)
71+
.with_header(true)
72+
.with_batch_size(8192)
73+
.build_decoder()
74+
});
75+
76+
let theirs = decode_all_arrow_csv(&raw, schema);
77+
78+
// same number of batches
79+
assert_eq!(ours.len(), theirs.len(), "batch count mismatch");
80+
81+
for (i, (a, b)) in ours.iter().zip(&theirs).enumerate() {
82+
assert_eq!(a.num_rows(), b.num_rows(), "row count mismatch in batch {i}");
83+
assert_eq!(
84+
a.num_columns(),
85+
b.num_columns(),
86+
"column count mismatch in batch {i}"
87+
);
88+
89+
for col in 0..a.num_columns() {
90+
let col_a = a
91+
.column(col)
92+
.as_any()
93+
.downcast_ref::<arrow_array::StringArray>()
94+
.unwrap();
95+
let col_b = b
96+
.column(col)
97+
.as_any()
98+
.downcast_ref::<arrow_array::StringArray>()
99+
.unwrap();
100+
101+
for row in 0..a.num_rows() {
102+
assert_eq!(
103+
col_a.value(row),
104+
col_b.value(row),
105+
"mismatch at batch {i}, col {col}, row {row}"
106+
);
107+
}
108+
}
109+
}
110+
}

0 commit comments

Comments
 (0)