Skip to content

Commit

Permalink
[Enhancement](paimon)support paimon top level schema change.
Browse files Browse the repository at this point in the history
  • Loading branch information
hubgeter committed Mar 7, 2025
1 parent 2e1268a commit 51af3eb
Show file tree
Hide file tree
Showing 9 changed files with 831 additions and 22 deletions.
230 changes: 228 additions & 2 deletions be/src/vec/exec/format/table/paimon_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include "paimon_reader.h"

#include <rapidjson/document.h>

#include <vector>

#include "common/status.h"
Expand All @@ -25,8 +27,15 @@
namespace doris::vectorized {
#include "common/compile_check_begin.h"
PaimonReader::PaimonReader(std::unique_ptr<GenericReader> file_format_reader,
RuntimeProfile* profile, const TFileScanRangeParams& params)
: TableFormatReader(std::move(file_format_reader)), _profile(profile), _params(params) {
RuntimeProfile* profile, const TFileScanRangeParams& params,
const TFileRangeDesc& range, io::IOContext* io_ctx,
ShardedKVCache* kv_cache)
: TableFormatReader(std::move(file_format_reader)),
_profile(profile),
_io_ctx(io_ctx),
_range(range),
_kv_cache(kv_cache),
_params(params) {
static const char* paimon_profile = "PaimonProfile";
ADD_TIMER(_profile, paimon_profile);
_paimon_profile.num_delete_rows =
Expand All @@ -35,6 +44,223 @@ PaimonReader::PaimonReader(std::unique_ptr<GenericReader> file_format_reader,
ADD_CHILD_TIMER(_profile, "DeleteFileReadTime", paimon_profile);
}

Status PaimonReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
if (_has_schema_change) {
for (int i = 0; i < block->columns(); i++) {
ColumnWithTypeAndName& col = block->get_by_position(i);
auto iter = _table_col_to_file_col.find(col.name);
if (iter != _table_col_to_file_col.end()) {
col.name = iter->second;
}
}
block->initialize_index_by_name();
}

RETURN_IF_ERROR(_file_format_reader->get_next_block(block, read_rows, eof));

if (_has_schema_change) {
for (int i = 0; i < block->columns(); i++) {
ColumnWithTypeAndName& col = block->get_by_position(i);
auto iter = _file_col_to_table_col.find(col.name);
if (iter != _file_col_to_table_col.end()) {
col.name = iter->second;
}
}
block->initialize_index_by_name();
}
return Status::OK();
}

/**
sql:
create table tmp3 (
k int,
vVV string,
a array<int>,
b struct<a:int,b:string>,
c map<string,int>
) tblproperties (
'primary-key' = 'k',
"file.format" = "parquet"
);
schema :
{
"version" : 2,
"id" : 0,
"fields" : [ {
"id" : 0,
"name" : "k",
"type" : "INT NOT NULL"
}, {
"id" : 1,
"name" : "vVV",
"type" : "STRING"
}, {
"id" : 2,
"name" : "a",
"type" : {
"type" : "ARRAY",
"element" : "INT"
}
}, {
"id" : 3,
"name" : "b",
"type" : {
"type" : "ROW",
"fields" : [ {
"id" : 4,
"name" : "a",
"type" : "INT"
}, {
"id" : 5,
"name" : "b",
"type" : "STRING"
} ]
}
}, {
"id" : 6,
"name" : "c",
"type" : {
"type" : "MAP",
"key" : "STRING NOT NULL",
"value" : "INT"
}
} ],
"highestFieldId" : 6,
"partitionKeys" : [ ],
"primaryKeys" : [ "k" ],
"options" : {
"owner" : "root",
"file.format" : "parquet"
},
"timeMillis" : 1741338580187
}
*/
Status PaimonReader::read_schema_file(std::map<uint64_t, std::string>& file_id_to_name) {
file_id_to_name.clear();
if (!_range.table_format_params.paimon_params.__isset.schema_file_path) [[unlikely]] {
return Status::RuntimeError("miss schema file");
}

io::FileSystemProperties properties = {
.system_type = _params.file_type,
.properties = _params.properties,
.hdfs_params = _params.hdfs_params,
.broker_addresses {},
};
if (_params.__isset.broker_addresses) {
properties.broker_addresses.assign(_params.broker_addresses.begin(),
_params.broker_addresses.end());
}
io::FileDescription file_description = {
.path = _range.table_format_params.paimon_params.schema_file_path,
.file_size = -1,
.mtime = 0,
.fs_name = _range.fs_name,
};
auto schema_file_reader = DORIS_TRY(FileFactory::create_file_reader(
properties, file_description, io::FileReaderOptions::DEFAULT));

size_t bytes_read = schema_file_reader->size();
std::vector<char> buf(bytes_read);
Slice schema_result(buf.data(), bytes_read);
{
SCOPED_TIMER(_paimon_profile.delete_files_read_time);
RETURN_IF_ERROR(schema_file_reader->read_at(0, schema_result, &bytes_read, _io_ctx));
}

rapidjson::Document json_doc;
if (json_doc.Parse(schema_result.data, schema_result.size).HasParseError()) {
return Status::IOError("failed to parse json file, path:{}",
_range.table_format_params.paimon_params.schema_file_path);
}

if (!json_doc.HasMember("fields") || !json_doc["fields"].IsArray()) {
return Status::IOError("Invalid JSON: missing or incorrect 'fields' array, path:{} ",
_range.table_format_params.paimon_params.schema_file_path);
}
const auto& fields = json_doc["fields"];
for (const auto& field : fields.GetArray()) {
if (field.HasMember("id") && field["id"].IsInt() && field.HasMember("name") &&
field["name"].IsString()) {
int id = field["id"].GetInt();
std::string name = to_lower(field["name"].GetString());
file_id_to_name[id] = name;
}
}

return Status::OK();
}

Status PaimonReader::gen_file_col_name(
const std::vector<std::string>& read_table_col_names,
const std::unordered_map<uint64_t, std::string>& table_col_id_table_name_map,
const std::unordered_map<std::string, ColumnValueRangeType>*
table_col_name_to_value_range) {
// It is a bit similar to iceberg. I will consider integrating it when I write hudi schema change later.
_table_col_to_file_col.clear();
_file_col_to_table_col.clear();

if (!_range.table_format_params.paimon_params.__isset.schema_file_path) [[unlikely]] {
return Status::RuntimeError("miss schema file");
}

Status create_status = Status::OK();
using MapType = std::map<uint64_t, std::string>;
const auto table_id_to_file_name = *_kv_cache->get<MapType>(
_range.table_format_params.paimon_params.schema_file_path, [&]() -> MapType* {
auto* file_id_to_name_ptr = new MapType();
create_status = read_schema_file(*file_id_to_name_ptr);
if (!create_status) {
delete file_id_to_name_ptr;
return nullptr;
}
return file_id_to_name_ptr;
});
RETURN_IF_ERROR(create_status);

for (auto [table_col_id, file_col_name] : table_id_to_file_name) {
if (table_col_id_table_name_map.find(table_col_id) == table_col_id_table_name_map.end()) {
continue;
}
auto& table_col_name = table_col_id_table_name_map.at(table_col_id);

_table_col_to_file_col.emplace(table_col_name, file_col_name);
_file_col_to_table_col.emplace(file_col_name, table_col_name);
if (table_col_name != file_col_name) {
_has_schema_change = true;
}
}

_all_required_col_names.clear();
_not_in_file_col_names.clear();
for (auto name : read_table_col_names) {
auto iter = _table_col_to_file_col.find(name);
if (iter == _table_col_to_file_col.end()) {
auto name_low = to_lower(name);
_all_required_col_names.emplace_back(name_low);

_table_col_to_file_col.emplace(name, name_low);
_file_col_to_table_col.emplace(name_low, name);
if (name != name_low) {
_has_schema_change = true;
}
} else {
_all_required_col_names.emplace_back(iter->second);
}
}

for (auto& it : *table_col_name_to_value_range) {
auto iter = _table_col_to_file_col.find(it.first);
if (iter == _table_col_to_file_col.end()) {
_new_colname_to_value_range.emplace(it.first, it.second);
} else {
_new_colname_to_value_range.emplace(iter->second, it.second);
}
}
return Status::OK();
}

Status PaimonReader::init_row_filters(const TFileRangeDesc& range, io::IOContext* io_ctx) {
const auto& table_desc = range.table_format_params.paimon_params;
if (!table_desc.__isset.deletion_file) {
Expand Down
83 changes: 78 additions & 5 deletions be/src/vec/exec/format/table/paimon_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,49 @@ namespace doris::vectorized {
class PaimonReader : public TableFormatReader {
public:
PaimonReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile,
const TFileScanRangeParams& params);
const TFileScanRangeParams& params, const TFileRangeDesc& range,
io::IOContext* io_ctx, ShardedKVCache* kv_cache);

~PaimonReader() override = default;

Status init_row_filters(const TFileRangeDesc& range, io::IOContext* io_ctx) final;

Status read_schema_file(std::map<uint64_t, std::string>& file_id_to_name);

Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;

Status gen_file_col_name(
const std::vector<std::string>& read_table_col_names,
const std::unordered_map<uint64_t, std::string>& table_col_id_table_name_map,
const std::unordered_map<std::string, ColumnValueRangeType>*
table_col_name_to_value_range);

protected:
struct PaimonProfile {
RuntimeProfile::Counter* num_delete_rows;
RuntimeProfile::Counter* delete_files_read_time;
};

std::vector<int64_t> _delete_rows;
RuntimeProfile* _profile;
PaimonProfile _paimon_profile;

io::IOContext* _io_ctx;
const TFileRangeDesc& _range;

std::unordered_map<std::string, ColumnValueRangeType> _new_colname_to_value_range;

std::unordered_map<std::string, std::string> _file_col_to_table_col;
std::unordered_map<std::string, std::string> _table_col_to_file_col;

std::vector<std::string> _all_required_col_names;
std::vector<std::string> _not_in_file_col_names;

bool _has_schema_change = false;

// owned by scan node
ShardedKVCache* _kv_cache;

virtual void set_delete_rows() = 0;

private:
Expand All @@ -52,28 +82,71 @@ class PaimonOrcReader final : public PaimonReader {
public:
ENABLE_FACTORY_CREATOR(PaimonOrcReader);
PaimonOrcReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile,
const TFileScanRangeParams& params)
: PaimonReader(std::move(file_format_reader), profile, params) {};
const TFileScanRangeParams& params, const TFileRangeDesc& range,
io::IOContext* io_ctx, ShardedKVCache* kv_cache)
: PaimonReader(std::move(file_format_reader), profile, params, range, io_ctx,
kv_cache) {};
~PaimonOrcReader() final = default;

void set_delete_rows() override {
(reinterpret_cast<OrcReader*>(_file_format_reader.get()))
->set_position_delete_rowids(&_delete_rows);
}

Status init_reader(
const std::vector<std::string>& read_table_col_names,
const std::unordered_map<uint64_t, std::string>& table_col_id_table_name_map,
std::unordered_map<std::string, ColumnValueRangeType>* table_col_name_to_value_range,
const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor,
const RowDescriptor* row_descriptor,
const VExprContextSPtrs* not_single_slot_filter_conjuncts,
const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts) {
RETURN_IF_ERROR(gen_file_col_name(read_table_col_names, table_col_id_table_name_map,
table_col_name_to_value_range));
auto* orc_reader = static_cast<OrcReader*>(_file_format_reader.get());
orc_reader->set_table_col_to_file_col(_table_col_to_file_col);
return orc_reader->init_reader(&_all_required_col_names, &_new_colname_to_value_range,
conjuncts, false, tuple_descriptor, row_descriptor,
not_single_slot_filter_conjuncts,
slot_id_to_filter_conjuncts);
}
};

class PaimonParquetReader final : public PaimonReader {
public:
ENABLE_FACTORY_CREATOR(PaimonParquetReader);
PaimonParquetReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile,
const TFileScanRangeParams& params)
: PaimonReader(std::move(file_format_reader), profile, params) {};
const TFileScanRangeParams& params, const TFileRangeDesc& range,
io::IOContext* io_ctx, ShardedKVCache* kv_cache)
: PaimonReader(std::move(file_format_reader), profile, params, range, io_ctx,
kv_cache) {};

~PaimonParquetReader() final = default;

void set_delete_rows() override {
(reinterpret_cast<ParquetReader*>(_file_format_reader.get()))
->set_delete_rows(&_delete_rows);
}

Status init_reader(
const std::vector<std::string>& read_table_col_names,
const std::unordered_map<uint64_t, std::string>& table_col_id_table_name_map,
std::unordered_map<std::string, ColumnValueRangeType>* table_col_name_to_value_range,
const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor,
const RowDescriptor* row_descriptor,
const std::unordered_map<std::string, int>* colname_to_slot_id,
const VExprContextSPtrs* not_single_slot_filter_conjuncts,
const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts) {
RETURN_IF_ERROR(gen_file_col_name(read_table_col_names, table_col_id_table_name_map,
table_col_name_to_value_range));
auto* parquet_reader = static_cast<ParquetReader*>(_file_format_reader.get());
parquet_reader->set_table_to_file_col_map(_table_col_to_file_col);

return parquet_reader->init_reader(
_all_required_col_names, _not_in_file_col_names, &_new_colname_to_value_range,
conjuncts, tuple_descriptor, row_descriptor, colname_to_slot_id,
not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts);
}
};
#include "common/compile_check_end.h"
} // namespace doris::vectorized
Loading

0 comments on commit 51af3eb

Please sign in to comment.