Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
#include "olap/tablet_schema.h"
#include "olap/txn_manager.h"
#include "util/debug_points.h"
#include "vec/common/schema_util.h"
#include "util/stack_util.h"

namespace doris {
#include "common/compile_check_begin.h"
Expand Down
1 change: 1 addition & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1113,6 +1113,7 @@ DEFINE_mInt64(variant_threshold_rows_to_estimate_sparse_column, "2048");
DEFINE_mInt32(variant_max_json_key_length, "255");
DEFINE_mBool(variant_throw_exeception_on_invalid_json, "false");
DEFINE_mBool(enable_vertical_compact_variant_subcolumns, "true");
DEFINE_mBool(enable_variant_doc_sparse_write_subcolumns, "true");

DEFINE_Validator(variant_max_json_key_length,
[](const int config) -> bool { return config > 0 && config <= 65535; });
Expand Down
1 change: 1 addition & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1375,6 +1375,7 @@ DECLARE_mInt32(variant_max_json_key_length);
DECLARE_mBool(variant_throw_exeception_on_invalid_json);
// Enable vertical compact subcolumns of variant column
DECLARE_mBool(enable_vertical_compact_variant_subcolumns);
DECLARE_mBool(enable_variant_doc_sparse_write_subcolumns);

DECLARE_mBool(enable_merge_on_write_correctness_check);
// USED FOR DEBUGGING
Expand Down
1 change: 0 additions & 1 deletion be/src/common/consts.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ const std::string ROW_STORE_COL = "__DORIS_ROW_STORE_COL__";
const std::string DYNAMIC_COLUMN_NAME = "__DORIS_DYNAMIC_COL__";
const std::string PARTIAL_UPDATE_AUTO_INC_COL = "__PARTIAL_UPDATE_AUTO_INC_COLUMN__";
const std::string VIRTUAL_COLUMN_PREFIX = "__DORIS_VIRTUAL_COL__";
const std::string SPARSE_COLUMN_PATH = "__DORIS_VARIANT_SPARSE__";

/// The maximum precision representable by a 4-byte decimal (Decimal4Value)
constexpr int MAX_DECIMAL32_PRECISION = 9;
Expand Down
1 change: 0 additions & 1 deletion be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
#include "util/doris_metrics.h"
#include "util/key_util.h"
#include "vec/common/assert_cast.h"
#include "vec/common/schema_util.h"
#include "vec/data_types/data_type_factory.hpp"
#include "vec/jsonb/serialize.h"

Expand Down
10 changes: 5 additions & 5 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@
#include "util/pretty_printer.h"
#include "util/time.h"
#include "util/trace.h"
#include "vec/common/schema_util.h"
#include "vec/common/variant_util.h"

using std::vector;

Expand Down Expand Up @@ -369,7 +369,7 @@ Status CompactionMixin::do_compact_ordered_rowsets() {
_output_rowset = _output_rs_writer->manual_build(rowset_meta);

// 2. check variant column path stats
RETURN_IF_ERROR(vectorized::schema_util::VariantCompactionUtil::check_path_stats(
RETURN_IF_ERROR(vectorized::variant_util::VariantCompactionUtil::check_path_stats(
_input_rowsets, _output_rowset, _tablet));
return Status::OK();
}
Expand Down Expand Up @@ -420,7 +420,7 @@ Status CompactionMixin::build_basic_info(bool is_ordered_compaction) {
// for ordered compaction, we don't need to extend the schema for variant columns
if (_enable_vertical_compact_variant_subcolumns && !is_ordered_compaction) {
RETURN_IF_ERROR(
vectorized::schema_util::VariantCompactionUtil::get_extended_compaction_schema(
vectorized::variant_util::VariantCompactionUtil::get_extended_compaction_schema(
_input_rowsets, _cur_tablet_schema));
}
return Status::OK();
Expand Down Expand Up @@ -1400,7 +1400,7 @@ Status Compaction::check_correctness() {
_output_rowset->num_rows());
}
// 2. check variant column path stats
RETURN_IF_ERROR(vectorized::schema_util::VariantCompactionUtil::check_path_stats(
RETURN_IF_ERROR(vectorized::variant_util::VariantCompactionUtil::check_path_stats(
_input_rowsets, _output_rowset, _tablet));
return Status::OK();
}
Expand Down Expand Up @@ -1464,7 +1464,7 @@ Status CloudCompactionMixin::build_basic_info() {
// so get_extended_compaction_schema will extend the schema for variant columns
if (_enable_vertical_compact_variant_subcolumns) {
RETURN_IF_ERROR(
vectorized::schema_util::VariantCompactionUtil::get_extended_compaction_schema(
vectorized::variant_util::VariantCompactionUtil::get_extended_compaction_schema(
_input_rowsets, _cur_tablet_schema));
}
return Status::OK();
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/olap_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,7 @@ struct OlapReaderStatistics {
int64_t variant_subtree_leaf_iter_count = 0;
int64_t variant_subtree_hierarchical_iter_count = 0;
int64_t variant_subtree_sparse_iter_count = 0;
int64_t variant_doc_value_column_iter_count = 0;
};

using ColumnId = uint32_t;
Expand Down
1 change: 0 additions & 1 deletion be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
#include "util/stopwatch.hpp"
#include "util/time.h"
#include "vec/columns/column.h"
#include "vec/common/schema_util.h"
#include "vec/core/block.h"
#include "vec/data_types/data_type_factory.hpp"

Expand Down
1 change: 0 additions & 1 deletion be/src/olap/rowset/beta_rowset_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
#include "olap/tablet_schema.h"
#include "util/slice.h"
#include "util/time.h"
#include "vec/common/schema_util.h" // LocalSchemaChangeRecorder
#include "vec/core/block.h"
#include "vec/sink/load_stream_stub.h"

Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/rowset/rowset_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
#include "olap/tablet_schema.h"
#include "olap/tablet_schema_cache.h"
#include "runtime/exec_env.h"
#include "vec/common/schema_util.h"
#include "vec/common/variant_util.h"

namespace doris {

Expand Down Expand Up @@ -364,7 +364,7 @@ void RowsetMeta::merge_rowset_meta(const RowsetMeta& other) {
if (tablet_schema()->num_variant_columns() > 0) {
// merge extracted columns
TabletSchemaSPtr merged_schema;
static_cast<void>(vectorized::schema_util::get_least_common_schema(
static_cast<void>(vectorized::variant_util::get_least_common_schema(
{tablet_schema(), other.tablet_schema()}, nullptr, merged_schema));
if (*_schema != *merged_schema) {
set_tablet_schema(merged_schema);
Expand Down
3 changes: 0 additions & 3 deletions be/src/olap/rowset/rowset_writer_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,6 @@ class DataDir;
class Tablet;
class FileWriterCreator;
class SegmentCollector;
namespace vectorized::schema_util {
class LocalSchemaChangeRecorder;
}

struct RowsetWriterContext {
RowsetWriterContext() : schema_lock(new std::mutex) {
Expand Down
30 changes: 0 additions & 30 deletions be/src/olap/rowset/segment_creator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
#include "vec/columns/column_string.h"
#include "vec/columns/column_variant.h"
#include "vec/common/assert_cast.h"
#include "vec/common/schema_util.h" // variant column
#include "vec/core/block.h"
#include "vec/core/columns_with_type_and_name.h"
#include "vec/core/types.h"
Expand All @@ -66,10 +65,6 @@ Status SegmentFlusher::flush_single_block(const vectorized::Block* block, int32_
return Status::OK();
}
vectorized::Block flush_block(*block);
if (_context.write_type != DataWriteType::TYPE_COMPACTION &&
_context.tablet_schema->num_variant_columns() > 0) {
RETURN_IF_ERROR(_parse_variant_columns(flush_block));
}
bool no_compression = flush_block.bytes() <= config::segment_compression_threshold_kb * 1024;
if (config::enable_vertical_segment_writer) {
std::unique_ptr<segment_v2::VerticalSegmentWriter> writer;
Expand All @@ -85,31 +80,6 @@ Status SegmentFlusher::flush_single_block(const vectorized::Block* block, int32_
return Status::OK();
}

Status SegmentFlusher::_internal_parse_variant_columns(vectorized::Block& block) {
size_t num_rows = block.rows();
if (num_rows == 0) {
return Status::OK();
}

std::vector<int> variant_column_pos;
for (int i = 0; i < block.columns(); ++i) {
const auto& entry = block.get_by_position(i);
if (entry.type->get_primitive_type() == TYPE_VARIANT) {
variant_column_pos.push_back(i);
}
}

if (variant_column_pos.empty()) {
return Status::OK();
}

vectorized::ParseConfig config;
config.enable_flatten_nested = _context.tablet_schema->variant_flatten_nested();
RETURN_IF_ERROR(
vectorized::schema_util::parse_variant_columns(block, variant_column_pos, config));
return Status::OK();
}

Status SegmentFlusher::close() {
RETURN_IF_ERROR(_seg_files.close());
RETURN_IF_ERROR(_idx_files.finish_close());
Expand Down
5 changes: 0 additions & 5 deletions be/src/olap/rowset/segment_creator.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,6 @@ class SegmentFlusher {
Status create_writer(std::unique_ptr<SegmentFlusher::Writer>& writer, uint32_t segment_id);

private:
// This method will catch exception when allocate memory failed
Status _parse_variant_columns(vectorized::Block& block) {
RETURN_IF_CATCH_EXCEPTION({ return _internal_parse_variant_columns(block); });
}
Status _internal_parse_variant_columns(vectorized::Block& block);
Status _add_rows(std::unique_ptr<segment_v2::SegmentWriter>& segment_writer,
const vectorized::Block* block, size_t row_offset, size_t row_num);
Status _add_rows(std::unique_ptr<segment_v2::VerticalSegmentWriter>& segment_writer,
Expand Down
1 change: 0 additions & 1 deletion be/src/olap/rowset/segment_v2/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@
#include "vec/columns/column_struct.h"
#include "vec/columns/column_vector.h"
#include "vec/common/assert_cast.h"
#include "vec/common/schema_util.h"
#include "vec/common/string_ref.h"
#include "vec/common/typeid_cast.h"
#include "vec/core/types.h"
Expand Down
15 changes: 14 additions & 1 deletion be/src/olap/rowset/segment_v2/column_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,10 @@ class NullBitmapBuilder {
size_t raw_est = raw_bytes + raw_overhead + kReserveSlackBytes;
size_t reserve_bytes = std::min(raw_est, run_bytes_est);
if (_bitmap_buf.capacity() < reserve_bytes) {
_bitmap_buf.reserve(reserve_bytes);
const size_t cap = _bitmap_buf.capacity();
const size_t grow = cap + cap / 2;
const size_t new_cap = std::max(reserve_bytes, grow);
_bitmap_buf.reserve(new_cap);
}
}

Expand Down Expand Up @@ -321,7 +324,17 @@ Status ColumnWriter::create_agg_state_writer(const ColumnWriterOptions& opts,
Status ColumnWriter::create_variant_writer(const ColumnWriterOptions& opts,
const TabletColumn* column, io::FileWriter* file_writer,
std::unique_ptr<ColumnWriter>* writer) {
// Variant extracted columns have two kinds of physical writers:
// - Doc-value snapshot column (`...__DORIS_VARIANT_DOC_VALUE__...`): use `VariantDocCompactWriter`
// to store the doc snapshot in a compact binary form.
// - Regular extracted subcolumns: use `VariantSubcolumnWriter`.
// The root VARIANT column itself uses `VariantColumnWriter`.
if (column->is_extracted_column()) {
if (column->name().find(DOC_VALUE_COLUMN_PATH) != std::string::npos) {
*writer = std::make_unique<VariantDocCompactWriter>(
opts, column, std::unique_ptr<Field>(FieldFactory::create(*column)));
return Status::OK();
}
VLOG_DEBUG << "gen subwriter for " << column->path_info_ptr()->get_path();
*writer = std::make_unique<VariantSubcolumnWriter>(
opts, column, std::unique_ptr<Field>(FieldFactory::create(*column)));
Expand Down
22 changes: 1 addition & 21 deletions be/src/olap/rowset/segment_v2/external_col_meta_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,27 +172,7 @@ Status ExternalColMetaUtil::write_external_column_meta(
// 2) write pointers via proto fields
footer->set_col_meta_region_start(meta_region_start);

// 3) clear inline columns to enable true on-demand meta loading
// Note: footer->columns has already been pruned to only Top Level Columns in externalize_from_footer
// But for full externalization, we might want to clear them all or keep only necessary info?
// The original logic was footer->clear_columns().
// If we clear columns, the Reader needs to know how to reconstruct the schema.
// Currently, SegmentFooterPB.columns is used as the schema source if present.
// If we clear it, Reader must rely on External Meta.
// However, the Reader typically reads footer first. If columns is empty, it assumes V3 and reads external.
// So yes, we should clear it.
// But wait, in externalize_from_footer we carefully put Top Level columns back into footer->columns.
// Why? Because in previous logic, we might want to keep roots in footer?
// The previous logic: "replace Footer.columns with only the kept top-level columns".
// BUT then `write_external_column_meta` calls `footer->clear_columns()` at the end!
// So `footer->columns` will be empty anyway.
// The only reason to reconstruct `footer->columns` in `externalize_from_footer` is if `write_external_column_meta` logic depended on it.
// In my updated `write_external_column_meta`, I iterate over `all_metas` which is returned by `externalize_from_footer`.
// So I don't strictly need `footer->columns` to be correct in between.
// However, strictly following protocol: `externalize_from_footer` modifies footer to reflect "logical" columns (Top Level).
// And then `write_external_column_meta` finalizes it by clearing them and setting pointers.

footer->clear_columns();
// Note: footer->columns has already been pruned in externalize_from_footer
return Status::OK();
}

Expand Down
7 changes: 3 additions & 4 deletions be/src/olap/rowset/segment_v2/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@
#include "util/coding.h"
#include "util/slice.h" // Slice
#include "vec/columns/column.h"
#include "vec/common/schema_util.h"
#include "vec/common/string_ref.h"
#include "vec/core/field.h"
#include "vec/data_types/data_type.h"
Expand Down Expand Up @@ -650,7 +649,7 @@ Status Segment::new_default_iterator(const TabletColumn& tablet_column,
Status Segment::new_column_iterator(const TabletColumn& tablet_column,
std::unique_ptr<ColumnIterator>* iter,
const StorageReadOptions* opt,
const std::unordered_map<int32_t, PathToSparseColumnCacheUPtr>*
const std::unordered_map<int32_t, PathToBinaryColumnCacheUPtr>*
variant_sparse_column_cache) {
if (opt->runtime_state != nullptr) {
_be_exec_version = opt->runtime_state->be_exec_version();
Expand All @@ -675,7 +674,7 @@ Status Segment::new_column_iterator(const TabletColumn& tablet_column,
}
if (reader->get_meta_type() == FieldType::OLAP_FIELD_TYPE_VARIANT) {
// if sparse_column_cache_ptr is nullptr, the sparse column cache is not used
PathToSparseColumnCache* sparse_column_cache_ptr = nullptr;
PathToBinaryColumnCache* sparse_column_cache_ptr = nullptr;
if (variant_sparse_column_cache) {
auto it = variant_sparse_column_cache->find(unique_id);
if (it != variant_sparse_column_cache->end()) {
Expand Down Expand Up @@ -950,7 +949,7 @@ Status Segment::seek_and_read_by_rowid(const TabletSchema& schema, SlotDescripto
iterator_hint->read_by_rowids(single_row_loc.data(), 1, file_storage_column));
vectorized::ColumnPtr source_ptr;
// storage may have different type with schema, so we need to cast the column
RETURN_IF_ERROR(vectorized::schema_util::cast_column(
RETURN_IF_ERROR(vectorized::variant_util::cast_column(
vectorized::ColumnWithTypeAndName(file_storage_column->get_ptr(), storage_type,
column.name()),
slot->type(), &source_ptr));
Expand Down
16 changes: 15 additions & 1 deletion be/src/olap/rowset/segment_v2/segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,20 @@ class ColumnReaderCache;
class ColumnMetaAccessor;

using SegmentSharedPtr = std::shared_ptr<Segment>;

struct SparseColumnCache;
using SparseColumnCacheSPtr = std::shared_ptr<SparseColumnCache>;

// Key is the column path; value is the sparse column cache.
// Currently the only column path is SPARSE_COLUMN_PATH; more sparse column paths may be added in the future.
using PathToSparseColumnCache = std::unordered_map<std::string, SparseColumnCacheSPtr>;
using PathToSparseColumnCacheUPtr = std::unique_ptr<PathToSparseColumnCache>;

struct BinaryColumnCache;
using BinaryColumnCacheSPtr = std::shared_ptr<BinaryColumnCache>;
using PathToBinaryColumnCache = std::unordered_map<std::string, BinaryColumnCacheSPtr>;
using PathToBinaryColumnCacheUPtr = std::unique_ptr<PathToBinaryColumnCache>;

// A Segment is used to represent a segment in memory format. When segment is
// generated, it won't be modified, so this struct aimed to help read operation.
// It will prepare all ColumnReader to create ColumnIterator as needed.
Expand Down Expand Up @@ -115,7 +129,7 @@ class Segment : public std::enable_shared_from_this<Segment>, public MetadataAdd
// if variant_sparse_column_cache is nullptr, the sparse column cache is not used
Status new_column_iterator(const TabletColumn& tablet_column,
std::unique_ptr<ColumnIterator>* iter, const StorageReadOptions* opt,
const std::unordered_map<int32_t, PathToSparseColumnCacheUPtr>*
const std::unordered_map<int32_t, PathToBinaryColumnCacheUPtr>*
variant_sparse_column_cache = nullptr);

Status new_index_iterator(const TabletColumn& tablet_column, const TabletIndex* index_meta,
Expand Down
7 changes: 3 additions & 4 deletions be/src/olap/rowset/segment_v2/segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@
#include "vec/columns/column_variant.h"
#include "vec/columns/column_vector.h"
#include "vec/common/assert_cast.h"
#include "vec/common/schema_util.h"
#include "vec/common/string_ref.h"
#include "vec/common/typeid_cast.h"
#include "vec/core/column_with_type_and_name.h"
Expand Down Expand Up @@ -359,7 +358,7 @@ Status SegmentIterator::_init_impl(const StorageReadOptions& opts) {
if (int32_t uid = col->get_unique_id(); !_variant_sparse_column_cache.contains(uid)) {
DCHECK(uid >= 0);
_variant_sparse_column_cache.emplace(uid,
std::make_unique<PathToSparseColumnCache>());
std::make_unique<PathToBinaryColumnCache>());
}
}
}
Expand Down Expand Up @@ -2342,8 +2341,8 @@ Status SegmentIterator::_convert_to_expected_type(const std::vector<ColumnId>& c
vectorized::ColumnPtr expected;
vectorized::ColumnPtr original =
_current_return_columns[i]->assume_mutable()->get_ptr();
RETURN_IF_ERROR(vectorized::schema_util::cast_column({original, file_column_type, ""},
expected_type, &expected));
RETURN_IF_ERROR(vectorized::variant_util::cast_column({original, file_column_type, ""},
expected_type, &expected));
_current_return_columns[i] = expected->assume_mutable();
_converted_column_ids[i] = true;
VLOG_DEBUG << fmt::format(
Expand Down
6 changes: 3 additions & 3 deletions be/src/olap/rowset/segment_v2/segment_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
#include "util/runtime_profile.h"
#include "util/slice.h"
#include "vec/columns/column.h"
#include "vec/common/schema_util.h"
#include "vec/common/variant_util.h"
#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/core/columns_with_type_and_name.h"
Expand Down Expand Up @@ -262,7 +262,7 @@ class SegmentIterator : public RowwiseIterator {
RETURN_IF_ERROR(copy_column_data_by_selector(_current_return_columns[cid].get(),
tmp, sel_rowid_idx, select_size,
_opts.block_row_max));
RETURN_IF_ERROR(vectorized::schema_util::cast_column(
RETURN_IF_ERROR(vectorized::variant_util::cast_column(
{tmp->get_ptr(), storage_type, ""}, block->get_by_position(block_cid).type,
&block->get_by_position(block_cid).column));
} else {
Expand Down Expand Up @@ -513,7 +513,7 @@ class SegmentIterator : public RowwiseIterator {
IndexQueryContextPtr _index_query_context;

// key is column uid, value is the sparse column cache
std::unordered_map<int32_t, PathToSparseColumnCacheUPtr> _variant_sparse_column_cache;
std::unordered_map<int32_t, PathToBinaryColumnCacheUPtr> _variant_sparse_column_cache;
};

} // namespace segment_v2
Expand Down
Loading
Loading