Skip to content

Commit cb13c01

Browse files
author
张军
committed
fix query without equality delete fields
1 parent 06ca288 commit cb13c01

File tree

4 files changed

+123
-12
lines changed

4 files changed

+123
-12
lines changed

src/deletes/equality_delete.cpp

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,18 @@ void IcebergMultiFileList::ScanEqualityDeleteFile(const IcebergManifestEntry &en
6161
id_to_global_column[col.identifier.GetValue<int32_t>()] = i;
6262
}
6363

64+
std::vector<ColumnIndex> new_column_indexes = column_indexes;
65+
for (auto field_id : entry.equality_ids) {
66+
auto global_column_id = id_to_global_column[field_id];
67+
ColumnIndex equality_index(global_column_id);
68+
if (std::find(column_indexes.begin(), column_indexes.end(), equality_index) == column_indexes.end()) {
69+
new_column_indexes.push_back(equality_index);
70+
}
71+
}
72+
6473
unordered_map<idx_t, idx_t> global_id_to_result_id;
65-
for (idx_t i = 0; i < column_indexes.size(); i++) {
66-
auto &column_index = column_indexes[i];
74+
for (idx_t i = 0; i < new_column_indexes.size(); i++) {
75+
auto &column_index = new_column_indexes[i];
6776
if (column_index.IsVirtualColumn()) {
6877
continue;
6978
}
@@ -85,9 +94,6 @@ void IcebergMultiFileList::ScanEqualityDeleteFile(const IcebergManifestEntry &en
8594
auto &vec = result.data[col_idx];
8695

8796
auto it = global_id_to_result_id.find(global_column_id);
88-
if (it == global_id_to_result_id.end()) {
89-
throw NotImplementedException("Equality deletes need the relevant columns to be selected");
90-
}
9197
global_column_id = it->second;
9298

9399
for (idx_t i = 0; i < count; i++) {

src/iceberg_functions/iceberg_multi_file_reader.cpp

Lines changed: 102 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,56 @@ static void ApplyPartitionConstants(const IcebergMultiFileList &multi_file_list,
250250
}
251251
}
252252

253+
ReaderInitializeType IcebergMultiFileReader::InitializeReader(MultiFileReaderData &reader_data,
254+
const MultiFileBindData &bind_data,
255+
const vector<MultiFileColumnDefinition> &global_columns,
256+
const vector<ColumnIndex> &global_column_ids,
257+
optional_ptr<TableFilterSet> table_filters,
258+
ClientContext &context, MultiFileGlobalState &gstate) {
259+
260+
FinalizeBind(reader_data, bind_data.file_options, bind_data.reader_bind, global_columns, global_column_ids, context,
261+
gstate.multi_file_reader_state.get());
262+
263+
unordered_map<int32_t, column_t> id_to_global_column;
264+
for (column_t i = 0; i < global_columns.size(); i++) {
265+
auto &col = global_columns[i];
266+
D_ASSERT(!col.identifier.IsNull());
267+
id_to_global_column[col.identifier.GetValue<int32_t>()] = i;
268+
}
269+
270+
set<int32_t> equality_delete_ids;
271+
const auto &multi_file_list = dynamic_cast<const IcebergMultiFileList &>(gstate.file_list);
272+
auto &reader = *reader_data.reader;
273+
auto file_id = reader.file_list_idx.GetIndex();
274+
auto &data_file = multi_file_list.data_files[file_id];
275+
276+
auto delete_data_it = multi_file_list.equality_delete_data.upper_bound(data_file.sequence_number);
277+
for (; delete_data_it != multi_file_list.equality_delete_data.end(); delete_data_it++) {
278+
auto &files = delete_data_it->second->files;
279+
for (auto &file : files) {
280+
auto &rows = file.rows;
281+
for (auto &row : rows) {
282+
auto &filters = row.filters;
283+
for (auto &filter : filters) {
284+
equality_delete_ids.insert(filter.first);
285+
}
286+
}
287+
}
288+
}
289+
290+
vector<ColumnIndex> new_global_column_ids = global_column_ids;
291+
for (auto field_id : equality_delete_ids) {
292+
auto global_column_id = id_to_global_column[field_id];
293+
ColumnIndex equality_index(global_column_id);
294+
if (std::find(global_column_ids.begin(), global_column_ids.end(), equality_index) == global_column_ids.end()) {
295+
new_global_column_ids.push_back(equality_index);
296+
}
297+
}
298+
299+
return CreateMapping(context, reader_data, global_columns, new_global_column_ids, table_filters, gstate.file_list,
300+
bind_data.reader_bind, bind_data.virtual_columns);
301+
}
302+
253303
void IcebergMultiFileReader::FinalizeBind(MultiFileReaderData &reader_data, const MultiFileOptions &file_options,
254304
const MultiFileReaderBindData &options,
255305
const vector<MultiFileColumnDefinition> &global_columns,
@@ -291,7 +341,8 @@ void IcebergMultiFileReader::FinalizeBind(MultiFileReaderData &reader_data, cons
291341
void IcebergMultiFileReader::ApplyEqualityDeletes(ClientContext &context, DataChunk &output_chunk,
292342
const IcebergMultiFileList &multi_file_list,
293343
const IcebergManifestEntry &data_file,
294-
const vector<MultiFileColumnDefinition> &local_columns) {
344+
const vector<MultiFileColumnDefinition> &local_columns,
345+
unordered_map<idx_t, idx_t> field_id_to_result_id) {
295346
vector<reference<IcebergEqualityDeleteRow>> delete_rows;
296347

297348
auto &metadata = multi_file_list.GetMetadata();
@@ -360,7 +411,25 @@ void IcebergMultiFileReader::ApplyEqualityDeletes(ClientContext &context, DataCh
360411
equalities.push_back(make_uniq<BoundConstantExpression>(Value::BOOLEAN(true)));
361412
}
362413
} else {
363-
equalities.push_back(expression->Copy());
414+
if (field_id_to_result_id.empty()) {
415+
equalities.push_back(expression->Copy());
416+
} else {
417+
idx_t index = field_id_to_result_id[field_id];
418+
if (expression->type == ExpressionType::COMPARE_NOTEQUAL) {
419+
auto &expr = expression->Cast<BoundComparisonExpression>();
420+
auto bound_ref = make_uniq<BoundReferenceExpression>(expr.left->return_type, index);
421+
unique_ptr<Expression> equality_filter = make_uniq<BoundComparisonExpression>(
422+
ExpressionType::COMPARE_NOTEQUAL, std::move(bound_ref), expr.right->Copy());
423+
equalities.push_back(std::move(equality_filter));
424+
} else if (expression->type == ExpressionType::OPERATOR_IS_NOT_NULL) {
425+
auto &expr = expression->Cast<BoundOperatorExpression>();
426+
auto bound_ref = make_uniq<BoundReferenceExpression>(expr.children[0]->return_type, index);
427+
auto is_not_null = make_uniq<BoundOperatorExpression>(ExpressionType::OPERATOR_IS_NOT_NULL,
428+
LogicalType::BOOLEAN);
429+
is_not_null->children.push_back(std::move(bound_ref));
430+
equalities.push_back(std::move(is_not_null));
431+
}
432+
}
364433
}
365434
}
366435

@@ -399,17 +468,46 @@ void IcebergMultiFileReader::FinalizeChunk(ClientContext &context, const MultiFi
399468
DataChunk &input_chunk, DataChunk &output_chunk,
400469
ExpressionExecutor &executor,
401470
optional_ptr<MultiFileReaderGlobalState> global_state) {
471+
472+
// add the extra equality delete fields to output chunk.
473+
int32_t diff = 0;
474+
if (executor.expressions.size() != output_chunk.ColumnCount()) {
475+
diff = executor.expressions.size() - output_chunk.ColumnCount();
476+
for (int32_t i = diff; i > 0; i--) {
477+
int32_t index = input_chunk.ColumnCount() - i;
478+
output_chunk.data.emplace_back(input_chunk.data[index]);
479+
}
480+
}
481+
402482
// Base class finalization first
403483
MultiFileReader::FinalizeChunk(context, bind_data, reader, reader_data, input_chunk, output_chunk, executor,
404484
global_state);
405485

486+
auto &local_columns = reader.columns;
487+
unordered_map<idx_t, idx_t> column_index_to_field_id;
488+
for (idx_t i = 0; i < local_columns.size(); i++) {
489+
auto &col = local_columns[i];
490+
column_index_to_field_id[i] = col.identifier.GetValue<int32_t>();
491+
}
492+
unordered_map<idx_t, idx_t> field_id_to_result_id;
493+
vector<ColumnIndex> column_indexes = reader.column_indexes;
494+
int32_t result_id = executor.expressions.size() - 1;
495+
for (int32_t i = column_indexes.size() - 1; i >= 0; i--) {
496+
ColumnIndex column_index = column_indexes[i];
497+
field_id_to_result_id[column_index_to_field_id[column_index.GetPrimaryIndex()]] = result_id--;
498+
}
499+
406500
D_ASSERT(global_state);
407501
// Get the metadata for this file
408502
const auto &multi_file_list = dynamic_cast<const IcebergMultiFileList &>(*global_state->file_list);
409503
auto file_id = reader.file_list_idx.GetIndex();
410504
auto &data_file = multi_file_list.data_files[file_id];
411-
auto &local_columns = reader.columns;
412-
ApplyEqualityDeletes(context, output_chunk, multi_file_list, data_file, local_columns);
505+
ApplyEqualityDeletes(context, output_chunk, multi_file_list, data_file, local_columns, field_id_to_result_id);
506+
507+
// delete the equality delete fields for result
508+
for (idx_t i = 0; i < diff; i++) {
509+
output_chunk.data.pop_back();
510+
}
413511
}
414512

415513
bool IcebergMultiFileReader::ParseOption(const string &key, const Value &val, MultiFileOptions &options,

src/include/iceberg_multi_file_reader.hpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,11 @@ struct IcebergMultiFileReader : public MultiFileReader {
5050
const MultiFileReaderBindData &bind_data, const MultiFileList &file_list,
5151
const vector<MultiFileColumnDefinition> &global_columns,
5252
const vector<ColumnIndex> &global_column_ids) override;
53+
ReaderInitializeType InitializeReader(MultiFileReaderData &reader_data, const MultiFileBindData &bind_data,
54+
const vector<MultiFileColumnDefinition> &global_columns,
55+
const vector<ColumnIndex> &global_column_ids,
56+
optional_ptr<TableFilterSet> table_filters, ClientContext &context,
57+
MultiFileGlobalState &gstate) override;
5358
void FinalizeBind(MultiFileReaderData &reader_data, const MultiFileOptions &file_options,
5459
const MultiFileReaderBindData &options, const vector<MultiFileColumnDefinition> &global_columns,
5560
const vector<ColumnIndex> &global_column_ids, ClientContext &context,
@@ -59,7 +64,8 @@ struct IcebergMultiFileReader : public MultiFileReader {
5964
ExpressionExecutor &executor, optional_ptr<MultiFileReaderGlobalState> global_state) override;
6065
void ApplyEqualityDeletes(ClientContext &context, DataChunk &output_chunk,
6166
const IcebergMultiFileList &multi_file_list, const IcebergManifestEntry &data_file,
62-
const vector<MultiFileColumnDefinition> &local_columns);
67+
const vector<MultiFileColumnDefinition> &local_columns,
68+
unordered_map<idx_t, idx_t> field_id_to_result_id);
6369
bool ParseOption(const string &key, const Value &val, MultiFileOptions &options, ClientContext &context) override;
6470

6571
public:

test/sql/local/equality_deletes.test

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ SELECT filename[17:32], name, id FROM ICEBERG_SCAN('data/persistent/equality_del
4444
equality_deletes b 2
4545
equality_deletes b 1
4646

47-
statement error
47+
query II
4848
SELECT filename[17:32], name FROM ICEBERG_SCAN('data/persistent/equality_deletes/warehouse/mydb/mytable');
4949
----
50-
Not implemented Error: Equality deletes need the relevant columns to be selected
50+
equality_deletes b
51+
equality_deletes b

0 commit comments

Comments
 (0)