Skip to content

Commit fea1cd3

Browse files
authored
feat(query): support arrow stage file formats (#19953)
* feat(query): support arrow stage file formats * fix(query): record arrow copy status and metadata * chore(query): fix arrow reader clippy warnings * fix(query): avoid arrow stage prefix overmatch * fix(query): infer arrow schema past empty files * test(query): cover nested arrow batches * fix(query): load timestamp_tz arrow strings * test(query): avoid arrow nested stringify * fix(query): preserve arrow list child types * test(query): cast arrow timestamp_tz strings in copy * Support Arrow stage unload and empty scans * test(query): fix arrow unload overwrite options
1 parent 9579a5b commit fea1cd3

36 files changed

Lines changed: 1931 additions & 21 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/meta/app/src/principal/file_format.rs

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ pub enum FileFormatParams {
7575
Orc(OrcFileFormatParams),
7676
Avro(AvroFileFormatParams),
7777
Lance(LanceFileFormatParams),
78+
Arrow(ArrowFileFormatParams),
79+
ArrowStream(ArrowFileFormatParams),
7880
}
7981

8082
impl FileFormatParams {
@@ -89,6 +91,8 @@ impl FileFormatParams {
8991
FileFormatParams::Orc(_) => StageFileFormatType::Orc,
9092
FileFormatParams::Avro(_) => StageFileFormatType::Avro,
9193
FileFormatParams::Lance(_) => StageFileFormatType::Lance,
94+
FileFormatParams::Arrow(_) => StageFileFormatType::Arrow,
95+
FileFormatParams::ArrowStream(_) => StageFileFormatType::ArrowStream,
9296
}
9397
}
9498

@@ -103,6 +107,8 @@ impl FileFormatParams {
103107
FileFormatParams::Orc(_) => ".orc",
104108
FileFormatParams::Avro(_) => ".avro",
105109
FileFormatParams::Lance(_) => ".lance",
110+
FileFormatParams::Arrow(_) => ".arrow",
111+
FileFormatParams::ArrowStream(_) => ".arrow_stream",
106112
}
107113
}
108114

@@ -113,6 +119,8 @@ impl FileFormatParams {
113119
| FileFormatParams::Text(_)
114120
| FileFormatParams::NdJson(_)
115121
| FileFormatParams::Parquet(_)
122+
| FileFormatParams::Arrow(_)
123+
| FileFormatParams::ArrowStream(_)
116124
)
117125
}
118126

@@ -138,6 +146,12 @@ impl FileFormatParams {
138146
StageFileFormatType::Lance => {
139147
Ok(FileFormatParams::Lance(LanceFileFormatParams::default()))
140148
}
149+
StageFileFormatType::Arrow => {
150+
Ok(FileFormatParams::Arrow(ArrowFileFormatParams::default()))
151+
}
152+
StageFileFormatType::ArrowStream => Ok(FileFormatParams::ArrowStream(
153+
ArrowFileFormatParams::default(),
154+
)),
141155
_ => Err(ErrorCode::IllegalFileFormat(format!(
142156
"Unsupported file format type: {:?}",
143157
format_type
@@ -156,6 +170,9 @@ impl FileFormatParams {
156170
FileFormatParams::Orc(_) => StageFileCompression::None,
157171
FileFormatParams::Avro(_) => StageFileCompression::None,
158172
FileFormatParams::Lance(_) => StageFileCompression::None,
173+
FileFormatParams::Arrow(_) | FileFormatParams::ArrowStream(_) => {
174+
StageFileCompression::None
175+
}
159176
}
160177
}
161178

@@ -190,6 +207,9 @@ impl FileFormatParams {
190207
v.null_field_as == NullAs::FieldDefault
191208
|| v.missing_field_as == NullAs::FieldDefault
192209
}
210+
FileFormatParams::Arrow(v) | FileFormatParams::ArrowStream(v) => {
211+
v.missing_field_as == NullAs::FieldDefault
212+
}
193213
_ => true,
194214
}
195215
}
@@ -253,6 +273,18 @@ impl FileFormatParams {
253273
)?)
254274
}
255275
StageFileFormatType::Lance => FileFormatParams::Lance(LanceFileFormatParams::default()),
276+
StageFileFormatType::Arrow => {
277+
let missing_field_as = reader.options.remove(MISSING_FIELD_AS);
278+
FileFormatParams::Arrow(ArrowFileFormatParams::try_create(
279+
missing_field_as.as_deref(),
280+
)?)
281+
}
282+
StageFileFormatType::ArrowStream => {
283+
let missing_field_as = reader.options.remove(MISSING_FIELD_AS);
284+
FileFormatParams::ArrowStream(ArrowFileFormatParams::try_create(
285+
missing_field_as.as_deref(),
286+
)?)
287+
}
256288
StageFileFormatType::Csv => {
257289
let default = CsvFileFormatParams::default();
258290
let compression = reader.take_compression_default_none()?;
@@ -1027,6 +1059,18 @@ impl OrcFileFormatParams {
10271059
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
10281060
pub struct LanceFileFormatParams {}
10291061

1062+
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
1063+
pub struct ArrowFileFormatParams {
1064+
pub missing_field_as: NullAs,
1065+
}
1066+
1067+
impl ArrowFileFormatParams {
1068+
pub fn try_create(missing_field_as: Option<&str>) -> Result<Self> {
1069+
let missing_field_as = NullAs::parse(missing_field_as, MISSING_FIELD_AS, NullAs::Error)?;
1070+
Ok(Self { missing_field_as })
1071+
}
1072+
}
1073+
10301074
impl Display for FileFormatParams {
10311075
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
10321076
match self {
@@ -1118,6 +1162,20 @@ impl Display for FileFormatParams {
11181162
FileFormatParams::Lance(_) => {
11191163
write!(f, "TYPE = LANCE")
11201164
}
1165+
FileFormatParams::Arrow(params) => {
1166+
write!(
1167+
f,
1168+
"TYPE = ARROW MISSING_FIELD_AS = {}",
1169+
params.missing_field_as
1170+
)
1171+
}
1172+
FileFormatParams::ArrowStream(params) => {
1173+
write!(
1174+
f,
1175+
"TYPE = ARROW_STREAM MISSING_FIELD_AS = {}",
1176+
params.missing_field_as
1177+
)
1178+
}
11211179
}
11221180
}
11231181
}

src/meta/app/src/principal/user_stage.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,8 @@ pub enum StageFileFormatType {
175175
Orc,
176176
Parquet,
177177
Lance,
178+
Arrow,
179+
ArrowStream,
178180
Xml,
179181
None,
180182
}
@@ -198,8 +200,10 @@ impl FromStr for StageFileFormatType {
198200
"JSON" => Ok(StageFileFormatType::Json),
199201
"ORC" => Ok(StageFileFormatType::Orc),
200202
"AVRO" => Ok(StageFileFormatType::Avro),
203+
"ARROW" => Ok(StageFileFormatType::Arrow),
204+
"ARROW_STREAM" => Ok(StageFileFormatType::ArrowStream),
201205
_ => Err(format!(
202-
"Unknown file format type '{s}', must be one of ( CSV | TEXT | NDJSON | PARQUET | LANCE | ORC | AVRO | JSON )"
206+
"Unknown file format type '{s}', must be one of ( CSV | TEXT | NDJSON | PARQUET | LANCE | ORC | AVRO | JSON | ARROW | ARROW_STREAM )"
203207
)),
204208
}
205209
}
@@ -216,6 +220,8 @@ impl Display for StageFileFormatType {
216220
StageFileFormatType::Orc => write!(f, "ORC"),
217221
StageFileFormatType::Parquet => write!(f, "PARQUET"),
218222
StageFileFormatType::Lance => write!(f, "LANCE"),
223+
StageFileFormatType::Arrow => write!(f, "ARROW"),
224+
StageFileFormatType::ArrowStream => write!(f, "ARROW_STREAM"),
219225
StageFileFormatType::Xml => write!(f, "XML"),
220226
StageFileFormatType::None => write!(f, "NONE"),
221227
}

src/meta/proto-conv/src/impls/file_format.rs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ impl FromToProtoEnum for mt::principal::StageFileFormatType {
4747
pb::StageFileFormatType::Parquet => Ok(mt::principal::StageFileFormatType::Parquet),
4848
pb::StageFileFormatType::Xml => Ok(mt::principal::StageFileFormatType::Xml),
4949
pb::StageFileFormatType::Lance => Ok(mt::principal::StageFileFormatType::Lance),
50+
pb::StageFileFormatType::Arrow => Ok(mt::principal::StageFileFormatType::Arrow),
51+
pb::StageFileFormatType::ArrowStream => {
52+
Ok(mt::principal::StageFileFormatType::ArrowStream)
53+
}
5054
}
5155
}
5256

@@ -61,6 +65,10 @@ impl FromToProtoEnum for mt::principal::StageFileFormatType {
6165
mt::principal::StageFileFormatType::Parquet => Ok(pb::StageFileFormatType::Parquet),
6266
mt::principal::StageFileFormatType::Xml => Ok(pb::StageFileFormatType::Xml),
6367
mt::principal::StageFileFormatType::Lance => Ok(pb::StageFileFormatType::Lance),
68+
mt::principal::StageFileFormatType::Arrow => Ok(pb::StageFileFormatType::Arrow),
69+
mt::principal::StageFileFormatType::ArrowStream => {
70+
Ok(pb::StageFileFormatType::ArrowStream)
71+
}
6472
mt::principal::StageFileFormatType::None => Err(Incompatible::new(
6573
"StageFileFormatType::None cannot be converted to protobuf".to_string(),
6674
)),
@@ -263,6 +271,16 @@ impl FromToProto for mt::principal::FileFormatParams {
263271
mt::principal::LanceFileFormatParams::from_pb(p)?,
264272
))
265273
}
274+
Some(pb::file_format_params::Format::Arrow(p)) => {
275+
Ok(mt::principal::FileFormatParams::Arrow(
276+
mt::principal::ArrowFileFormatParams::from_pb(p)?,
277+
))
278+
}
279+
Some(pb::file_format_params::Format::ArrowStream(p)) => {
280+
Ok(mt::principal::FileFormatParams::ArrowStream(
281+
mt::principal::ArrowFileFormatParams::from_pb(p)?,
282+
))
283+
}
266284
None => Err(Incompatible::new(
267285
"FileFormatParams.format cannot be None".to_string(),
268286
)),
@@ -316,10 +334,42 @@ impl FromToProto for mt::principal::FileFormatParams {
316334
mt::principal::LanceFileFormatParams::to_pb(p)?,
317335
)),
318336
}),
337+
Self::Arrow(p) => Ok(Self::PB {
338+
format: Some(pb::file_format_params::Format::Arrow(
339+
mt::principal::ArrowFileFormatParams::to_pb(p)?,
340+
)),
341+
}),
342+
Self::ArrowStream(p) => Ok(Self::PB {
343+
format: Some(pb::file_format_params::Format::ArrowStream(
344+
mt::principal::ArrowFileFormatParams::to_pb(p)?,
345+
)),
346+
}),
319347
}
320348
}
321349
}
322350

351+
impl FromToProto for mt::principal::ArrowFileFormatParams {
352+
type PB = pb::ArrowFileFormatParams;
353+
fn get_pb_ver(p: &Self::PB) -> u64 {
354+
p.ver
355+
}
356+
357+
fn from_pb(p: pb::ArrowFileFormatParams) -> Result<Self, Incompatible>
358+
where Self: Sized {
359+
reader_check_msg(p.ver, p.min_reader_ver)?;
360+
mt::principal::ArrowFileFormatParams::try_create(p.missing_field_as.as_deref())
361+
.map_err(|e| Incompatible::new(e.to_string()))
362+
}
363+
364+
fn to_pb(&self) -> Result<pb::ArrowFileFormatParams, Incompatible> {
365+
Ok(pb::ArrowFileFormatParams {
366+
ver: VER,
367+
min_reader_ver: MIN_READER_VER,
368+
missing_field_as: Some(self.missing_field_as.to_string()),
369+
})
370+
}
371+
}
372+
323373
impl FromToProto for mt::principal::OrcFileFormatParams {
324374
type PB = pb::OrcFileFormatParams;
325375
fn get_pb_ver(p: &Self::PB) -> u64 {

src/meta/proto-conv/src/util.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,8 @@ const META_CHANGE_LOG: &[(u64, &str)] = &[
205205
(173, "2026-04-16: Update: file_format.proto/CsvFileFormatParams add quote_style"),
206206
(174, "2026-04-28: Add: AuthInfo::KeyPair for key-pair authentication"),
207207
(175, "2026-05-08: Add: field_stats_truncate_len per-column string stats truncation in TableMeta"),
208-
(176, "2026-05-25: Add: task.proto/Task.script_sql")
208+
(176, "2026-05-25: Add: task.proto/Task.script_sql"),
209+
(177, "2026-06-02: Add: file_format.proto Arrow and ArrowStream file formats")
209210
// Dear developer:
210211
// If you're gonna add a new metadata version, you'll have to add a test for it.
211212
// You could just copy an existing test file(e.g., `../tests/it/v024_table_meta.rs`)

src/meta/proto-conv/tests/it/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,3 +168,4 @@ mod v173_csv_quote_style;
168168
mod v174_user_key_pair;
169169
mod v175_field_stats_truncate_len;
170170
mod v176_task_script_sql;
171+
mod v177_arrow_file_format_params;
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
// Copyright 2026 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use databend_common_meta_app::principal::ArrowFileFormatParams;
16+
use databend_common_meta_app::principal::FileFormatParams;
17+
use databend_common_meta_app::principal::NullAs;
18+
use fastrace::func_name;
19+
20+
use crate::common;
21+
22+
// These bytes are built when a new version is introduced,
23+
// and are kept for backward compatibility test.
24+
//
25+
// *************************************************************
26+
// * These messages should never be updated, *
27+
// * only be added when a new version is added, *
28+
// * or be removed when an old version is no longer supported. *
29+
// *************************************************************
30+
//
31+
#[test]
fn test_decode_v177_arrow_file_format_params() -> anyhow::Result<()> {
    /// The value the fixture below is expected to decode into.
    fn expected() -> ArrowFileFormatParams {
        ArrowFileFormatParams {
            missing_field_as: NullAs::FieldDefault,
        }
    }

    // Frozen v177 encoding of a standalone `ArrowFileFormatParams`;
    // bytes 70..=84 are the ASCII text "FIELD_DEFAULT".
    let encoded: &[u8] = &[
        10, 13, 70, 73, 69, 76, 68, 95, 68, 69, 70, 65, 85, 76, 84, 160, 6, 177, 1, 168, 6, 24,
    ];

    // Load the frozen bytes, expecting message version 177.
    common::test_load_old(func_name!(), encoded, 177, expected())?;
    // Round-trip the same value through the current conversion code.
    common::test_pb_from_to(func_name!(), expected())?;

    Ok(())
}
49+
50+
#[test]
fn test_decode_v177_file_format_params_arrow() -> anyhow::Result<()> {
    /// The value the fixture below is expected to decode into.
    fn expected() -> FileFormatParams {
        FileFormatParams::Arrow(ArrowFileFormatParams {
            missing_field_as: NullAs::FieldDefault,
        })
    }

    // Frozen v177 encoding of `FileFormatParams` holding the `Arrow` variant.
    let encoded: &[u8] = &[
        82, 22, 10, 13, 70, 73, 69, 76, 68, 95, 68, 69, 70, 65, 85, 76, 84, 160, 6, 177, 1, 168,
        6, 24,
    ];

    // The expected message version passed for the wrapper is 0.
    common::test_load_old(func_name!(), encoded, 0, expected())?;
    // Round-trip the same value through the current conversion code.
    common::test_pb_from_to(func_name!(), expected())?;

    Ok(())
}
66+
67+
#[test]
fn test_decode_v177_file_format_params_arrow_stream() -> anyhow::Result<()> {
    /// The value the fixture below is expected to decode into.
    fn expected() -> FileFormatParams {
        FileFormatParams::ArrowStream(ArrowFileFormatParams {
            missing_field_as: NullAs::FieldDefault,
        })
    }

    // Frozen v177 encoding of `FileFormatParams` holding the `ArrowStream` variant.
    let encoded: &[u8] = &[
        90, 22, 10, 13, 70, 73, 69, 76, 68, 95, 68, 69, 70, 65, 85, 76, 84, 160, 6, 177, 1, 168,
        6, 24,
    ];

    // The expected message version passed for the wrapper is 0.
    common::test_load_old(func_name!(), encoded, 0, expected())?;
    // Round-trip the same value through the current conversion code.
    common::test_pb_from_to(func_name!(), expected())?;

    Ok(())
}

src/meta/protos/proto/file_format.proto

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ enum StageFileFormatType {
2727
NdJson = 6;
2828
Text = 7;
2929
Lance = 8;
30+
Arrow = 9;
31+
ArrowStream = 10;
3032
}
3133

3234
enum StageFileCompression {
@@ -89,6 +91,8 @@ message FileFormatParams {
8991
OrcFileFormatParams orc = 7;
9092
AvroFileFormatParams avro = 8;
9193
LanceFileFormatParams lance = 9;
94+
ArrowFileFormatParams arrow = 10;
95+
ArrowFileFormatParams arrow_stream = 11;
9296
}
9397
}
9498

@@ -191,3 +195,9 @@ message LanceFileFormatParams {
191195
uint64 ver = 100;
192196
uint64 min_reader_ver = 101;
193197
}
198+
199+
// Format options used by both the `arrow` and `arrow_stream` members of
// FileFormatParams.
message ArrowFileFormatParams {
  // Version bookkeeping; both fields are passed to the reader version check
  // when the message is loaded.
  uint64 ver = 100;
  uint64 min_reader_ver = 101;

  // Textual form of the MISSING_FIELD_AS option; may be absent.
  optional string missing_field_as = 1;
}

src/query/ast/src/parser/copy.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,14 +152,14 @@ pub fn copy_into(i: Input) -> IResult<Statement> {
152152
#copy_into_location:"`COPY
153153
INTO { @<stage_name>[/<path>] | '<uri>' }
154154
FROM { [<database_name>.]<table_name> | ( <query> ) }
155-
[ FILE_FORMAT = ( { TYPE = { CSV | NDJSON | PARQUET | TEXT | AVRO | ORC | JSON | LANCE } [ formatTypeOptions ] } ) ]
155+
[ FILE_FORMAT = ( { TYPE = { CSV | NDJSON | PARQUET | TEXT | AVRO | ORC | JSON | LANCE | ARROW | ARROW_STREAM } [ formatTypeOptions ] } ) ]
156156
[ copyOptions ]`"
157157
| #copy_into_table: "`COPY
158158
INTO { [<database_name>.]<table_name> { ( <columns> ) } }
159159
FROM { @<stage_name>[/<path>]
160160
| '<uri>'
161161
| ( select <expr>, [ <expr> ...] from {@<stage_name>[/<path>]( <args> ) | '<uri>'} ) }
162-
[ FILE_FORMAT = ( { TYPE = { CSV | NDJSON | PARQUET | TEXT | AVRO | ORC | JSON | LANCE } [ formatTypeOptions ] } ) ]
162+
[ FILE_FORMAT = ( { TYPE = { CSV | NDJSON | PARQUET | TEXT | AVRO | ORC | JSON | LANCE | ARROW | ARROW_STREAM } [ formatTypeOptions ] } ) ]
163163
[ FILES = ( '<file_name>' [ , '<file_name>' ] [ , ... ] ) ]
164164
[ PATTERN = '<regex_pattern>' ]
165165
[ copyOptions ]`"

src/query/ast/src/parser/stage.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ pub fn connection_options(i: Input) -> IResult<BTreeMap<String, String>> {
9797
pub fn format_options(i: Input) -> IResult<FileFormatOptions> {
9898
let option_type = map(
9999
rule! {
100-
TYPE ~ "=" ~ ( TEXT | TSV | CSV | NDJSON | PARQUET | JSON | ORC | AVRO | LANCE)
100+
TYPE ~ "=" ~ ( TEXT | TSV | CSV | NDJSON | PARQUET | JSON | ORC | AVRO | LANCE | ARROW | ARROW_STREAM)
101101
},
102102
|(_, _, v)| {
103103
(

0 commit comments

Comments
 (0)