Skip to content

Commit cc8b0a8

Browse files
[Enhancement] support default value for array/map/struct type column
Signed-off-by: stephen <[email protected]>
1 parent c5cb4c5 commit cc8b0a8

30 files changed

+3666
-71
lines changed

be/src/agent/agent_task.cpp

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,11 +218,16 @@ void run_drop_tablet_task(const std::shared_ptr<DropTabletAgentTaskRequest>& age
218218
}
219219

220220
void run_create_tablet_task(const std::shared_ptr<CreateTabletAgentTaskRequest>& agent_task_req, ExecEnv* exec_env) {
221-
const auto& create_tablet_req = agent_task_req->task_req;
221+
TCreateTabletReq create_tablet_req = agent_task_req->task_req;
222222
TFinishTaskRequest finish_task_request;
223223
TStatusCode::type status_code = TStatusCode::OK;
224224
std::vector<std::string> error_msgs;
225225

226+
Status preprocess_status = preprocess_default_expr_for_tcolumns(create_tablet_req.tablet_schema.columns);
227+
if (!preprocess_status.ok()) {
228+
LOG(WARNING) << "Failed to preprocess default_expr in CREATE TABLE: " << preprocess_status.to_string();
229+
}
230+
226231
auto tablet_type = create_tablet_req.tablet_type;
227232
Status create_status;
228233

@@ -633,6 +638,12 @@ void run_update_schema_task(const std::shared_ptr<UpdateSchemaTaskRequest>& agen
633638
for (auto uid : tcolumn_param.sort_key_uid) {
634639
pcolumn_param.add_sort_key_uid(uid);
635640
}
641+
642+
Status preprocess_status = preprocess_default_expr_for_tcolumns(tcolumn_param.columns);
643+
if (!preprocess_status.ok()) {
644+
LOG(WARNING) << "Failed to preprocess default_expr in UPDATE_SCHEMA task: " << preprocess_status;
645+
}
646+
636647
Status st;
637648
for (auto& tcolumn : tcolumn_param.columns) {
638649
uint32_t col_unique_id = tcolumn.col_unique_id;

be/src/exec/olap_meta_scanner.cpp

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include <memory>
1818

1919
#include "exec/olap_meta_scan_node.h"
20+
#include "storage/metadata_util.h"
2021
#include "storage/storage_engine.h"
2122
#include "storage/tablet.h"
2223
#include "storage/tablet_manager.h"
@@ -58,8 +59,14 @@ Status OlapMetaScanner::_init_meta_reader_params() {
5859
if (_reader_params.tablet_schema == nullptr) {
5960
if (_parent->_meta_scan_node.__isset.columns && !_parent->_meta_scan_node.columns.empty() &&
6061
(_parent->_meta_scan_node.columns[0].col_unique_id >= 0)) {
61-
_reader_params.tablet_schema =
62-
TabletSchema::copy(*_tablet->tablet_schema(), _parent->_meta_scan_node.columns);
62+
auto columns_copy = _parent->_meta_scan_node.columns;
63+
Status preprocess_status = preprocess_default_expr_for_tcolumns(columns_copy);
64+
if (!preprocess_status.ok()) {
65+
LOG(WARNING) << "Failed to preprocess default_expr in OlapMetaScanner: "
66+
<< preprocess_status.to_string();
67+
}
68+
69+
_reader_params.tablet_schema = TabletSchema::copy(*_tablet->tablet_schema(), columns_copy);
6370
} else {
6471
_reader_params.tablet_schema = _tablet->tablet_schema();
6572
}

be/src/exec/pipeline/scan/olap_chunk_source.cpp

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,11 @@
4141
#include "storage/chunk_helper.h"
4242
#include "storage/column_predicate_rewriter.h"
4343
#include "storage/index/vector/vector_search_option.h"
44+
#include "storage/metadata_util.h"
4445
#include "storage/predicate_parser.h"
4546
#include "storage/projection_iterator.h"
4647
#include "storage/runtime_range_pruner.hpp"
4748
#include "storage/storage_engine.h"
48-
#include "storage/tablet_index.h"
4949
#include "types/logical_type.h"
5050
#include "util/runtime_profile.h"
5151
#include "util/table_metrics.h"
@@ -560,8 +560,13 @@ Status OlapChunkSource::_init_olap_reader(RuntimeState* runtime_state) {
560560
if (_scan_node->thrift_olap_scan_node().__isset.columns_desc &&
561561
!_scan_node->thrift_olap_scan_node().columns_desc.empty() &&
562562
_scan_node->thrift_olap_scan_node().columns_desc[0].col_unique_id >= 0) {
563-
_tablet_schema =
564-
TabletSchema::copy(*_tablet->tablet_schema(), _scan_node->thrift_olap_scan_node().columns_desc);
563+
auto columns_desc_copy = _scan_node->thrift_olap_scan_node().columns_desc;
564+
Status preprocess_status = preprocess_default_expr_for_tcolumns(columns_desc_copy);
565+
if (!preprocess_status.ok()) {
566+
LOG(WARNING) << "Failed to preprocess default_expr in olap_chunk_source: "
567+
<< preprocess_status.to_string();
568+
}
569+
_tablet_schema = TabletSchema::copy(*_tablet->tablet_schema(), columns_desc_copy);
565570
} else {
566571
_tablet_schema = _tablet->tablet_schema();
567572
}

be/src/exec/tablet_info.cpp

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include "column/chunk.h"
1919
#include "exprs/expr.h"
2020
#include "runtime/mem_pool.h"
21+
#include "storage/metadata_util.h"
2122
#include "storage/tablet_schema.h"
2223
#include "types/constexpr.h"
2324
#include "util/string_parser.hpp"
@@ -169,7 +170,15 @@ Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema, RuntimeS
169170

170171
if (t_index.__isset.column_param) {
171172
auto col_param = _obj_pool.add(new OlapTableColumnParam());
172-
for (auto& tcolumn_desc : t_index.column_param.columns) {
173+
std::vector<TColumn> columns_copy = t_index.column_param.columns;
174+
Status preprocess_status = preprocess_default_expr_for_tcolumns(columns_copy);
175+
if (!preprocess_status.ok()) {
176+
LOG(WARNING) << "Failed to preprocess default_expr in OlapTableSchemaParam::init: "
177+
<< preprocess_status.to_string();
178+
columns_copy = t_index.column_param.columns;
179+
}
180+
181+
for (auto& tcolumn_desc : columns_copy) {
173182
TabletColumn* tc = _obj_pool.add(new TabletColumn());
174183
tc->init_from_thrift(tcolumn_desc);
175184
col_param->columns.emplace_back(tc);

be/src/exprs/cast_expr.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,6 @@ struct CastToString {
308308

309309
StatusOr<ColumnPtr> cast_nested_to_json(const ColumnPtr& column, bool allow_throw_exception);
310310

311-
// cast column[idx] to coresponding json type.
312-
StatusOr<std::string> cast_type_to_json_str(const ColumnPtr& column, int idx);
311+
// Casts column[idx] to its JSON representation and returns it as a string.
// `unindexed_struct` (default false) is forwarded to the JSON builder when struct and
// map values are opened as objects; per the implementation's own comment, passing true
// preserves field insertion order.
StatusOr<std::string> cast_type_to_json_str(const ColumnPtr& column, int idx, bool unindexed_struct = false);
313312

314313
} // namespace starrocks

be/src/exprs/cast_expr_json.cpp

Lines changed: 28 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
#include "column/array_column.h"
1818
#include "column/column_builder.h"
19-
#include "column/column_viewer.h"
2019
#include "column/column_visitor_adapter.h"
2120
#include "column/json_column.h"
2221
#include "column/map_column.h"
@@ -37,11 +36,16 @@ constexpr bool is_type_complete_v<T, std::void_t<decltype(sizeof(T))>> = true;
3736
// NOTE: casting row by row is not efficient, but it is intuitive
3837
class CastColumnItemVisitor final : public ColumnVisitorAdapter<CastColumnItemVisitor> {
3938
public:
40-
CastColumnItemVisitor(int row, const std::string& field_name, vpack::Builder* builder)
41-
: ColumnVisitorAdapter(this), _row(row), _field_name(field_name), _builder(builder) {}
42-
43-
static Status cast_datum_to_json(const ColumnPtr& col, int row, const std::string& name, vpack::Builder* builder) {
44-
CastColumnItemVisitor visitor(row, name, builder);
39+
CastColumnItemVisitor(int row, const std::string& field_name, vpack::Builder* builder, bool unindexed_struct)
40+
: ColumnVisitorAdapter(this),
41+
_row(row),
42+
_field_name(field_name),
43+
_builder(builder),
44+
_unindexed_struct(unindexed_struct) {}
45+
46+
static Status cast_datum_to_json(const ColumnPtr& col, int row, const std::string& name, vpack::Builder* builder,
47+
bool unindexed_struct = false) {
48+
CastColumnItemVisitor visitor(row, name, builder, unindexed_struct);
4549
try {
4650
return col->accept(&visitor);
4751
} catch (const arangodb::velocypack::Exception& e) {
@@ -111,17 +115,19 @@ class CastColumnItemVisitor final : public ColumnVisitorAdapter<CastColumnItemVi
111115
}
112116

113117
Status do_visit(const StructColumn& col) {
118+
// Use indexed or unindexed object based on _unindexed_struct flag
119+
// unindexed=true preserves field insertion order (crucial for default values)
114120
if (_field_name.empty()) {
115-
_builder->openObject();
121+
_builder->openObject(_unindexed_struct);
116122
} else {
117-
_builder->add(_field_name, vpack::Value(vpack::ValueType::Object));
123+
_builder->add(_field_name, vpack::Value(vpack::ValueType::Object, _unindexed_struct));
118124
}
119125
const auto& names = col.field_names();
120126
const auto columns = col.fields();
121127
for (int i = 0; i < columns.size(); i++) {
122128
auto name = names.size() > i ? names[i] : fmt::format("k{}", i);
123129
auto& field_column = columns[i];
124-
RETURN_IF_ERROR(cast_datum_to_json(field_column, _row, name, _builder));
130+
RETURN_IF_ERROR(cast_datum_to_json(field_column, _row, name, _builder, _unindexed_struct));
125131
}
126132
if (!_builder->isClosed()) {
127133
_builder->close();
@@ -132,9 +138,9 @@ class CastColumnItemVisitor final : public ColumnVisitorAdapter<CastColumnItemVi
132138

133139
Status do_visit(const MapColumn& col) {
134140
if (_field_name.empty()) {
135-
_builder->openObject();
141+
_builder->openObject(_unindexed_struct);
136142
} else {
137-
_builder->add(_field_name, vpack::Value(vpack::ValueType::Object));
143+
_builder->add(_field_name, vpack::Value(vpack::ValueType::Object, _unindexed_struct));
138144
}
139145
auto [map_start, map_size] = col.get_map_offset_size(_row);
140146
const auto& val_col = col.values_column();
@@ -165,7 +171,7 @@ class CastColumnItemVisitor final : public ColumnVisitorAdapter<CastColumnItemVi
165171
continue;
166172
}
167173
// VLOG(2) << "map key " << i << ": " << key_col->debug_item(i) << " , name=" << name;
168-
RETURN_IF_ERROR(cast_datum_to_json(val_col, i, name, _builder));
174+
RETURN_IF_ERROR(cast_datum_to_json(val_col, i, name, _builder, _unindexed_struct));
169175
}
170176

171177
if (!_builder->isClosed()) {
@@ -184,7 +190,7 @@ class CastColumnItemVisitor final : public ColumnVisitorAdapter<CastColumnItemVi
184190
auto [offset, size] = col.get_element_offset_size(_row);
185191
const auto& elements = col.elements_column();
186192
for (int i = offset; i < offset + size; i++) {
187-
RETURN_IF_ERROR(cast_datum_to_json(elements, i, "", _builder));
193+
RETURN_IF_ERROR(cast_datum_to_json(elements, i, "", _builder, _unindexed_struct));
188194
}
189195

190196
if (!_builder->isClosed()) {
@@ -198,7 +204,7 @@ class CastColumnItemVisitor final : public ColumnVisitorAdapter<CastColumnItemVi
198204
if (col.is_null(_row)) {
199205
_add_element(vpack::ValueType::Null);
200206
} else {
201-
RETURN_IF_ERROR(cast_datum_to_json(col.data_column(), _row, _field_name, _builder));
207+
RETURN_IF_ERROR(cast_datum_to_json(col.data_column(), _row, _field_name, _builder, _unindexed_struct));
202208
}
203209
return {};
204210
}
@@ -219,6 +225,7 @@ class CastColumnItemVisitor final : public ColumnVisitorAdapter<CastColumnItemVi
219225
int _row;
220226
const std::string& _field_name;
221227
vpack::Builder* _builder;
228+
bool _unindexed_struct;
222229
};
223230

224231
// Cast nested type (including struct/map/*) to json
@@ -258,13 +265,15 @@ StatusOr<ColumnPtr> cast_nested_to_json(const ColumnPtr& column, bool allow_thro
258265
return column_builder.build(false);
259266
}
260267

261-
StatusOr<std::string> cast_type_to_json_str(const ColumnPtr& column, int idx) {
268+
StatusOr<std::string> cast_type_to_json_str(const ColumnPtr& column, int idx, bool unindexed_struct) {
262269
vpack::Builder json_builder;
263270
json_builder.clear();
264-
RETURN_IF_ERROR(CastColumnItemVisitor::cast_datum_to_json(column, idx, "", &json_builder));
265-
JsonValue json(json_builder.slice());
271+
RETURN_IF_ERROR(CastColumnItemVisitor::cast_datum_to_json(column, idx, "", &json_builder, unindexed_struct));
266272

267-
return json.to_string();
273+
auto slice = json_builder.slice();
274+
JsonValue json(slice);
275+
auto result = json.to_string();
276+
return result;
268277
}
269278

270-
} // namespace starrocks
279+
} // namespace starrocks

be/src/storage/lake/schema_change.cpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,14 @@ Status SchemaChangeHandler::do_process_alter_tablet(const TAlterTabletReqV2& req
336336

337337
TabletSchemaCSPtr base_schema;
338338
if (!request.columns.empty() && request.columns[0].col_unique_id >= 0) {
339-
base_schema = TabletSchema::copy(*(base_tablet.get_schema()), request.columns);
339+
auto columns_copy = request.columns;
340+
341+
Status preprocess_status = preprocess_default_expr_for_tcolumns(columns_copy);
342+
if (!preprocess_status.ok()) {
343+
LOG(WARNING) << "Failed to preprocess default_expr in Lake SchemaChange: " << preprocess_status.to_string();
344+
}
345+
346+
base_schema = TabletSchema::copy(*(base_tablet.get_schema()), columns_copy);
340347
} else {
341348
base_schema = base_tablet.get_schema();
342349
}

be/src/storage/metadata_util.cpp

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,20 @@
1515
#include "storage/metadata_util.h"
1616

1717
#include <boost/algorithm/string.hpp>
18+
#include <chrono>
19+
#include <ctime>
1820

21+
#include "column/column_helper.h"
1922
#include "common/config.h"
23+
#include "common/object_pool.h"
24+
#include "exprs/cast_expr.h"
25+
#include "exprs/expr.h"
26+
#include "exprs/expr_context.h"
2027
#include "gen_cpp/AgentService_types.h"
28+
#include "gen_cpp/Types_types.h"
2129
#include "gutil/strings/substitute.h"
30+
#include "runtime/exec_env.h"
31+
#include "runtime/runtime_state.h"
2232
#include "storage/aggregate_type.h"
2333
#include "storage/olap_common.h"
2434
#include "storage/tablet_schema.h"
@@ -418,4 +428,88 @@ Status convert_t_schema_to_pb_schema(const TTabletSchema& t_schema, TabletSchema
418428
return convert_t_schema_to_pb_schema(t_schema, compression_type, out_schema);
419429
}
420430

431+
// Helper function to create a minimal RuntimeState for constant expression evaluation
432+
static std::unique_ptr<RuntimeState> create_temp_runtime_state() {
433+
TUniqueId dummy_query_id;
434+
dummy_query_id.hi = 0;
435+
dummy_query_id.lo = 0;
436+
437+
TQueryOptions query_options;
438+
TQueryGlobals query_globals;
439+
auto now = std::chrono::system_clock::now();
440+
auto now_time_t = std::chrono::system_clock::to_time_t(now);
441+
auto now_ms = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count();
442+
443+
std::tm tm_buf;
444+
localtime_r(&now_time_t, &tm_buf);
445+
char time_str[64];
446+
strftime(time_str, sizeof(time_str), "%Y-%m-%d %H:%M:%S", &tm_buf);
447+
448+
query_globals.now_string = time_str;
449+
query_globals.timestamp_ms = now_ms;
450+
query_globals.time_zone = "UTC";
451+
452+
auto state = std::make_unique<RuntimeState>(dummy_query_id, query_options, query_globals, ExecEnv::GetInstance());
453+
state->init_instance_mem_tracker();
454+
455+
return state;
456+
}
457+
458+
// Evaluates a constant default expression and returns its single value as a JSON string.
//
// An expression tree is built from `t_expr`, prepared and opened against a temporary
// RuntimeState, evaluated as a constant, and the resulting one-row column is serialized
// with cast_type_to_json_str (unindexed_struct = true).
//
// Returns an error status when the tree cannot be built, prepared, opened or evaluated,
// when the evaluation yields a null or empty column, or when the JSON cast fails.
StatusOr<std::string> convert_default_expr_to_json_string(const TExpr& t_expr) {
    auto state = create_temp_runtime_state();
    ObjectPool pool;

    ExprContext* ctx = nullptr;
    RETURN_IF_ERROR(Expr::create_expr_tree(&pool, t_expr, &ctx, state.get()));

    if (ctx == nullptr || ctx->root() == nullptr) {
        return Status::InternalError("Failed to create expression tree from TExpr");
    }

    RETURN_IF_ERROR(ctx->prepare(state.get()));

    // Everything that can fail after prepare() lives in this helper so that the single
    // close() below runs on every exit path. Previously a failing open() or a failing
    // JSON cast returned without closing the context, unlike the other error paths.
    auto evaluate_to_json = [&]() -> StatusOr<std::string> {
        RETURN_IF_ERROR(ctx->open(state.get()));

        auto eval_result = ctx->root()->evaluate_const(ctx);
        if (!eval_result.ok()) {
            return eval_result.status();
        }
        ColumnPtr column = eval_result.value();

        if (column == nullptr || column->size() == 0) {
            return Status::InternalError("Failed to evaluate default expression: empty result");
        }

        // Unwrap a constant column so the JSON cast sees the underlying data column.
        if (column->is_constant()) {
            auto const_col = down_cast<const ConstColumn*>(column.get());
            column = const_col->data_column()->clone();
        }

        DCHECK_EQ(column->size(), 1) << "Default constant expression should produce exactly one value";

        return cast_type_to_json_str(column, 0, true);
    };

    auto json_or_error = evaluate_to_json();
    ctx->close(state.get());
    return json_or_error;
}
497+
498+
// Fills in `default_value` for every column that carries a `default_expr`.
//
// Each such expression is converted to a JSON string. On success the string is stored
// in the column's `default_value` and that field is marked as set. On failure an error
// is logged and the column is left as it was; the remaining columns are still processed.
//
// Always returns Status::OK(), including when individual conversions fail.
Status preprocess_default_expr_for_tcolumns(std::vector<TColumn>& columns) {
    for (auto& tcolumn : columns) {
        if (!tcolumn.__isset.default_expr) {
            continue;
        }

        auto json_or = convert_default_expr_to_json_string(tcolumn.default_expr);
        if (!json_or.ok()) {
            LOG(ERROR) << "Failed to convert default_expr to JSON String for column '" << tcolumn.column_name
                       << "': " << json_or.status().to_string();
            continue;
        }

        tcolumn.default_value = json_or.value();
        tcolumn.__isset.default_value = true;
    }

    return Status::OK();
}
514+
421515
} // namespace starrocks

be/src/storage/metadata_util.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
#include "common/status.h"
2323
#include "gen_cpp/Types_types.h"
2424
#include "olap_common.h"
25-
#include "types/logical_type.h"
2625

2726
namespace starrocks {
2827

@@ -58,4 +57,8 @@ void convert_to_new_version(TColumn* tcolumn);
5857

5958
Status t_column_to_pb_column(int32_t unique_id, const TColumn& t_column, ColumnPB* column_pb);
6059

60+
// Evaluates the constant expression `t_expr` and returns its single value serialized as
// a JSON string. Returns an error status if the expression tree cannot be created,
// prepared, opened or evaluated, or if the evaluation produces no value.
StatusOr<std::string> convert_default_expr_to_json_string(const TExpr& t_expr);
61+
62+
// For every column in `columns` whose `default_expr` is set, converts the expression to a
// JSON string and stores it in the column's `default_value` (marking that field as set).
// The vector is modified in place. A column whose conversion fails is logged and left
// unchanged.
Status preprocess_default_expr_for_tcolumns(std::vector<TColumn>& columns);
63+
6164
} // namespace starrocks

0 commit comments

Comments
 (0)