Skip to content

Commit 421307a

Browse files
move arrow parsing to separate file
1 parent 86629b9 commit 421307a

3 files changed

Lines changed: 341 additions & 325 deletions

File tree

src/arrow.rs

Lines changed: 325 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,325 @@
1+
use std::sync::Arc;
2+
3+
use arrow_array::ArrayRef;
4+
use arrow_array::builder::{
5+
BinaryBuilder, BinaryViewBuilder, BooleanBuilder, Date32Builder, Date64Builder,
6+
Decimal128Builder, Float32Builder, Float64Builder, Int8Builder, Int16Builder, Int32Builder,
7+
Int64Builder, LargeBinaryBuilder, LargeStringBuilder, StringBuilder, StringViewBuilder,
8+
TimestampMicrosecondBuilder, TimestampMillisecondBuilder, TimestampNanosecondBuilder,
9+
TimestampSecondBuilder, UInt8Builder, UInt16Builder, UInt32Builder, UInt64Builder,
10+
};
11+
use arrow_cast::parse::Parser;
12+
use arrow_schema::{ArrowError, DataType, Field, TimeUnit};
13+
14+
pub fn build_column(
15+
field: &Field,
16+
data: &[u8],
17+
offsets: &[usize],
18+
col: usize,
19+
num_rows: usize,
20+
) -> Result<ArrayRef, ArrowError> {
21+
let nullable = field.is_nullable();
22+
23+
match field.data_type() {
24+
DataType::Null => Ok(Arc::new(arrow_array::NullArray::new(num_rows))),
25+
DataType::Boolean => {
26+
let mut b = BooleanBuilder::with_capacity(num_rows);
27+
for row in 0..num_rows {
28+
match &data[offsets[row]..offsets[row + 1]] {
29+
[] if nullable => b.append_null(),
30+
b"true" | b"TRUE" | b"True" | b"1" => b.append_value(true),
31+
b"false" | b"FALSE" | b"False" | b"0" => b.append_value(false),
32+
_ => {
33+
return Err(ArrowError::ParseError(format!(
34+
"cannot parse as Boolean at row {row}, col {col}"
35+
)));
36+
}
37+
}
38+
}
39+
Ok(Arc::new(b.finish()))
40+
}
41+
DataType::Utf8 => build_string_col!(
42+
data,
43+
offsets,
44+
col,
45+
num_rows,
46+
nullable,
47+
StringBuilder::with_capacity(num_rows, data.len())
48+
),
49+
DataType::LargeUtf8 => build_string_col!(
50+
data,
51+
offsets,
52+
col,
53+
num_rows,
54+
nullable,
55+
LargeStringBuilder::with_capacity(num_rows, data.len())
56+
),
57+
DataType::Utf8View => build_string_col!(
58+
data,
59+
offsets,
60+
col,
61+
num_rows,
62+
nullable,
63+
StringViewBuilder::with_capacity(num_rows)
64+
),
65+
DataType::Binary => build_binary_col!(
66+
data,
67+
offsets,
68+
num_rows,
69+
nullable,
70+
BinaryBuilder::with_capacity(num_rows, data.len())
71+
),
72+
DataType::LargeBinary => build_binary_col!(
73+
data,
74+
offsets,
75+
num_rows,
76+
nullable,
77+
LargeBinaryBuilder::with_capacity(num_rows, data.len())
78+
),
79+
DataType::BinaryView => build_binary_col!(
80+
data,
81+
offsets,
82+
num_rows,
83+
nullable,
84+
BinaryViewBuilder::with_capacity(num_rows)
85+
),
86+
DataType::Int8 => {
87+
build_int_col!(data, offsets, col, num_rows, nullable, Int8Builder, i8)
88+
}
89+
DataType::Int16 => {
90+
build_int_col!(data, offsets, col, num_rows, nullable, Int16Builder, i16)
91+
}
92+
DataType::Int32 => {
93+
build_int_col!(data, offsets, col, num_rows, nullable, Int32Builder, i32)
94+
}
95+
DataType::Int64 => {
96+
build_int_col!(data, offsets, col, num_rows, nullable, Int64Builder, i64)
97+
}
98+
DataType::UInt8 => {
99+
build_int_col!(data, offsets, col, num_rows, nullable, UInt8Builder, u8)
100+
}
101+
DataType::UInt16 => {
102+
build_int_col!(data, offsets, col, num_rows, nullable, UInt16Builder, u16)
103+
}
104+
DataType::UInt32 => {
105+
build_int_col!(data, offsets, col, num_rows, nullable, UInt32Builder, u32)
106+
}
107+
DataType::UInt64 => {
108+
build_int_col!(data, offsets, col, num_rows, nullable, UInt64Builder, u64)
109+
}
110+
DataType::Float32 => {
111+
build_float_col!(data, offsets, col, num_rows, nullable, Float32Builder, f32)
112+
}
113+
DataType::Float64 => {
114+
build_float_col!(data, offsets, col, num_rows, nullable, Float64Builder, f64)
115+
}
116+
DataType::Date32 => {
117+
build_parsed_col!(
118+
data,
119+
offsets,
120+
col,
121+
num_rows,
122+
nullable,
123+
Date32Builder,
124+
arrow_array::types::Date32Type
125+
)
126+
}
127+
DataType::Date64 => {
128+
build_parsed_col!(
129+
data,
130+
offsets,
131+
col,
132+
num_rows,
133+
nullable,
134+
Date64Builder,
135+
arrow_array::types::Date64Type
136+
)
137+
}
138+
DataType::Timestamp(unit, _) => match unit {
139+
TimeUnit::Second => {
140+
build_parsed_col!(
141+
data,
142+
offsets,
143+
col,
144+
num_rows,
145+
nullable,
146+
TimestampSecondBuilder,
147+
arrow_array::types::TimestampSecondType
148+
)
149+
}
150+
TimeUnit::Millisecond => {
151+
build_parsed_col!(
152+
data,
153+
offsets,
154+
col,
155+
num_rows,
156+
nullable,
157+
TimestampMillisecondBuilder,
158+
arrow_array::types::TimestampMillisecondType
159+
)
160+
}
161+
TimeUnit::Microsecond => {
162+
build_parsed_col!(
163+
data,
164+
offsets,
165+
col,
166+
num_rows,
167+
nullable,
168+
TimestampMicrosecondBuilder,
169+
arrow_array::types::TimestampMicrosecondType
170+
)
171+
}
172+
TimeUnit::Nanosecond => {
173+
build_parsed_col!(
174+
data,
175+
offsets,
176+
col,
177+
num_rows,
178+
nullable,
179+
TimestampNanosecondBuilder,
180+
arrow_array::types::TimestampNanosecondType
181+
)
182+
}
183+
},
184+
DataType::Decimal128(precision, scale) => {
185+
simdutf8::basic::from_utf8(data)
186+
.map_err(|e| ArrowError::ParseError(format!("invalid utf-8 in col {col}: {e}")))?;
187+
188+
let mut b = Decimal128Builder::with_capacity(num_rows)
189+
.with_data_type(DataType::Decimal128(*precision, *scale));
190+
for row in 0..num_rows {
191+
let raw = &data[offsets[row]..offsets[row + 1]];
192+
if raw.is_empty() && nullable {
193+
b.append_null();
194+
} else {
195+
let s = unsafe { std::str::from_utf8_unchecked(raw) };
196+
let v = arrow_cast::parse::parse_decimal::<arrow_array::types::Decimal128Type>(
197+
s, *precision, *scale,
198+
)?;
199+
b.append_value(v);
200+
}
201+
}
202+
Ok(Arc::new(b.finish()))
203+
}
204+
other => Err(ArrowError::NotYetImplemented(format!(
205+
"data type {other} not yet supported"
206+
))),
207+
}
208+
}
209+
210+
macro_rules! build_string_col {
211+
($data:expr, $offsets:expr, $col:expr, $num_rows:expr, $nullable:expr, $builder:expr) => {{
212+
simdutf8::basic::from_utf8($data)
213+
.map_err(|e| ArrowError::ParseError(format!("invalid utf-8 in col {}: {}", $col, e)))?;
214+
215+
let mut b = $builder;
216+
for row in 0..$num_rows {
217+
let s =
218+
unsafe { std::str::from_utf8_unchecked(&$data[$offsets[row]..$offsets[row + 1]]) };
219+
if s.is_empty() && $nullable {
220+
b.append_null();
221+
} else {
222+
b.append_value(s);
223+
}
224+
}
225+
Ok(Arc::new(b.finish()) as ArrayRef)
226+
}};
227+
}
228+
use build_string_col;
229+
230+
macro_rules! build_parsed_col {
231+
($data:expr, $offsets:expr, $col:expr, $num_rows:expr, $nullable:expr, $builder:ty, $arrow_type:ty) => {{
232+
simdutf8::basic::from_utf8($data)
233+
.map_err(|e| ArrowError::ParseError(format!("invalid utf-8 in col {}: {}", $col, e)))?;
234+
235+
let mut b = <$builder>::with_capacity($num_rows);
236+
for row in 0..$num_rows {
237+
let raw = &$data[$offsets[row]..$offsets[row + 1]];
238+
if raw.is_empty() && $nullable {
239+
b.append_null();
240+
} else {
241+
let s = unsafe { std::str::from_utf8_unchecked(raw) };
242+
let v = <$arrow_type as Parser>::parse(s).ok_or_else(|| {
243+
ArrowError::ParseError(format!(
244+
"cannot parse '{}' at row {}, col {}",
245+
s, row, $col
246+
))
247+
})?;
248+
b.append_value(v);
249+
}
250+
}
251+
Ok(Arc::new(b.finish()) as ArrayRef)
252+
}};
253+
}
254+
use build_parsed_col;
255+
256+
macro_rules! build_binary_col {
257+
($data:expr, $offsets:expr, $num_rows:expr, $nullable:expr, $builder:expr) => {{
258+
let mut b = $builder;
259+
for row in 0..$num_rows {
260+
let raw = &$data[$offsets[row]..$offsets[row + 1]];
261+
if raw.is_empty() && $nullable {
262+
b.append_null();
263+
} else {
264+
b.append_value(raw);
265+
}
266+
}
267+
Ok(Arc::new(b.finish()) as ArrayRef)
268+
}};
269+
}
270+
use build_binary_col;
271+
272+
macro_rules! build_int_col {
273+
($data:expr, $offsets:expr, $col:expr, $num_rows:expr, $nullable:expr, $builder:ty, $native:ty) => {{
274+
let mut b = <$builder>::with_capacity($num_rows);
275+
for row in 0..$num_rows {
276+
let raw = &$data[$offsets[row]..$offsets[row + 1]];
277+
if raw.is_empty() && $nullable {
278+
b.append_null();
279+
} else {
280+
let v: $native = atoi::atoi(raw).ok_or_else(|| {
281+
ArrowError::ParseError(format!(
282+
"cannot parse as {} at row {}, col {}",
283+
stringify!($native),
284+
row,
285+
$col
286+
))
287+
})?;
288+
b.append_value(v);
289+
}
290+
}
291+
Ok(Arc::new(b.finish()) as ArrayRef)
292+
}};
293+
}
294+
use build_int_col;
295+
296+
macro_rules! build_float_col {
297+
($data:expr, $offsets:expr, $col:expr, $num_rows:expr, $nullable:expr, $builder:ty, $native:ty) => {{
298+
let mut b = <$builder>::with_capacity($num_rows);
299+
for row in 0..$num_rows {
300+
let raw = &$data[$offsets[row]..$offsets[row + 1]];
301+
if raw.is_empty() && $nullable {
302+
b.append_null();
303+
} else {
304+
let s = simdutf8::basic::from_utf8(raw).map_err(|e| {
305+
ArrowError::ParseError(format!(
306+
"invalid utf-8 at row {}, col {}: {}",
307+
row, $col, e
308+
))
309+
})?;
310+
let v: $native = s.parse().map_err(|_| {
311+
ArrowError::ParseError(format!(
312+
"cannot parse '{}' as {} at row {}, col {}",
313+
s,
314+
stringify!($native),
315+
row,
316+
$col
317+
))
318+
})?;
319+
b.append_value(v);
320+
}
321+
}
322+
Ok(Arc::new(b.finish()) as ArrayRef)
323+
}};
324+
}
325+
use build_float_col;

0 commit comments

Comments
 (0)