Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add path stats check and fix sparse cache #48834

Open
wants to merge 1 commit into
base: variant-sparse
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
#include "runtime/thread_context.h"
#include "util/time.h"
#include "util/trace.h"
#include "vec/common/schema_util.h"

using std::vector;

Expand Down Expand Up @@ -1329,6 +1330,11 @@ Status Compaction::check_correctness() {
_tablet->tablet_id(), _input_row_num, _stats.merged_rows, _stats.filtered_rows,
_output_rowset->num_rows());
}
if (_tablet->keys_type() == KeysType::DUP_KEYS) {
// only check path stats for dup_keys since rows may be merged in other key models
RETURN_IF_ERROR(vectorized::schema_util::check_path_stats(_input_rowsets, _output_rowset,
_tablet->tablet_id()));
}
return Status::OK();
}

Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/iterators.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ class StorageReadOptions {
RowRanges row_ranges;
size_t topn_limit = 0;
// Cache for sparse column data to avoid redundant reads
vectorized::ColumnPtr sparse_column_cache;
// col_unique_id -> cached column_ptr
std::unordered_map<int32_t, vectorized::ColumnPtr> sparse_column_cache;
};

struct CompactionSampleInfo {
Expand Down
14 changes: 8 additions & 6 deletions be/src/olap/rowset/segment_v2/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -324,9 +324,10 @@ Status VariantColumnReader::_create_sparse_merge_reader(ColumnIterator** iterato
VLOG_DEBUG << "subcolumns to merge " << src_subcolumns_for_sparse.size();

// Create sparse column merge reader
*iterator = new SparseColumnMergeReader(
path_set_info.sub_path_set, std::unique_ptr<ColumnIterator>(inner_iter),
std::move(src_subcolumns_for_sparse), const_cast<StorageReadOptions*>(opts));
*iterator = new SparseColumnMergeReader(path_set_info.sub_path_set,
std::unique_ptr<ColumnIterator>(inner_iter),
std::move(src_subcolumns_for_sparse),
const_cast<StorageReadOptions*>(opts), target_col);
return Status::OK();
}

Expand Down Expand Up @@ -385,7 +386,7 @@ Status VariantColumnReader::_new_iterator_with_flat_leaves(ColumnIterator** iter
*iterator = new SparseColumnExtractReader(
relative_path.get_path(), std::unique_ptr<ColumnIterator>(inner_iter),
// need to modify sparse_column_cache, so use const_cast here
const_cast<StorageReadOptions*>(opts));
const_cast<StorageReadOptions*>(opts), target_col);
return Status::OK();
}
if (relative_path.get_path() == SPARSE_COLUMN_PATH) {
Expand Down Expand Up @@ -465,8 +466,9 @@ Status VariantColumnReader::new_iterator(ColumnIterator** iterator, const Tablet
ColumnIterator* inner_iter;
RETURN_IF_ERROR(_sparse_column_reader->new_iterator(&inner_iter));
DCHECK(opt);
*iterator = new SparseColumnExtractReader(
relative_path.get_path(), std::unique_ptr<ColumnIterator>(inner_iter), nullptr);
*iterator = new SparseColumnExtractReader(relative_path.get_path(),
std::unique_ptr<ColumnIterator>(inner_iter),
nullptr, target_col);
return Status::OK();
}

Expand Down
35 changes: 19 additions & 16 deletions be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ class BaseSparseColumnProcessor : public ColumnIterator {
vectorized::MutableColumnPtr _sparse_column;
StorageReadOptions* _read_opts; // Shared cache pointer
std::unique_ptr<ColumnIterator> _sparse_column_reader;

const TabletColumn& _col;
// Pure virtual method for data processing when encounter existing sparse columns(to be implemented by subclasses)
virtual void _process_data_with_existing_sparse_column(vectorized::MutableColumnPtr& dst,
size_t num_rows) = 0;
Expand All @@ -182,8 +182,9 @@ class BaseSparseColumnProcessor : public ColumnIterator {
size_t num_rows) = 0;

public:
BaseSparseColumnProcessor(std::unique_ptr<ColumnIterator>&& reader, StorageReadOptions* opts)
: _read_opts(opts), _sparse_column_reader(std::move(reader)) {
BaseSparseColumnProcessor(std::unique_ptr<ColumnIterator>&& reader, StorageReadOptions* opts,
const TabletColumn& col)
: _read_opts(opts), _sparse_column_reader(std::move(reader)), _col(col) {
_sparse_column = vectorized::ColumnObject::create_sparse_column_fn();
}

Expand All @@ -208,15 +209,17 @@ class BaseSparseColumnProcessor : public ColumnIterator {
Status _process_batch(ReadMethod&& read_method, size_t nrows,
vectorized::MutableColumnPtr& dst) {
// Cache check and population logic
if (_read_opts && _read_opts->sparse_column_cache &&
if (_read_opts && _read_opts->sparse_column_cache[_col.parent_unique_id()] &&
ColumnReader::is_compaction_reader_type(_read_opts->io_ctx.reader_type)) {
_sparse_column = _read_opts->sparse_column_cache->assume_mutable();
_sparse_column =
_read_opts->sparse_column_cache[_col.parent_unique_id()]->assume_mutable();
} else {
_sparse_column->clear();
RETURN_IF_ERROR(read_method());

if (_read_opts) {
_read_opts->sparse_column_cache = _sparse_column->assume_mutable();
_read_opts->sparse_column_cache[_col.parent_unique_id()] =
_sparse_column->get_ptr();
}
}

Expand All @@ -231,6 +234,14 @@ class BaseSparseColumnProcessor : public ColumnIterator {
}
return Status::OK();
}
};

// Implementation for path extraction processor
class SparseColumnExtractReader : public BaseSparseColumnProcessor {
public:
SparseColumnExtractReader(std::string_view path, std::unique_ptr<ColumnIterator> reader,
StorageReadOptions* opts, const TabletColumn& col)
: BaseSparseColumnProcessor(std::move(reader), opts, col), _path(path) {}

// Batch processing using template method
Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst, bool* has_null) override {
Expand All @@ -248,14 +259,6 @@ class BaseSparseColumnProcessor : public ColumnIterator {
},
count, dst);
}
};

// Implementation for path extraction processor
class SparseColumnExtractReader : public BaseSparseColumnProcessor {
public:
SparseColumnExtractReader(std::string_view path, std::unique_ptr<ColumnIterator> reader,
StorageReadOptions* opts)
: BaseSparseColumnProcessor(std::move(reader), opts), _path(path) {}

private:
std::string _path;
Expand All @@ -280,8 +283,8 @@ class SparseColumnMergeReader : public BaseSparseColumnProcessor {
SparseColumnMergeReader(const TabletSchema::PathSet& path_map,
std::unique_ptr<ColumnIterator>&& sparse_column_reader,
SubstreamReaderTree&& src_subcolumns_for_sparse,
StorageReadOptions* opts)
: BaseSparseColumnProcessor(std::move(sparse_column_reader), opts),
StorageReadOptions* opts, const TabletColumn& col)
: BaseSparseColumnProcessor(std::move(sparse_column_reader), opts, col),
_src_subcolumn_map(path_map),
_src_subcolumns_for_sparse(src_subcolumns_for_sparse) {}
Status init(const ColumnIteratorOptions& opts) override;
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2024,7 +2024,7 @@ Status SegmentIterator::_next_batch_internal(vectorized::Block* block) {
bool is_mem_reuse = block->mem_reuse();
DCHECK(is_mem_reuse);
// Clear the sparse column cache before processing a new batch
_opts.sparse_column_cache = nullptr;
_opts.sparse_column_cache.clear();
SCOPED_RAW_TIMER(&_opts.stats->block_load_ns);
if (UNLIKELY(!_lazy_inited)) {
RETURN_IF_ERROR(_lazy_init());
Expand Down
12 changes: 8 additions & 4 deletions be/src/olap/rowset/segment_v2/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -809,7 +809,7 @@ Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_po

// calculate stats for variant type
// TODO it's tricky here, maybe come up with a better idea
_maybe_calculate_variant_stats(block, id, cid);
_maybe_calculate_variant_stats(block, id, cid, row_pos, num_rows);
}
if (_has_key) {
if (_is_mow_with_cluster_key()) {
Expand Down Expand Up @@ -1329,8 +1329,11 @@ inline bool SegmentWriter::_is_mow_with_cluster_key() {
// Compaction will extend the sparse column, which is visible during read and write. In order to
// persist variant stats info, we should do extra calculation during flushing the segment, otherwise
// the info is lost
void SegmentWriter::_maybe_calculate_variant_stats(const vectorized::Block* block, size_t id,
size_t cid) {
void SegmentWriter::_maybe_calculate_variant_stats(
const vectorized::Block* block,
size_t id, // id is the offset of the column in the block
size_t cid, // cid is the column id in TabletSchema
size_t row_pos, size_t num_rows) {
// Only process sparse columns during compaction
if (!_tablet_schema->columns()[cid]->is_sparse_column() ||
_opts.write_type != DataWriteType::TYPE_COMPACTION) {
Expand All @@ -1351,7 +1354,8 @@ void SegmentWriter::_maybe_calculate_variant_stats(const vectorized::Block* bloc

// Found matching column, calculate statistics
auto* stats = column.mutable_variant_statistics();
vectorized::schema_util::calculate_variant_stats(*block->get_by_position(id).column, stats);
vectorized::schema_util::calculate_variant_stats(*block->get_by_position(id).column, stats,
row_pos, num_rows);

VLOG_DEBUG << "sparse stats columns " << stats->sparse_column_non_null_size_size();
break;
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/rowset/segment_v2/segment_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ class SegmentWriter {
Status _write_footer();
Status _write_raw_data(const std::vector<Slice>& slices);
void _maybe_invalid_row_cache(const std::string& key);
void _maybe_calculate_variant_stats(const vectorized::Block* block, size_t id, size_t cid);
void _maybe_calculate_variant_stats(const vectorized::Block* block, size_t id, size_t cid,
size_t row_pos, size_t num_rows);
std::string _encode_keys(const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns,
size_t pos);
// used for unique-key with merge on write and segment min_max key
Expand Down
12 changes: 7 additions & 5 deletions be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,10 @@ Status _create_column_writer(uint32_t cid, const TabletColumn& column,
opt->need_bloom_filter = column.is_bf_column();
opt->need_bitmap_index = column.has_bitmap_index();
const auto& index = tablet_schema->inverted_index(column.parent_unique_id());
VLOG_DEBUG << "column: " << column.name() << " need_inverted_index: " << opt->need_inverted_index
<< " need_bloom_filter: " << opt->need_bloom_filter
<< " need_bitmap_index: " << opt->need_bitmap_index;
VLOG_DEBUG << "column: " << column.name()
<< " need_inverted_index: " << opt->need_inverted_index
<< " need_bloom_filter: " << opt->need_bloom_filter
<< " need_bitmap_index: " << opt->need_bitmap_index;

// init inverted index
if (index != nullptr &&
Expand Down Expand Up @@ -660,8 +661,9 @@ Status VariantSubcolumnWriter::finalize() {
_opts.rowset_ctx->tablet_schema->column_by_uid(_tablet_column->parent_unique_id());
// refresh opts and get writer with flush column
vectorized::schema_util::inherit_column_attributes(parent_column, flush_column);
VLOG_DEBUG << "parent_column: " << parent_column.name() << " flush_column: "
<< flush_column.name() << " is_bf_column: " << parent_column.is_bf_column() << " "
VLOG_DEBUG << "parent_column: " << parent_column.name()
<< " flush_column: " << flush_column.name()
<< " is_bf_column: " << parent_column.is_bf_column() << " "
<< flush_column.is_bf_column();
RETURN_IF_ERROR(_create_column_writer(
0, flush_column, _opts.rowset_ctx->tablet_schema, _opts.inverted_index_file_writer,
Expand Down
78 changes: 56 additions & 22 deletions be/src/vec/common/schema_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -724,6 +724,36 @@ void get_subpaths(const TabletColumn& variant,
}
}

// Verify that per-path non-null value counts collected from the input rowsets
// match those of the compaction output rowset.
// @param inputs    input rowsets of the compaction
// @param output    output rowset produced by the compaction
// @param tablet_id tablet id, used only for error reporting
// @return Status::OK() if stats match, Status::InternalError describing the
//         first mismatch otherwise. Never throws: all lookups are checked with
//         find() instead of at(), so a path present in the inputs but absent
//         from the output yields an error Status rather than std::out_of_range.
Status check_path_stats(const std::vector<RowsetSharedPtr>& inputs, RowsetSharedPtr output,
                        int64_t tablet_id) {
    // Aggregate path -> non-null count per variant column uid over all inputs.
    std::unordered_map<int32_t, PathToNoneNullValues> original_uid_to_path_stats;
    for (const auto& rs : inputs) {
        RETURN_IF_ERROR(collect_path_stats(rs, original_uid_to_path_stats));
    }
    std::unordered_map<int32_t, PathToNoneNullValues> output_uid_to_path_stats;
    RETURN_IF_ERROR(collect_path_stats(output, output_uid_to_path_stats));
    for (const auto& [uid, stats] : original_uid_to_path_stats) {
        auto output_it = output_uid_to_path_stats.find(uid);
        if (output_it == output_uid_to_path_stats.end()) {
            return Status::InternalError("Path stats not found for uid {}, tablet_id {}", uid,
                                         tablet_id);
        }
        const auto& output_stats = output_it->second;
        if (stats.size() != output_stats.size()) {
            return Status::InternalError("Path stats size not match for uid {}, tablet_id {}", uid,
                                         tablet_id);
        }
        for (const auto& [path, size] : stats) {
            auto path_it = output_stats.find(path);
            // Equal sizes do not imply equal key sets; a missing path must be
            // reported as an error instead of throwing via at().
            if (path_it == output_stats.end()) {
                return Status::InternalError(
                        "Path stats missing path `{}` for uid {}, input size {}, tablet_id {}",
                        path, uid, size, tablet_id);
            }
            if (path_it->second != size) {
                return Status::InternalError(
                        "Path stats not match for uid {} with path `{}`, input size {}, output "
                        "size {}, "
                        "tablet_id {}",
                        uid, path, size, path_it->second, tablet_id);
            }
        }
    }
    return Status::OK();
}

// Build the temporary schema for compaction
// 1. collect path stats from all rowsets
// 2. get the subpaths and sparse paths for each unique id
Expand Down Expand Up @@ -763,7 +793,8 @@ Status get_compaction_schema(const std::vector<RowsetSharedPtr>& rowsets,
subcolumn.set_name(column->name_lower_case() + "." + subpath.to_string());
subcolumn.set_type(FieldType::OLAP_FIELD_TYPE_VARIANT);
subcolumn.set_parent_unique_id(column->unique_id());
subcolumn.set_path_info(PathInData(column->name_lower_case() + "." + subpath.to_string()));
subcolumn.set_path_info(
PathInData(column->name_lower_case() + "." + subpath.to_string()));
subcolumn.set_aggregation_method(column->aggregation());
subcolumn.set_variant_max_subcolumns_count(column->variant_max_subcolumns_count());
subcolumn.set_is_nullable(true);
Expand All @@ -783,7 +814,8 @@ Status get_compaction_schema(const std::vector<RowsetSharedPtr>& rowsets,

// Calculate statistics about variant data paths from the encoded sparse column
void calculate_variant_stats(const IColumn& encoded_sparse_column,
segment_v2::VariantStatisticsPB* stats) {
segment_v2::VariantStatisticsPB* stats, size_t row_pos,
size_t num_rows) {
// Cast input column to ColumnMap type since sparse column is stored as a map
const auto& map_column = assert_cast<const ColumnMap&>(encoded_sparse_column);

Expand All @@ -793,35 +825,37 @@ void calculate_variant_stats(const IColumn& encoded_sparse_column,
// Get the keys column which contains the paths as strings
const auto& sparse_data_paths =
assert_cast<const ColumnString*>(map_column.get_keys_ptr().get());

const auto& serialized_sparse_column_offsets =
assert_cast<const ColumnArray::Offsets64&>(map_column.get_offsets());
// Iterate through all paths in the sparse column
for (size_t i = 0; i != sparse_data_paths->size(); ++i) {
auto path = sparse_data_paths->get_data_at(i);

// If path already exists in statistics, increment its count
if (auto it = sparse_data_paths_statistics.find(path);
it != sparse_data_paths_statistics.end()) {
++it->second;
}
// If path doesn't exist and we haven't hit the max statistics size limit,
// add it with count 1
else if (sparse_data_paths_statistics.size() <
VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE) {
sparse_data_paths_statistics.emplace(path, 1);
for (size_t i = row_pos; i != row_pos + num_rows; ++i) {
size_t offset = serialized_sparse_column_offsets[i - 1];
size_t end = serialized_sparse_column_offsets[i];
for (size_t j = offset; j != end; ++j) {
auto path = sparse_data_paths->get_data_at(j);
// If path already exists in statistics, increment its count
if (auto it = sparse_data_paths_statistics.find(path);
it != sparse_data_paths_statistics.end()) {
++it->second;
}
// If path doesn't exist and we haven't hit the max statistics size limit,
// add it with count 1
else if (sparse_data_paths_statistics.size() <
VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE) {
sparse_data_paths_statistics.emplace(path, 1);
}
}
}

// Copy the collected statistics into the protobuf stats object
// This maps each path string to its frequency count
for (const auto& [path, size] : sparse_data_paths_statistics) {
const auto& sparse_path = path.to_string();
auto it = stats->sparse_column_non_null_size().find(sparse_path);
if (it == stats->sparse_column_non_null_size().end()) {
stats->mutable_sparse_column_non_null_size()->emplace(sparse_path, size);
auto& count_map = *stats->mutable_sparse_column_non_null_size();
if (auto it = count_map.find(sparse_path); it != count_map.end()) {
it->second += size;
} else {
size_t original_size = it->second;
stats->mutable_sparse_column_non_null_size()->emplace(sparse_path,
original_size + size);
count_map.emplace(sparse_path, size);
}
}
}
Expand Down
8 changes: 7 additions & 1 deletion be/src/vec/common/schema_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,14 @@ TabletColumn create_sparse_column(const TabletColumn& variant);
// Build the temporary schema for compaction, this will reduce the memory usage of compacting variant columns
Status get_compaction_schema(const std::vector<RowsetSharedPtr>& rowsets, TabletSchemaSPtr& target);

// Check if the path stats are consistent between the input rowsets and the output rowset.
// Used to check the correctness of compaction.
Status check_path_stats(const std::vector<RowsetSharedPtr>& intputs, RowsetSharedPtr output,
int64_t tablet_id);

// Calculate statistics about variant data paths from the encoded sparse column
void calculate_variant_stats(const IColumn& encoded_sparse_column,
segment_v2::VariantStatisticsPB* stats);
segment_v2::VariantStatisticsPB* stats, size_t row_pos,
size_t num_rows);

} // namespace doris::vectorized::schema_util
Loading