Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/query/expression/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,13 @@ impl TableSchema {
Ok(i)
}

pub fn drop_column_unchecked(&mut self, column: &str) -> Result<FieldIndex> {
let i = self.index_of(column)?;
self.fields.remove(i);

Ok(i)
}

pub fn to_leaf_column_id_set(&self) -> HashSet<ColumnId> {
HashSet::from_iter(self.to_leaf_column_ids().iter().cloned())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl Interpreter for DropTableInterpreter {
let engine = tbl.get_table_info().engine();
if matches!(engine, VIEW_ENGINE | STREAM_ENGINE) {
return Err(ErrorCode::TableEngineNotSupported(format!(
"{}.{} engine is {} that doesn't support drop, use `DROP {} {}.{}` instead",
"{}.{} engine is {}, use `DROP {} {}.{}` instead",
&self.plan.database,
&self.plan.table,
engine,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,36 +152,46 @@ impl ModifyTableColumnInterpreter {
let table_info = table.get_table_info();
let mut new_schema = schema.clone();
let mut default_expr_binder = DefaultExprBinder::try_new(self.ctx.clone())?;
// Marks columns whose type change is compatible:
// either from STRING to BINARY in Parquet, or when the data type remains unchanged.
let mut skip_alter_type_flags = vec![false; field_and_comments.len()];
// first check default expr before lock table
for (field, _comment) in field_and_comments {
for (idx, (field, _comment)) in field_and_comments.iter().enumerate() {
if let Some((i, old_field)) = schema.column_with_name(&field.name) {
// if the field has different leaf column numbers, we need drop the old column
// and add a new one to generate new column id. otherwise, leaf column ids will conflict.
if old_field.data_type.num_leaf_columns() != field.data_type.num_leaf_columns() {
let _ = new_schema.drop_column(&field.name);
let is_alter_column_string_to_binary =
is_string_to_binary(&old_field.data_type, &field.data_type);
let skip_alter_type_flag = (table.storage_format_as_parquet()
&& is_alter_column_string_to_binary)
|| old_field.data_type == field.data_type;
skip_alter_type_flags[idx] = skip_alter_type_flag;
// If the field's data type has changed, we need to drop the old column
// and add a new one to regenerate its column ID. This ensures consistency
// between the schema definition and column identifiers.
if !skip_alter_type_flag {
let _ = new_schema.drop_column_unchecked(&field.name);
let _ = new_schema.add_column(field, i);

// Check if this column is referenced by computed columns.
let data_schema = DataSchema::from(&new_schema);
check_referenced_computed_columns(
self.ctx.clone(),
Arc::new(data_schema),
&field.name,
)?;
} else {
// new field don't have `column_id`, assign field directly will cause `column_id` lost.
new_schema.fields[i].data_type = field.data_type.clone();
// TODO: support set computed field.
new_schema.fields[i].computed_expr = field.computed_expr.clone();
}

if let Some(default_expr) = &field.default_expr {
let default_expr = default_expr.to_string();
new_schema.fields[i].default_expr = Some(default_expr);
let _ = default_expr_binder.get_scalar(&new_schema.fields[i])?;
} else {
new_schema.fields[i].default_expr = None;
}
if old_field.data_type != field.data_type {
// Check if this column is referenced by computed columns.
let data_schema = DataSchema::from(&new_schema);
check_referenced_computed_columns(
self.ctx.clone(),
Arc::new(data_schema),
&field.name,
)?;
}
} else {
return Err(ErrorCode::UnknownColumn(format!(
"Cannot find column {}",
Expand Down Expand Up @@ -256,26 +266,35 @@ impl ModifyTableColumnInterpreter {
return Ok(PipelineBuildResult::create());
}

if fuse_table.change_tracking_enabled() {
// Modifying columns while change tracking is active may break
// the consistency between tracked changes and the current table schema,
// leading to incorrect or incomplete change records.
log::warn!(
"table {} has change tracking enabled, modifying columns should be avoided",
table_info.desc
);
}

let mut modified_field_indices = HashSet::new();
let new_schema_without_computed_fields = new_schema.remove_computed_fields();
if schema != new_schema {
for (field, _) in field_and_comments {
let field_index = new_schema_without_computed_fields.index_of(&field.name)?;
for ((field, _), skip_alter_type_flag) in
field_and_comments.iter().zip(skip_alter_type_flags)
{
let old_field = schema.field_with_name(&field.name)?;
let is_alter_column_string_to_binary =
is_string_to_binary(&old_field.data_type, &field.data_type);
// If two conditions are met, we don't need rebuild the table,
// as rebuild table can be a time-consuming job.
// 1. alter column from string to binary in parquet or data type not changed.
// 2. default expr and computed expr not changed. Otherwise, we need fill value for
// new added column.
if ((table.storage_format_as_parquet() && is_alter_column_string_to_binary)
|| old_field.data_type == field.data_type)
if skip_alter_type_flag
&& old_field.default_expr == field.default_expr
&& old_field.computed_expr == field.computed_expr
{
continue;
}
let field_index = new_schema_without_computed_fields.index_of(&field.name)?;
modified_field_indices.insert(field_index);
}
table_info.meta.schema = new_schema.clone().into();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
## Copyright 2023 Databend Cloud
##
## Licensed under the Elastic License, Version 2.0 (the "License");
## you may not use this file except in compliance with the License.
## You may obtain a copy of the License at
##
## https://www.elastic.co/licensing/elastic-license
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.

statement ok
create or replace table t_18827(a int, b int);

query I
insert into t_18827 values(1, 1),(2, 2);
----
2

statement ok
create or replace stream s_18827 on table t_18827 append_only=false;

statement ok
create or replace stream s1_18827 on table t_18827 append_only=true;

statement ok
alter table t_18827 modify column b float64;

query T
select a, b, change$action, change$is_update from s1_18827;
----
1 1.0 INSERT 0
2 2.0 INSERT 0

query T
select a, b, change$action, change$is_update from s_18827 order by change$action, a;
----
1 NULL DELETE 0
2 NULL DELETE 0
1 1.0 INSERT 0
2 2.0 INSERT 0

statement ok
drop stream s_18827;

statement ok
drop stream s1_18827;

statement ok
drop table t_18827 all;
Loading