feat(datafusion): add Hive-style INSERT OVERWRITE PARTITION support and simplify API#280
feat(datafusion): add Hive-style INSERT OVERWRITE PARTITION support and simplify API#280JingsongLi wants to merge 11 commits intoapache:mainfrom
Conversation
276e1e5 to
37df99a
Compare
| .overwrite(messages, Some(static_partitions)) | ||
| .await | ||
| .map_err(to_datafusion_error)?; | ||
|
|
There was a problem hiding this comment.
PARTITION (dt) is all-dynamic, but this path still passes Some(static_partitions) into overwrite().
With an empty map from parse_static_partitions(), this will drop all existing partitions.
| while let Some(batch_result) = stream.next().await { | ||
| let batch = batch_result?; | ||
| if batch.num_rows() == 0 { | ||
| continue; |
There was a problem hiding this comment.
This skips column-count/type validation for an empty source result. For example, INSERT OVERWRITE t PARTITION (dt = '2024-01-01') SELECT id FROM t WHERE false should fail because the source only producesid while the target still expects id, name, but this path skips validation and the later overwrite() still truncates the target partition.
| } | ||
| col_checked = true; | ||
| } | ||
| let augmented = append_partition_columns( |
There was a problem hiding this comment.
This path is currently treating the source columns as positional "all non-static table fields in schema order". That breaks INSERT OVERWRITE ... PARTITION (...) when the statement uses an explicit target column list, because this handler never applies insert.columns / after_columns before calling append_partition_columns.
For example, if the table schema is (dt, id, name) and the statement is:
INSERT OVERWRITE t (name, id) PARTITION (dt = '2024-01-01') VALUES ('alice', 1)
then the source columns here are (name, id), but append_partition_columns will consume them as (id, name) in table-schema order. That means we either get an unexpected cast failure ('alice' -> id) or silently write the wrong values when the types are compatible.
We should preserve target-column mapping here and reorder/project the source batch by column name before appending static partition columns.
…nd simplify API Add static partition overwrite via PARTITION clause (e.g. INSERT OVERWRITE t PARTITION (dt = 'value') SELECT ...). Partial partition specs use predicate-based filtering to overwrite all matching sub-partitions. Simplify PaimonSqlHandler::new to 3 args (ctx, catalog, name) with internal DynamicOptions management, catalog registration, and relation planner setup. Remove redundant overwrite_partitions from CommitEntriesPlan, unifying all overwrite filtering through PartitionFilter.
d87606e to
fa26b06
Compare
| } | ||
|
|
||
| pub fn set_overwrite(&mut self, is_overwrite: bool) { | ||
| if let Self::Dynamic(a) = self { |
There was a problem hiding this comment.
set_overwrite() does not affect CrossPartitionAssigner. With cross-partition PK tables and merge-engine='first-row', INSERT OVERWRITE ... PARTITION can delete old data without writing the replacement
row, causing data loss.
Purpose
Add static partition overwrite via PARTITION clause (e.g. INSERT OVERWRITE t PARTITION (dt = 'value') SELECT ...). Partial partition specs use predicate-based filtering to overwrite all matching sub-partitions. Simplify PaimonSqlHandler::new to 3 args (ctx, catalog, name) with internal DynamicOptions management, catalog registration, and relation planner setup. Remove redundant overwrite_partitions from CommitEntriesPlan, unifying all overwrite filtering through PartitionFilter.
Brief change log
Tests
API and Format
Documentation