Skip to content

Commit

Permalink
[Enhancement](paimon)support paimon top level schema change.
Browse files Browse the repository at this point in the history
  • Loading branch information
hubgeter committed Mar 6, 2025
1 parent 2e1268a commit ab35a1c
Show file tree
Hide file tree
Showing 6 changed files with 281 additions and 22 deletions.
97 changes: 95 additions & 2 deletions be/src/vec/exec/format/table/paimon_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,21 @@
#include "paimon_reader.h"

#include <vector>
#include <rapidjson/document.h>


#include "common/status.h"
#include "util/deletion_vector.h"

namespace doris::vectorized {
#include "common/compile_check_begin.h"
PaimonReader::PaimonReader(std::unique_ptr<GenericReader> file_format_reader,
RuntimeProfile* profile, const TFileScanRangeParams& params)
: TableFormatReader(std::move(file_format_reader)), _profile(profile), _params(params) {
RuntimeProfile* profile, const TFileScanRangeParams& params,
const TFileRangeDesc& range,
io::IOContext* io_ctx)
: TableFormatReader(std::move(file_format_reader)), _profile(profile),
_io_ctx(io_ctx),_range(range), _params(params) {

static const char* paimon_profile = "PaimonProfile";
ADD_TIMER(_profile, paimon_profile);
_paimon_profile.num_delete_rows =
Expand All @@ -35,6 +41,93 @@ PaimonReader::PaimonReader(std::unique_ptr<GenericReader> file_format_reader,
ADD_CHILD_TIMER(_profile, "DeleteFileReadTime", paimon_profile);
}


// Reads the next block from the wrapped file-format reader.
//
// When a schema change was detected (_has_schema_change), the columns of `block`
// are renamed from table column names to file column names before the read and
// renamed back to table column names afterwards.
//
// Returns the status produced by the wrapped reader's get_next_block().
Status PaimonReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
    // Renames every column of `block` that has an entry in `mapping`, then asks the
    // block to re-initialize its by-name index.
    auto rename_columns = [block](const std::unordered_map<std::string, std::string>& mapping) {
        for (size_t i = 0, n = block->columns(); i < n; ++i) {
            ColumnWithTypeAndName& col = block->get_by_position(i);
            if (auto iter = mapping.find(col.name); iter != mapping.end()) {
                col.name = iter->second;
            }
        }
        block->initialize_index_by_name();
    };

    if (_has_schema_change) {
        rename_columns(_table_col_to_file_col);
    }

    // Keep the status instead of returning early: the table column names must be
    // restored even when the read fails, otherwise the caller is left holding a
    // block whose columns carry file names.
    Status status = _file_format_reader->get_next_block(block, read_rows, eof);

    if (_has_schema_change) {
        rename_columns(_file_col_to_table_col);
    }
    return status;
}

// Reads the Paimon schema file referenced by this scan range and extracts the
// field-id -> field-name mapping from its top-level "fields" array.
//
// @param file_id_to_name  Output map; cleared first, then filled with one entry per
//                         array element that has an integer "id" and a string
//                         "name". Elements without both are skipped.
// @return OK on success; the error from opening/reading the file, or an
//         InternalError when the content is not valid JSON or has no "fields" array.
Status PaimonReader::read_schema_file(std::map<int, std::string>& file_id_to_name) {
    // Never hand stale entries back to the caller, whatever path we leave through.
    file_id_to_name.clear();

    const auto& schema_path = _range.table_format_params.paimon_params.schema_file_path;

    io::FileSystemProperties properties = {
            .system_type = _params.file_type,
            .properties = _params.properties,
            .hdfs_params = _params.hdfs_params,
            .broker_addresses {},
    };
    if (_params.__isset.broker_addresses) {
        properties.broker_addresses.assign(_params.broker_addresses.begin(),
                                           _params.broker_addresses.end());
    }
    io::FileDescription file_description = {
            .path = schema_path,
            .file_size = -1,
            .mtime = 0,
            .fs_name = _range.fs_name,
    };
    auto schema_file_reader = DORIS_TRY(FileFactory::create_file_reader(
            properties, file_description, io::FileReaderOptions::DEFAULT));

    // Read the whole file into memory in one request.
    size_t bytes_read = schema_file_reader->size();
    std::vector<char> buf(bytes_read);
    Slice result(buf.data(), bytes_read);
    {
        // NOTE(review): this reuses the delete-file read timer because the profile
        // has no dedicated counter for schema file reads.
        SCOPED_TIMER(_paimon_profile.delete_files_read_time);
        RETURN_IF_ERROR(schema_file_reader->read_at(0, result, &bytes_read, _io_ctx));
    }

    // Only parse the bytes that were actually read.
    const std::string json_text(buf.data(), bytes_read);
    rapidjson::Document doc;
    if (doc.Parse(json_text.c_str()).HasParseError()) {
        // A corrupt schema file must fail the scan; silently continuing would
        // produce a wrong column mapping.
        return Status::InternalError(
                "Failed to parse paimon schema file {}: parse error code {} at offset {}",
                schema_path, static_cast<int>(doc.GetParseError()), doc.GetErrorOffset());
    }

    if (!doc.IsObject() || !doc.HasMember("fields") || !doc["fields"].IsArray()) {
        return Status::InternalError(
                "Invalid paimon schema file {}: missing or incorrect 'fields' array",
                schema_path);
    }

    for (const auto& field : doc["fields"].GetArray()) {
        // IsObject() guards HasMember(), which is only valid on JSON objects.
        if (field.IsObject() && field.HasMember("id") && field["id"].IsInt() &&
            field.HasMember("name") && field["name"].IsString()) {
            file_id_to_name[field["id"].GetInt()] = field["name"].GetString();
        }
    }
    return Status::OK();
}



Status PaimonReader::init_row_filters(const TFileRangeDesc& range, io::IOContext* io_ctx) {
const auto& table_desc = range.table_format_params.paimon_params;
if (!table_desc.__isset.deletion_file) {
Expand Down
139 changes: 134 additions & 5 deletions be/src/vec/exec/format/table/paimon_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,95 @@ namespace doris::vectorized {
class PaimonReader : public TableFormatReader {
public:
PaimonReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile,
const TFileScanRangeParams& params);
const TFileScanRangeParams& params,
const TFileRangeDesc& range,
io::IOContext* io_ctx);
~PaimonReader() override = default;

Status init_row_filters(const TFileRangeDesc& range, io::IOContext* io_ctx) final;

Status read_schema_file(std::map<int, std::string> & file_id_to_name);

Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;

// Builds the table-column <-> file-column name mappings for this reader.
//
// The schema file is read to obtain field id -> file column name; ids that also
// appear in `table_col_id_table_name_map` are linked to their table column name.
// Requested columns with no such link fall back to their lower-cased name.
// Afterwards:
//   - _all_required_col_names holds the file-side name of every requested column,
//   - _new_colname_to_value_range holds the value ranges re-keyed by file-side name
//     (or by the original name when no mapping exists),
//   - _has_schema_change is true when any table name differs from its file name.
//
// @param read_table_col_names           Table column names requested by the scan.
// @param table_col_id_table_name_map    Column id -> table column name.
// @param table_col_name_to_value_range  Value ranges keyed by table column name;
//                                       read only, must not be null.
// @return OK, or the error returned by read_schema_file().
//
// It is a bit similar to iceberg; integration is planned for when hudi is written.
Status gen_file_col_name(
        const std::vector<std::string>& read_table_col_names,
        const std::unordered_map<uint64_t, std::string>& table_col_id_table_name_map,
        std::unordered_map<std::string, ColumnValueRangeType>* table_col_name_to_value_range) {
    // Reset every piece of derived state so a repeated call starts from scratch.
    _table_col_to_file_col.clear();
    _file_col_to_table_col.clear();
    _all_required_col_names.clear();
    _not_in_file_col_names.clear();
    _new_colname_to_value_range.clear();
    _has_schema_change = false;

    std::map<int, std::string> file_id_to_name;
    RETURN_IF_ERROR(read_schema_file(file_id_to_name));

    for (const auto& [col_id, file_col_name] : file_id_to_name) {
        // A negative id can never match an unsigned key; skip it rather than rely
        // on wrap-around in the conversion below.
        if (col_id < 0) {
            continue;
        }
        const auto table_iter = table_col_id_table_name_map.find(static_cast<uint64_t>(col_id));
        if (table_iter == table_col_id_table_name_map.end()) {
            continue;
        }
        const std::string& table_col_name = table_iter->second;

        _table_col_to_file_col.emplace(table_col_name, file_col_name);
        _file_col_to_table_col.emplace(file_col_name, table_col_name);
        if (table_col_name != file_col_name) {
            _has_schema_change = true;
        }
    }

    _all_required_col_names.reserve(read_table_col_names.size());
    for (const auto& name : read_table_col_names) {
        const auto iter = _table_col_to_file_col.find(name);
        if (iter != _table_col_to_file_col.end()) {
            _all_required_col_names.emplace_back(iter->second);
            continue;
        }
        // No id-based mapping for this column: fall back to the lower-cased name.
        // TODO: confirm how case sensitivity should be handled here.
        auto name_low = to_lower(name);
        _all_required_col_names.emplace_back(name_low);

        _table_col_to_file_col.emplace(name, name_low);
        _file_col_to_table_col.emplace(name_low, name);
        if (name != name_low) {
            _has_schema_change = true;
        }
    }

    // Re-key the value ranges by file column name where a mapping exists.
    for (const auto& [table_name, value_range] : *table_col_name_to_value_range) {
        const auto iter = _table_col_to_file_col.find(table_name);
        if (iter == _table_col_to_file_col.end()) {
            _new_colname_to_value_range.emplace(table_name, value_range);
        } else {
            _new_colname_to_value_range.emplace(iter->second, value_range);
        }
    }
    return Status::OK();
}

protected:
struct PaimonProfile {
RuntimeProfile::Counter* num_delete_rows;
RuntimeProfile::Counter* delete_files_read_time;
};

std::vector<int64_t> _delete_rows;
RuntimeProfile* _profile;
PaimonProfile _paimon_profile;

io::IOContext* _io_ctx;
const TFileRangeDesc& _range;


std::unordered_map<std::string, ColumnValueRangeType> _new_colname_to_value_range;

std::unordered_map<std::string, std::string> _file_col_to_table_col;
std::unordered_map<std::string, std::string> _table_col_to_file_col;

std::vector<std::string> _all_required_col_names;
std::vector<std::string> _not_in_file_col_names;

bool _has_schema_change = false;


virtual void set_delete_rows() = 0;

private:
Expand All @@ -52,28 +128,81 @@ class PaimonOrcReader final : public PaimonReader {
public:
ENABLE_FACTORY_CREATOR(PaimonOrcReader);
PaimonOrcReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile,
const TFileScanRangeParams& params)
: PaimonReader(std::move(file_format_reader), profile, params) {};
const TFileScanRangeParams& params,const TFileRangeDesc& range,
io::IOContext* io_ctx)
: PaimonReader(std::move(file_format_reader), profile, params,range,io_ctx) {};
~PaimonOrcReader() final = default;

// Passes the address of _delete_rows to the wrapped OrcReader via
// set_position_delete_rowids().
void set_delete_rows() override {
    // static_cast is the correct cast for a downcast within the reader hierarchy
    // and matches the cast used in init_reader() of this class.
    static_cast<OrcReader*>(_file_format_reader.get())
            ->set_position_delete_rowids(&_delete_rows);
}

// Initializes the wrapped OrcReader for a Paimon table.
//
// First derives the table/file column name mappings through gen_file_col_name(),
// gives the table -> file mapping to the OrcReader, and then forwards to
// OrcReader::init_reader() using the file-side column names and the value ranges
// re-keyed by gen_file_col_name().
//
// @return The error from gen_file_col_name(), otherwise the status returned by
//         OrcReader::init_reader().
Status init_reader(
        const std::vector<std::string>& read_table_col_names,
        const std::unordered_map<uint64_t, std::string>& table_col_id_table_name_map,
        std::unordered_map<std::string, ColumnValueRangeType>* table_col_name_to_value_range,
        const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor,
        const RowDescriptor* row_descriptor,
        const VExprContextSPtrs* not_single_slot_filter_conjuncts,
        const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts) {
    RETURN_IF_ERROR(gen_file_col_name(read_table_col_names, table_col_id_table_name_map,
                                      table_col_name_to_value_range));

    auto* orc_reader = static_cast<OrcReader*>(_file_format_reader.get());
    orc_reader->set_table_col_to_file_col(_table_col_to_file_col);

    // NOTE(review): the literal `false` is forwarded unchanged; confirm its meaning
    // against OrcReader::init_reader().
    return orc_reader->init_reader(&_all_required_col_names, &_new_colname_to_value_range,
                                   conjuncts, false, tuple_descriptor, row_descriptor,
                                   not_single_slot_filter_conjuncts,
                                   slot_id_to_filter_conjuncts);
}
};

class PaimonParquetReader final : public PaimonReader {
public:
ENABLE_FACTORY_CREATOR(PaimonParquetReader);
PaimonParquetReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile,
const TFileScanRangeParams& params)
: PaimonReader(std::move(file_format_reader), profile, params) {};
const TFileScanRangeParams& params,const TFileRangeDesc& range,
io::IOContext* io_ctx)
: PaimonReader(std::move(file_format_reader), profile, params,range,io_ctx) {};
~PaimonParquetReader() final = default;

// Passes the address of _delete_rows to the wrapped ParquetReader via
// set_delete_rows().
void set_delete_rows() override {
    // static_cast is the correct cast for a downcast within the reader hierarchy
    // and matches the cast used in init_reader() of this class.
    static_cast<ParquetReader*>(_file_format_reader.get())->set_delete_rows(&_delete_rows);
}

// Initializes the wrapped ParquetReader for a Paimon table.
//
// The table/file column name mappings are derived by gen_file_col_name(); the
// table -> file mapping is given to the ParquetReader, which is then initialized
// with the file-side column names and the re-keyed value ranges.
//
// @return The error from gen_file_col_name(), otherwise the status returned by
//         ParquetReader::init_reader().
Status init_reader(
        const std::vector<std::string>& read_table_col_names,
        const std::unordered_map<uint64_t, std::string>& table_col_id_table_name_map,
        std::unordered_map<std::string, ColumnValueRangeType>* table_col_name_to_value_range,
        const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor,
        const RowDescriptor* row_descriptor,
        const std::unordered_map<std::string, int>* colname_to_slot_id,
        const VExprContextSPtrs* not_single_slot_filter_conjuncts,
        const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts) {
    // Step 1: compute the name mappings and the file-side column list.
    RETURN_IF_ERROR(gen_file_col_name(read_table_col_names, table_col_id_table_name_map,
                                      table_col_name_to_value_range));

    // Step 2: hand the table -> file mapping to the underlying reader.
    ParquetReader* const underlying = static_cast<ParquetReader*>(_file_format_reader.get());
    underlying->set_table_to_file_col_map(_table_col_to_file_col);

    // Step 3: delegate initialization using the file-side names.
    return underlying->init_reader(_all_required_col_names, _not_in_file_col_names,
                                   &_new_colname_to_value_range, conjuncts, tuple_descriptor,
                                   row_descriptor, colname_to_slot_id,
                                   not_single_slot_filter_conjuncts,
                                   slot_id_to_filter_conjuncts);
}
};
#include "common/compile_check_end.h"
} // namespace doris::vectorized
37 changes: 26 additions & 11 deletions be/src/vec/exec/scan/vfile_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -998,15 +998,15 @@ Status VFileScanner::_get_next_reader() {
_cur_reader = std::move(iceberg_reader);
} else if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "paimon") {
std::vector<std::string> place_holder;
init_status = parquet_reader->init_reader(
_file_col_names, place_holder, _colname_to_value_range,
std::unique_ptr<PaimonParquetReader> paimon_reader =
PaimonParquetReader::create_unique(std::move(parquet_reader), _profile,
*_params, range,
_io_ctx.get());
init_status = paimon_reader->init_reader(
_file_col_names, _col_id_name_map, _colname_to_value_range,
_push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(),
_col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
&_slot_id_to_filter_conjuncts);
std::unique_ptr<PaimonParquetReader> paimon_reader =
PaimonParquetReader::create_unique(std::move(parquet_reader), _profile,
*_params);
RETURN_IF_ERROR(paimon_reader->init_row_filters(range, _io_ctx.get()));
_cur_reader = std::move(paimon_reader);
} else {
Expand Down Expand Up @@ -1064,12 +1064,18 @@ Status VFileScanner::_get_next_reader() {
_cur_reader = std::move(iceberg_reader);
} else if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "paimon") {
init_status = orc_reader->init_reader(
&_file_col_names, _colname_to_value_range, _push_down_conjuncts, false,

std::unique_ptr<PaimonOrcReader> paimon_reader =
PaimonOrcReader::create_unique(std::move(orc_reader), _profile, *_params,
range,_io_ctx.get());

init_status = paimon_reader->init_reader(
_file_col_names, _col_id_name_map,
_colname_to_value_range, _push_down_conjuncts,
_real_tuple_desc, _default_val_row_desc.get(),
&_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts);
std::unique_ptr<PaimonOrcReader> paimon_reader =
PaimonOrcReader::create_unique(std::move(orc_reader), _profile, *_params);


RETURN_IF_ERROR(paimon_reader->init_row_filters(range, _io_ctx.get()));
_cur_reader = std::move(paimon_reader);
} else {
Expand Down Expand Up @@ -1274,9 +1280,12 @@ Status VFileScanner::_init_expr_ctxes() {
if (slot_info.is_file_slot) {
_file_slot_descs.emplace_back(it->second);
_file_col_names.push_back(it->second->col_name());
if (it->second->col_unique_id() > 0) {
// Paimon column unique ids start from 0, so 0 must be accepted as a valid id.
if (it->second->col_unique_id() >= 0) {
_col_id_name_map.emplace(it->second->col_unique_id(), it->second->col_name());
}


} else {
_partition_slot_descs.emplace_back(it->second);
if (_is_load) {
Expand All @@ -1289,6 +1298,12 @@ Status VFileScanner::_init_expr_ctxes() {
}
}

std::cout << "_col_id_name_map = " << _col_id_name_map.size() <<"\n";
for (auto [col_id , name] : _col_id_name_map) {
std::cout << "col_id = " << col_id << "\n";
std::cout << "name = " << name << "\n";
}

// set column name to default value expr map
for (auto slot_desc : _real_tuple_desc->slots()) {
if (!slot_desc->is_materialized()) {
Expand Down
Loading

0 comments on commit ab35a1c

Please sign in to comment.