Skip to content

Commit 8b44a7f

Browse files
committed
feat: Optimize PK MOR read performance by reduce per-row object allocation and improve merge read performance.
1 parent 2c4019e commit 8b44a7f

7 files changed

Lines changed: 66 additions & 7 deletions

File tree

src/paimon/common/data/generic_row.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,15 @@ class GenericRow : public InternalRow {
220220
return row;
221221
}
222222

223+
void ResetFields() {
224+
for (auto& field : fields_) {
225+
field = VariantType{};
226+
}
227+
kind_ = RowKind::Insert();
228+
row_holder_.clear();
229+
bytes_holder_.reset();
230+
}
231+
223232
private:
224233
/// The array to store the actual internal format values.
225234
std::vector<VariantType> fields_;

src/paimon/common/data/generic_row_test.cpp

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,4 +104,35 @@ TEST(GenericRowTest, TestSimple) {
104104
"00:00:00.100000020,123.45678998765432145678,array,row,map,null)",
105105
row.ToString());
106106
}
107+
108+
TEST(GenericRowTest, TestResetFields) {
109+
auto pool = GetDefaultPool();
110+
GenericRow row(3);
111+
row.SetField(0, static_cast<int32_t>(1));
112+
row.SetField(1, BinaryString::FromString("old", pool.get()));
113+
row.SetField(2, Bytes::AllocateBytes("bytes", pool.get()));
114+
row.SetRowKind(RowKind::Delete());
115+
116+
ASSERT_FALSE(row.IsNullAt(0));
117+
ASSERT_FALSE(row.IsNullAt(1));
118+
ASSERT_FALSE(row.IsNullAt(2));
119+
ASSERT_EQ(row.GetRowKind().value(), RowKind::Delete());
120+
121+
row.ResetFields();
122+
123+
ASSERT_EQ(row.GetFieldCount(), 3);
124+
ASSERT_TRUE(row.IsNullAt(0));
125+
ASSERT_TRUE(row.IsNullAt(1));
126+
ASSERT_TRUE(row.IsNullAt(2));
127+
ASSERT_EQ(row.GetRowKind().value(), RowKind::Insert());
128+
129+
row.SetField(0, static_cast<int32_t>(2));
130+
row.SetField(1, BinaryString::FromString("new", pool.get()));
131+
row.SetRowKind(RowKind::UpdateAfter());
132+
133+
ASSERT_EQ(row.GetInt(0), static_cast<int32_t>(2));
134+
ASSERT_EQ(row.GetString(1), BinaryString::FromString("new", pool.get()));
135+
ASSERT_TRUE(row.IsNullAt(2));
136+
ASSERT_EQ(row.GetRowKind().value(), RowKind::UpdateAfter());
137+
}
107138
} // namespace paimon::test

src/paimon/core/io/key_value_data_file_record_reader.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,11 @@ KeyValueDataFileRecordReader::KeyValueDataFileRecordReader(
5050

5151
Result<bool> KeyValueDataFileRecordReader::Iterator::HasNext() const {
5252
int64_t array_length = reader_->row_kind_array_->length();
53-
const auto& selection_bitmap = reader_->selection_bitmap_;
54-
if (selection_bitmap.Cardinality() == array_length) {
53+
if (selection_cardinality_ == array_length) {
5554
// all rows are selected in bitmap
5655
return cursor_ < array_length;
5756
}
57+
const auto& selection_bitmap = reader_->selection_bitmap_;
5858
auto iter = selection_bitmap.EqualOrLarger(cursor_);
5959
if (iter == selection_bitmap.End()) {
6060
// no row are selected

src/paimon/core/io/key_value_data_file_record_reader.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,9 @@ class KeyValueDataFileRecordReader : public KeyValueRecordReader {
5555
class Iterator : public KeyValueRecordReader::Iterator {
5656
public:
5757
Iterator(KeyValueDataFileRecordReader* reader, int64_t previous_batch_first_row_number)
58-
: previous_batch_first_row_number_(previous_batch_first_row_number), reader_(reader) {}
58+
: previous_batch_first_row_number_(previous_batch_first_row_number),
59+
reader_(reader),
60+
selection_cardinality_(reader->selection_bitmap_.Cardinality()) {}
5961
Result<bool> HasNext() const override;
6062
Result<KeyValue> Next() override;
6163
Result<std::pair<int64_t, KeyValue>> NextWithFilePos();
@@ -64,6 +66,7 @@ class KeyValueDataFileRecordReader : public KeyValueRecordReader {
6466
int64_t previous_batch_first_row_number_;
6567
mutable int64_t cursor_ = 0;
6668
KeyValueDataFileRecordReader* reader_ = nullptr;
69+
int64_t selection_cardinality_ = 0;
6770
};
6871

6972
Result<std::unique_ptr<KeyValueRecordReader::Iterator>> NextBatch() override;

src/paimon/core/mergetree/compact/aggregate/aggregate_merge_function.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,11 @@ Status AggregateMergeFunction::Add(KeyValue&& kv) {
6262
// mark the current row for deletion and initialize the row with input values.
6363
if (remove_record_on_delete_ && kv.value_kind == RowKind::Delete()) {
6464
current_delete_row_ = true;
65-
row_ = std::make_unique<GenericRow>(getters_.size());
65+
if (row_) {
66+
row_->ResetFields();
67+
} else {
68+
row_ = std::make_unique<GenericRow>(getters_.size());
69+
}
6670
for (size_t i = 0; i < getters_.size(); i++) {
6771
row_->SetField(i, getters_[i](*(kv.value)));
6872
}

src/paimon/core/mergetree/compact/aggregate/aggregate_merge_function.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,11 @@ class AggregateMergeFunction : public MergeFunction {
5252
void Reset() override {
5353
latest_kv_ = std::nullopt;
5454
current_delete_row_ = false;
55-
row_ = std::make_unique<GenericRow>(getters_.size());
55+
if (row_) {
56+
row_->ResetFields();
57+
} else {
58+
row_ = std::make_unique<GenericRow>(getters_.size());
59+
}
5660
for (const auto& agg : aggregators_) {
5761
agg->Reset();
5862
}

src/paimon/core/mergetree/compact/partial_update_merge_function.cpp

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,11 @@ void PartialUpdateMergeFunction::Reset() {
269269
current_key_.reset();
270270
meet_insert_ = false;
271271
not_null_column_filled_ = false;
272-
row_ = std::make_unique<GenericRow>(getters_.size());
272+
if (row_) {
273+
row_->ResetFields();
274+
} else {
275+
row_ = std::make_unique<GenericRow>(getters_.size());
276+
}
273277
last_seq_num_ = 0;
274278
for (auto& [_, agg] : field_aggregators_) {
275279
assert(agg);
@@ -304,7 +308,11 @@ Status PartialUpdateMergeFunction::Add(KeyValue&& moved_kv) {
304308
if (remove_record_on_delete_) {
305309
if (kv.value_kind == RowKind::Delete()) {
306310
current_delete_row_ = true;
307-
row_ = std::make_unique<GenericRow>(getters_.size());
311+
if (row_) {
312+
row_->ResetFields();
313+
} else {
314+
row_ = std::make_unique<GenericRow>(getters_.size());
315+
}
308316
InitRowAndHoldData(std::move(kv.value));
309317
} else if (!not_null_column_filled_) {
310318
InitRowAndHoldData(std::move(kv.value));

0 commit comments

Comments
 (0)