Skip to content

Commit e438d9d

Browse files
authored
Merge branch 'main' into fix/executor-zero-threads-and-fs-conflict
2 parents 3b64a56 + 4719eac commit e438d9d

3 files changed

Lines changed: 415 additions & 70 deletions

File tree

src/paimon/format/orc/orc_file_batch_reader.cpp

Lines changed: 45 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -172,56 +172,62 @@ std::shared_ptr<Metrics> OrcFileBatchReader::GetReaderMetrics() const {
172172
return metrics_;
173173
}
174174

175-
Result<std::list<std::string>> OrcFileBatchReader::GetAndCheckIncludedFields(
176-
const ::orc::Type* src_type, const ::orc::Type* target_type,
177-
std::vector<uint64_t>* target_column_ids) {
178-
std::list<std::string> include_fields;
179-
std::unordered_map<std::string, const ::orc::Type*> src_type_map;
180-
for (uint64_t i = 0; i < src_type->getSubtypeCount(); i++) {
181-
src_type_map[src_type->getFieldName(i)] = src_type->getSubtype(i);
175+
Status OrcFileBatchReader::CollectTargetColumnIds(const ::orc::Type* src_type,
176+
const ::orc::Type* target_type,
177+
std::vector<uint64_t>* target_column_ids) {
178+
auto src_kind = src_type->getKind();
179+
auto target_kind = target_type->getKind();
180+
if (src_kind != target_kind) {
181+
return Status::Invalid(fmt::format("type kind mismatch: src {} vs target {}",
182+
src_type->toString(), target_type->toString()));
182183
}
183-
int64_t prev_target_field_col_id = -1;
184-
for (uint64_t i = 0; i < target_type->getSubtypeCount(); i++) {
185-
auto& field_name = target_type->getFieldName(i);
186-
auto iter = src_type_map.find(field_name);
187-
if (iter == src_type_map.end()) {
188-
return Status::Invalid(
189-
fmt::format("field {} not in file schema {}", field_name, src_type->toString()));
190-
}
191-
// Noted that: do not support recall partial fields in nested type
192-
if (iter->second->toString() != target_type->getSubtype(i)->toString()) {
193-
return Status::Invalid(
194-
fmt::format("target_type {} not match src_type {}, mismatch field name {}",
195-
target_type->toString(), src_type->toString(), field_name));
184+
185+
switch (src_kind) {
186+
case ::orc::TypeKind::STRUCT: {
187+
std::unordered_map<std::string, const ::orc::Type*> src_field_map;
188+
for (uint64_t i = 0; i < src_type->getSubtypeCount(); i++) {
189+
src_field_map[src_type->getFieldName(i)] = src_type->getSubtype(i);
190+
}
191+
for (uint64_t i = 0; i < target_type->getSubtypeCount(); i++) {
192+
auto& field_name = target_type->getFieldName(i);
193+
auto iter = src_field_map.find(field_name);
194+
if (iter == src_field_map.end()) {
195+
return Status::Invalid(fmt::format("field {} not in file schema {}", field_name,
196+
src_type->toString()));
197+
}
198+
PAIMON_RETURN_NOT_OK(CollectTargetColumnIds(
199+
iter->second, target_type->getSubtype(i), target_column_ids));
200+
}
201+
break;
196202
}
197-
int64_t target_field_col_id = iter->second->getColumnId();
198-
GetSubColumnIds(iter->second, target_column_ids);
199-
if (prev_target_field_col_id >= target_field_col_id) {
200-
return Status::Invalid(
201-
"The column id of the target field should be monotonically increasing in "
202-
"format reader");
203+
// Do not support partial field recall inside list/map types.
204+
default: {
205+
if (src_type->toString() != target_type->toString()) {
206+
return Status::Invalid(fmt::format("type mismatch: src {} vs target {}",
207+
src_type->toString(), target_type->toString()));
208+
}
209+
target_column_ids->push_back(src_type->getColumnId());
210+
break;
203211
}
204-
prev_target_field_col_id = target_field_col_id;
205-
include_fields.push_back(field_name);
206-
}
207-
return include_fields;
208-
}
209-
210-
void OrcFileBatchReader::GetSubColumnIds(const ::orc::Type* type, std::vector<uint64_t>* col_ids) {
211-
col_ids->push_back(type->getColumnId());
212-
for (uint64_t i = 0; i < type->getSubtypeCount(); i++) {
213-
GetSubColumnIds(type->getSubtype(i), col_ids);
214212
}
213+
return Status::OK();
215214
}
216215

217216
Result<::orc::RowReaderOptions> OrcFileBatchReader::CreateRowReaderOptions(
218217
const ::orc::Type* src_type, const ::orc::Type* target_type,
219218
std::unique_ptr<::orc::SearchArgument>&& search_arg,
220219
const std::map<std::string, std::string>& options, std::vector<uint64_t>* target_column_ids) {
221-
PAIMON_ASSIGN_OR_RAISE(std::list<std::string> include_fields,
222-
GetAndCheckIncludedFields(src_type, target_type, target_column_ids));
220+
PAIMON_RETURN_NOT_OK(CollectTargetColumnIds(src_type, target_type, target_column_ids));
221+
for (size_t i = 1; i < target_column_ids->size(); i++) {
222+
if ((*target_column_ids)[i - 1] >= (*target_column_ids)[i]) {
223+
return Status::Invalid(
224+
"The column id of the target field should be monotonically increasing in "
225+
"format reader");
226+
}
227+
}
223228
::orc::RowReaderOptions row_reader_options;
224-
row_reader_options.include(include_fields);
229+
std::list<uint64_t> include_type_ids(target_column_ids->begin(), target_column_ids->end());
230+
row_reader_options.includeTypes(include_type_ids);
225231
// In order to avoid issue like https://github.com/alibaba/paimon-cpp/issues/42, we explicitly
226232
// set GMT timezone.
227233
row_reader_options.setTimezoneName("GMT");

src/paimon/format/orc/orc_file_batch_reader.h

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -101,18 +101,15 @@ class OrcFileBatchReader : public PrefetchFileBatchReader {
101101
const std::shared_ptr<arrow::MemoryPool>& arrow_pool,
102102
const std::shared_ptr<::orc::MemoryPool>& orc_pool);
103103

104-
static void GetSubColumnIds(const ::orc::Type* type, std::vector<uint64_t>* col_ids);
105-
106104
static Result<::orc::RowReaderOptions> CreateRowReaderOptions(
107105
const ::orc::Type* src_type, const ::orc::Type* target_type,
108106
std::unique_ptr<::orc::SearchArgument>&& search_arg,
109107
const std::map<std::string, std::string>& options,
110108
std::vector<uint64_t>* target_column_ids);
111109

112-
static Result<std::list<std::string>> GetAndCheckIncludedFields(
113-
const ::orc::Type* src_type, const ::orc::Type* target_type,
114-
std::vector<uint64_t>* target_column_ids);
115-
110+
static Status CollectTargetColumnIds(const ::orc::Type* src_type,
111+
const ::orc::Type* target_type,
112+
std::vector<uint64_t>* target_column_ids);
116113
std::map<std::string, std::string> options_;
117114

118115
std::shared_ptr<arrow::MemoryPool> arrow_pool_;

0 commit comments

Comments
 (0)