Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion be/src/agent/agent_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,11 +218,16 @@ void run_drop_tablet_task(const std::shared_ptr<DropTabletAgentTaskRequest>& age
}

void run_create_tablet_task(const std::shared_ptr<CreateTabletAgentTaskRequest>& agent_task_req, ExecEnv* exec_env) {
const auto& create_tablet_req = agent_task_req->task_req;
TCreateTabletReq create_tablet_req = agent_task_req->task_req;
TFinishTaskRequest finish_task_request;
TStatusCode::type status_code = TStatusCode::OK;
std::vector<std::string> error_msgs;

Status preprocess_status = preprocess_default_expr_for_tcolumns(create_tablet_req.tablet_schema.columns);
if (!preprocess_status.ok()) {
LOG(WARNING) << "Failed to preprocess default_expr in CREATE TABLE: " << preprocess_status.to_string();
}

auto tablet_type = create_tablet_req.tablet_type;
Status create_status;

Expand Down Expand Up @@ -633,6 +638,12 @@ void run_update_schema_task(const std::shared_ptr<UpdateSchemaTaskRequest>& agen
for (auto uid : tcolumn_param.sort_key_uid) {
pcolumn_param.add_sort_key_uid(uid);
}

Status preprocess_status = preprocess_default_expr_for_tcolumns(tcolumn_param.columns);
if (!preprocess_status.ok()) {
LOG(WARNING) << "Failed to preprocess default_expr in UPDATE_SCHEMA task: " << preprocess_status;
}

Status st;
for (auto& tcolumn : tcolumn_param.columns) {
uint32_t col_unique_id = tcolumn.col_unique_id;
Expand Down
11 changes: 9 additions & 2 deletions be/src/exec/olap_meta_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <memory>

#include "exec/olap_meta_scan_node.h"
#include "storage/metadata_util.h"
#include "storage/storage_engine.h"
#include "storage/tablet.h"
#include "storage/tablet_manager.h"
Expand Down Expand Up @@ -58,8 +59,14 @@ Status OlapMetaScanner::_init_meta_reader_params() {
if (_reader_params.tablet_schema == nullptr) {
if (_parent->_meta_scan_node.__isset.columns && !_parent->_meta_scan_node.columns.empty() &&
(_parent->_meta_scan_node.columns[0].col_unique_id >= 0)) {
_reader_params.tablet_schema =
TabletSchema::copy(*_tablet->tablet_schema(), _parent->_meta_scan_node.columns);
auto columns_copy = _parent->_meta_scan_node.columns;
Status preprocess_status = preprocess_default_expr_for_tcolumns(columns_copy);
if (!preprocess_status.ok()) {
LOG(WARNING) << "Failed to preprocess default_expr in OlapMetaScanner: "
<< preprocess_status.to_string();
}

_reader_params.tablet_schema = TabletSchema::copy(*_tablet->tablet_schema(), columns_copy);
} else {
_reader_params.tablet_schema = _tablet->tablet_schema();
}
Expand Down
11 changes: 8 additions & 3 deletions be/src/exec/pipeline/scan/olap_chunk_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@
#include "storage/chunk_helper.h"
#include "storage/column_predicate_rewriter.h"
#include "storage/index/vector/vector_search_option.h"
#include "storage/metadata_util.h"
#include "storage/predicate_parser.h"
#include "storage/projection_iterator.h"
#include "storage/runtime_range_pruner.hpp"
#include "storage/storage_engine.h"
#include "storage/tablet_index.h"
#include "types/logical_type.h"
#include "util/runtime_profile.h"
#include "util/table_metrics.h"
Expand Down Expand Up @@ -560,8 +560,13 @@ Status OlapChunkSource::_init_olap_reader(RuntimeState* runtime_state) {
if (_scan_node->thrift_olap_scan_node().__isset.columns_desc &&
!_scan_node->thrift_olap_scan_node().columns_desc.empty() &&
_scan_node->thrift_olap_scan_node().columns_desc[0].col_unique_id >= 0) {
_tablet_schema =
TabletSchema::copy(*_tablet->tablet_schema(), _scan_node->thrift_olap_scan_node().columns_desc);
auto columns_desc_copy = _scan_node->thrift_olap_scan_node().columns_desc;
Status preprocess_status = preprocess_default_expr_for_tcolumns(columns_desc_copy);
if (!preprocess_status.ok()) {
LOG(WARNING) << "Failed to preprocess default_expr in olap_chunk_source: "
<< preprocess_status.to_string();
}
_tablet_schema = TabletSchema::copy(*_tablet->tablet_schema(), columns_desc_copy);
} else {
_tablet_schema = _tablet->tablet_schema();
}
Expand Down
11 changes: 10 additions & 1 deletion be/src/exec/tablet_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "column/chunk.h"
#include "exprs/expr.h"
#include "runtime/mem_pool.h"
#include "storage/metadata_util.h"
#include "storage/tablet_schema.h"
#include "types/constexpr.h"
#include "util/string_parser.hpp"
Expand Down Expand Up @@ -169,7 +170,15 @@ Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema, RuntimeS

if (t_index.__isset.column_param) {
auto col_param = _obj_pool.add(new OlapTableColumnParam());
for (auto& tcolumn_desc : t_index.column_param.columns) {
std::vector<TColumn> columns_copy = t_index.column_param.columns;
Status preprocess_status = preprocess_default_expr_for_tcolumns(columns_copy);
if (!preprocess_status.ok()) {
LOG(WARNING) << "Failed to preprocess default_expr in OlapTableSchemaParam::init: "
<< preprocess_status.to_string();
columns_copy = t_index.column_param.columns;
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error handling fallback code unreachable due to always-OK return

The preprocess_default_expr_for_tcolumns function always returns Status::OK() even when individual column conversions fail (it only logs errors internally). The call site in tablet_info.cpp checks for failure and has fallback logic to reset columns_copy to the original columns, but this fallback code is unreachable. When some columns fail preprocessing, the code proceeds with partially-preprocessed columns instead of falling back to the originals as apparently intended. This could lead to inconsistent column configurations where some columns have converted defaults and others don't.

Additional Locations (1)

Fix in Cursor Fix in Web


for (auto& tcolumn_desc : columns_copy) {
TabletColumn* tc = _obj_pool.add(new TabletColumn());
tc->init_from_thrift(tcolumn_desc);
col_param->columns.emplace_back(tc);
Expand Down
3 changes: 1 addition & 2 deletions be/src/exprs/cast_expr.h
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,6 @@ struct CastToString {

StatusOr<ColumnPtr> cast_nested_to_json(const ColumnPtr& column, bool allow_throw_exception);

// cast column[idx] to corresponding json type.
StatusOr<std::string> cast_type_to_json_str(const ColumnPtr& column, int idx);
StatusOr<std::string> cast_type_to_json_str(const ColumnPtr& column, int idx, bool unindexed_struct = false);

} // namespace starrocks
47 changes: 28 additions & 19 deletions be/src/exprs/cast_expr_json.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

#include "column/array_column.h"
#include "column/column_builder.h"
#include "column/column_viewer.h"
#include "column/column_visitor_adapter.h"
#include "column/json_column.h"
#include "column/map_column.h"
Expand All @@ -37,11 +36,16 @@ constexpr bool is_type_complete_v<T, std::void_t<decltype(sizeof(T))>> = true;
// NOTE: casting row by row is not efficient but intuitive
class CastColumnItemVisitor final : public ColumnVisitorAdapter<CastColumnItemVisitor> {
public:
CastColumnItemVisitor(int row, const std::string& field_name, vpack::Builder* builder)
: ColumnVisitorAdapter(this), _row(row), _field_name(field_name), _builder(builder) {}

static Status cast_datum_to_json(const ColumnPtr& col, int row, const std::string& name, vpack::Builder* builder) {
CastColumnItemVisitor visitor(row, name, builder);
CastColumnItemVisitor(int row, const std::string& field_name, vpack::Builder* builder, bool unindexed_struct)
: ColumnVisitorAdapter(this),
_row(row),
_field_name(field_name),
_builder(builder),
_unindexed_struct(unindexed_struct) {}

static Status cast_datum_to_json(const ColumnPtr& col, int row, const std::string& name, vpack::Builder* builder,
bool unindexed_struct = false) {
CastColumnItemVisitor visitor(row, name, builder, unindexed_struct);
try {
return col->accept(&visitor);
} catch (const arangodb::velocypack::Exception& e) {
Expand Down Expand Up @@ -111,17 +115,19 @@ class CastColumnItemVisitor final : public ColumnVisitorAdapter<CastColumnItemVi
}

Status do_visit(const StructColumn& col) {
// Use indexed or unindexed object based on _unindexed_struct flag
// unindexed=true preserves field insertion order (crucial for default values)
if (_field_name.empty()) {
_builder->openObject();
_builder->openObject(_unindexed_struct);
} else {
_builder->add(_field_name, vpack::Value(vpack::ValueType::Object));
_builder->add(_field_name, vpack::Value(vpack::ValueType::Object, _unindexed_struct));
}
const auto& names = col.field_names();
const auto columns = col.fields();
for (int i = 0; i < columns.size(); i++) {
auto name = names.size() > i ? names[i] : fmt::format("k{}", i);
auto& field_column = columns[i];
RETURN_IF_ERROR(cast_datum_to_json(field_column, _row, name, _builder));
RETURN_IF_ERROR(cast_datum_to_json(field_column, _row, name, _builder, _unindexed_struct));
}
if (!_builder->isClosed()) {
_builder->close();
Expand All @@ -132,9 +138,9 @@ class CastColumnItemVisitor final : public ColumnVisitorAdapter<CastColumnItemVi

Status do_visit(const MapColumn& col) {
if (_field_name.empty()) {
_builder->openObject();
_builder->openObject(_unindexed_struct);
} else {
_builder->add(_field_name, vpack::Value(vpack::ValueType::Object));
_builder->add(_field_name, vpack::Value(vpack::ValueType::Object, _unindexed_struct));
}
auto [map_start, map_size] = col.get_map_offset_size(_row);
const auto& val_col = col.values_column();
Expand Down Expand Up @@ -165,7 +171,7 @@ class CastColumnItemVisitor final : public ColumnVisitorAdapter<CastColumnItemVi
continue;
}
// VLOG(2) << "map key " << i << ": " << key_col->debug_item(i) << " , name=" << name;
RETURN_IF_ERROR(cast_datum_to_json(val_col, i, name, _builder));
RETURN_IF_ERROR(cast_datum_to_json(val_col, i, name, _builder, _unindexed_struct));
}

if (!_builder->isClosed()) {
Expand All @@ -184,7 +190,7 @@ class CastColumnItemVisitor final : public ColumnVisitorAdapter<CastColumnItemVi
auto [offset, size] = col.get_element_offset_size(_row);
const auto& elements = col.elements_column();
for (int i = offset; i < offset + size; i++) {
RETURN_IF_ERROR(cast_datum_to_json(elements, i, "", _builder));
RETURN_IF_ERROR(cast_datum_to_json(elements, i, "", _builder, _unindexed_struct));
}

if (!_builder->isClosed()) {
Expand All @@ -198,7 +204,7 @@ class CastColumnItemVisitor final : public ColumnVisitorAdapter<CastColumnItemVi
if (col.is_null(_row)) {
_add_element(vpack::ValueType::Null);
} else {
RETURN_IF_ERROR(cast_datum_to_json(col.data_column(), _row, _field_name, _builder));
RETURN_IF_ERROR(cast_datum_to_json(col.data_column(), _row, _field_name, _builder, _unindexed_struct));
}
return {};
}
Expand All @@ -219,6 +225,7 @@ class CastColumnItemVisitor final : public ColumnVisitorAdapter<CastColumnItemVi
int _row;
const std::string& _field_name;
vpack::Builder* _builder;
bool _unindexed_struct;
};

// Cast nested type(including struct/map/* to json)
Expand Down Expand Up @@ -258,13 +265,15 @@ StatusOr<ColumnPtr> cast_nested_to_json(const ColumnPtr& column, bool allow_thro
return column_builder.build(false);
}

StatusOr<std::string> cast_type_to_json_str(const ColumnPtr& column, int idx) {
StatusOr<std::string> cast_type_to_json_str(const ColumnPtr& column, int idx, bool unindexed_struct) {
vpack::Builder json_builder;
json_builder.clear();
RETURN_IF_ERROR(CastColumnItemVisitor::cast_datum_to_json(column, idx, "", &json_builder));
JsonValue json(json_builder.slice());
RETURN_IF_ERROR(CastColumnItemVisitor::cast_datum_to_json(column, idx, "", &json_builder, unindexed_struct));

return json.to_string();
auto slice = json_builder.slice();
JsonValue json(slice);
auto result = json.to_string();
return result;
}

} // namespace starrocks
} // namespace starrocks
9 changes: 8 additions & 1 deletion be/src/storage/lake/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,14 @@ Status SchemaChangeHandler::do_process_alter_tablet(const TAlterTabletReqV2& req

TabletSchemaCSPtr base_schema;
if (!request.columns.empty() && request.columns[0].col_unique_id >= 0) {
base_schema = TabletSchema::copy(*(base_tablet.get_schema()), request.columns);
auto columns_copy = request.columns;

Status preprocess_status = preprocess_default_expr_for_tcolumns(columns_copy);
if (!preprocess_status.ok()) {
LOG(WARNING) << "Failed to preprocess default_expr in Lake SchemaChange: " << preprocess_status.to_string();
}

base_schema = TabletSchema::copy(*(base_tablet.get_schema()), columns_copy);
} else {
base_schema = base_tablet.get_schema();
}
Expand Down
94 changes: 94 additions & 0 deletions be/src/storage/metadata_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,20 @@
#include "storage/metadata_util.h"

#include <boost/algorithm/string.hpp>
#include <chrono>
#include <ctime>

#include "column/column_helper.h"
#include "common/config.h"
#include "common/object_pool.h"
#include "exprs/cast_expr.h"
#include "exprs/expr.h"
#include "exprs/expr_context.h"
#include "gen_cpp/AgentService_types.h"
#include "gen_cpp/Types_types.h"
#include "gutil/strings/substitute.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "storage/aggregate_type.h"
#include "storage/olap_common.h"
#include "storage/tablet_schema.h"
Expand Down Expand Up @@ -418,4 +428,88 @@ Status convert_t_schema_to_pb_schema(const TTabletSchema& t_schema, TabletSchema
return convert_t_schema_to_pb_schema(t_schema, compression_type, out_schema);
}

// Builds a throwaway RuntimeState used to evaluate constant default expressions.
//
// The state is created with a zeroed query id, default-constructed query options, and query
// globals that describe the current instant three ways: a "%Y-%m-%d %H:%M:%S" string
// (now_string), epoch milliseconds (timestamp_ms) and the time zone name "UTC".
// The instance memory tracker is initialized before the state is handed back.
//
// @return an owning pointer to the freshly initialized RuntimeState.
static std::unique_ptr<RuntimeState> create_temp_runtime_state() {
    TUniqueId dummy_query_id;
    dummy_query_id.hi = 0;
    dummy_query_id.lo = 0;

    TQueryOptions query_options;
    TQueryGlobals query_globals;

    const auto now = std::chrono::system_clock::now();
    const std::time_t now_time_t = std::chrono::system_clock::to_time_t(now);
    const auto now_ms = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count();

    // time_zone is declared as "UTC" below, so the wall-clock string must be broken down in UTC
    // as well (gmtime_r); using the server-local zone here would make now_string disagree with
    // time_zone on any host whose local zone is not UTC.
    // Both buffers are zero-initialized so a failed conversion yields deterministic content
    // instead of reading indeterminate memory.
    std::tm tm_buf{};
    gmtime_r(&now_time_t, &tm_buf);
    char time_str[64] = {0};
    strftime(time_str, sizeof(time_str), "%Y-%m-%d %H:%M:%S", &tm_buf);

    query_globals.now_string = time_str;
    query_globals.timestamp_ms = now_ms;
    query_globals.time_zone = "UTC";

    auto state = std::make_unique<RuntimeState>(dummy_query_id, query_options, query_globals, ExecEnv::GetInstance());
    state->init_instance_mem_tracker();

    return state;
}

// Evaluates a constant default expression and renders its single value as a JSON string.
//
// Steps: build the expression tree from `t_expr` against a temporary RuntimeState, prepare and
// open the context, evaluate the root as a constant, unwrap a ConstColumn into its data column,
// then cast row 0 with cast_type_to_json_str(..., unindexed_struct = true).
//
// @param t_expr  Thrift expression describing the column default.
// @return the JSON text on success; otherwise the first error produced while creating,
//         preparing, opening, evaluating or casting the expression, or InternalError when the
//         tree is missing or the evaluation result is empty.
StatusOr<std::string> convert_default_expr_to_json_string(const TExpr& t_expr) {
    auto state = create_temp_runtime_state();
    ObjectPool pool;

    ExprContext* ctx = nullptr;
    RETURN_IF_ERROR(Expr::create_expr_tree(&pool, t_expr, &ctx, state.get()));

    if (ctx == nullptr || ctx->root() == nullptr) {
        return Status::InternalError("Failed to create expression tree from TExpr");
    }

    RETURN_IF_ERROR(ctx->prepare(state.get()));

    // Everything after a successful prepare() runs inside this lambda so that the context is
    // closed exactly once below, whichever path the lambda returns through (including the
    // early returns hidden inside RETURN_IF_ERROR).
    // NOTE(review): assumes ExprContext::close() is valid after a failed open() — confirm.
    auto evaluate_to_json = [&]() -> StatusOr<std::string> {
        RETURN_IF_ERROR(ctx->open(state.get()));

        auto eval_result = ctx->root()->evaluate_const(ctx);
        if (!eval_result.ok()) {
            return eval_result.status();
        }
        ColumnPtr column = eval_result.value();

        if (column == nullptr || column->size() == 0) {
            return Status::InternalError("Failed to evaluate default expression: empty result");
        }

        // A constant column wraps the real value; cast the underlying data column instead.
        if (column->is_constant()) {
            auto const_col = down_cast<const ConstColumn*>(column.get());
            column = const_col->data_column()->clone();
        }

        DCHECK_EQ(column->size(), 1) << "Default constant expression should produce exactly one value";

        // unindexed_struct = true: struct/map objects are emitted as unindexed JSON objects.
        return cast_type_to_json_str(column, 0, true);
    };

    auto json_or_error = evaluate_to_json();
    ctx->close(state.get());

    return json_or_error;
}

// Converts every column's `default_expr` into a JSON `default_value`, in place.
//
// For each column that has `default_expr` set, the expression is evaluated through
// convert_default_expr_to_json_string(); on success the JSON text is stored in `default_value`
// and its `__isset` flag is raised. A failing column is logged and left untouched, and the
// remaining columns are still processed (best effort).
//
// @param columns  Columns to update in place.
// @return Status::OK() when every conversion succeeded; otherwise the status of the FIRST
//         failed conversion. Columns converted successfully stay modified even when an error
//         is returned, so a caller that needs all-or-nothing semantics must keep its own copy.
Status preprocess_default_expr_for_tcolumns(std::vector<TColumn>& columns) {
    // Remember the first failure so callers' `!status.ok()` handling is actually reachable.
    Status first_error = Status::OK();

    for (auto& column : columns) {
        if (!column.__isset.default_expr) {
            continue;
        }

        auto result = convert_default_expr_to_json_string(column.default_expr);
        if (!result.ok()) {
            LOG(ERROR) << "Failed to convert default_expr to JSON String for column '" << column.column_name
                       << "': " << result.status().to_string();
            if (first_error.ok()) {
                first_error = result.status();
            }
            continue;
        }

        column.default_value = result.value();
        column.__isset.default_value = true;
    }

    return first_error;
}

} // namespace starrocks
5 changes: 4 additions & 1 deletion be/src/storage/metadata_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include "common/status.h"
#include "gen_cpp/Types_types.h"
#include "olap_common.h"
#include "types/logical_type.h"

namespace starrocks {

Expand Down Expand Up @@ -58,4 +57,8 @@ void convert_to_new_version(TColumn* tcolumn);

Status t_column_to_pb_column(int32_t unique_id, const TColumn& t_column, ColumnPB* column_pb);

StatusOr<std::string> convert_default_expr_to_json_string(const TExpr& t_expr);

Status preprocess_default_expr_for_tcolumns(std::vector<TColumn>& columns);

} // namespace starrocks
Loading
Loading