Skip to content

Commit 2eb03c6

Browse files
authored
feat: enrich SQL bind errors with node span and near-sql context (#306)
* feat: enrich SQL bind errors with node span and near-sql context & add explain for where_by_index.slt and stream_distinct.slt * chore: error display * chore: codefmt * support multi-statement execution in run() and transaction run() * chore: codefmt * chore: disallow DDL in multi-statement run * fix: MDL lock lifetime for lazy DDL execution
1 parent c5986b7 commit 2eb03c6

38 files changed

+1653
-526
lines changed

Cargo.lock

Lines changed: 3 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ rust_decimal = { version = "1" }
5656
serde = { version = "1", features = ["derive", "rc"] }
5757
kite_sql_serde_macros = { version = "0.1.0", path = "kite_sql_serde_macros" }
5858
siphasher = { version = "1", features = ["serde"] }
59-
sqlparser = { version = "0.34", features = ["serde"] }
59+
sqlparser = { version = "0.61", features = ["serde"] }
6060
thiserror = { version = "1" }
6161
typetag = { version = "0.2" }
6262
ulid = { version = "1", features = ["serde"] }

src/binder/aggregate.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -87,18 +87,14 @@ impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A>
8787
let return_orderby = if !orderbys.is_empty() {
8888
let mut return_orderby = vec![];
8989
for orderby in orderbys {
90-
let OrderByExpr {
91-
expr,
92-
asc,
93-
nulls_first,
94-
} = orderby;
90+
let OrderByExpr { expr, options, .. } = orderby;
9591
let mut expr = self.bind_expr(expr)?;
9692
self.visit_column_agg_expr(&mut expr)?;
9793

9894
return_orderby.push(SortField::new(
9995
expr,
100-
asc.is_none_or(|asc| asc),
101-
nulls_first.unwrap_or(false),
96+
options.asc.is_none_or(|asc| asc),
97+
options.nulls_first.unwrap_or(false),
10298
));
10399
}
104100
Some(return_orderby)

src/binder/alter_table.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use sqlparser::ast::{AlterTableOperation, ObjectName};
1616

1717
use std::sync::Arc;
1818

19-
use super::{is_valid_identifier, Binder};
19+
use super::{attach_span_if_absent, is_valid_identifier, Binder};
2020
use crate::binder::lower_case_name;
2121
use crate::errors::DatabaseError;
2222
use crate::planner::operator::alter_table::add_column::AddColumnOperator;
@@ -43,13 +43,15 @@ impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A>
4343
column_keyword: _,
4444
if_not_exists,
4545
column_def,
46+
..
4647
} => {
4748
let plan = TableScanOperator::build(table_name.clone(), table, true)?;
4849
let column = self.bind_column(column_def, None)?;
4950

5051
if !is_valid_identifier(column.name()) {
51-
return Err(DatabaseError::InvalidColumn(
52-
"illegal column naming".to_string(),
52+
return Err(attach_span_if_absent(
53+
DatabaseError::invalid_column("illegal column naming".to_string()),
54+
column_def,
5355
));
5456
}
5557
LogicalPlan::new(
@@ -62,12 +64,17 @@ impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A>
6264
)
6365
}
6466
AlterTableOperation::DropColumn {
65-
column_name,
67+
column_names,
6668
if_exists,
6769
..
6870
} => {
6971
let plan = TableScanOperator::build(table_name.clone(), table, true)?;
70-
let column_name = column_name.value.clone();
72+
if column_names.len() != 1 {
73+
return Err(DatabaseError::UnsupportedStmt(
74+
"only dropping a single column is supported".to_string(),
75+
));
76+
}
77+
let column_name = column_names[0].value.clone();
7178

7279
LogicalPlan::new(
7380
Operator::DropColumn(DropColumnOperator {

src/binder/create_index.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,23 +22,25 @@ use crate::planner::{Childrens, LogicalPlan};
2222
use crate::storage::Transaction;
2323
use crate::types::index::IndexType;
2424
use crate::types::value::DataValue;
25-
use sqlparser::ast::{ObjectName, OrderByExpr};
25+
use sqlparser::ast::{IndexColumn, ObjectName};
2626
use std::sync::Arc;
2727

2828
impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A> {
2929
pub(crate) fn bind_create_index(
3030
&mut self,
3131
table_name: &ObjectName,
32-
name: &ObjectName,
33-
exprs: &[OrderByExpr],
32+
name: Option<&ObjectName>,
33+
index_columns: &[IndexColumn],
3434
if_not_exists: bool,
3535
is_unique: bool,
3636
) -> Result<LogicalPlan, DatabaseError> {
3737
let table_name: Arc<str> = lower_case_name(table_name)?.into();
38-
let index_name = lower_case_name(name)?;
38+
let index_name = name
39+
.ok_or(DatabaseError::InvalidIndex)
40+
.and_then(lower_case_name)?;
3941
let ty = if is_unique {
4042
IndexType::Unique
41-
} else if exprs.len() == 1 {
43+
} else if index_columns.len() == 1 {
4244
IndexType::Normal
4345
} else {
4446
IndexType::Composite
@@ -52,11 +54,11 @@ impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A>
5254
Source::Table(table) => TableScanOperator::build(table_name.clone(), table, true)?,
5355
Source::View(view) => LogicalPlan::clone(&view.plan),
5456
};
55-
let mut columns = Vec::with_capacity(exprs.len());
57+
let mut columns = Vec::with_capacity(index_columns.len());
5658

57-
for expr in exprs {
59+
for index_column in index_columns {
5860
// TODO: Expression Index
59-
match self.bind_expr(&expr.expr)? {
61+
match self.bind_expr(&index_column.column.expr)? {
6062
ScalarExpression::ColumnRef { column, .. } => columns.push(column),
6163
expr => {
6264
return Err(DatabaseError::UnsupportedStmt(format!(

src/binder/create_table.rs

Lines changed: 51 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use super::{is_valid_identifier, Binder};
15+
use super::{attach_span_if_absent, is_valid_identifier, Binder};
1616
use crate::binder::lower_case_name;
1717
use crate::catalog::{ColumnCatalog, ColumnDesc};
1818
use crate::errors::DatabaseError;
@@ -24,7 +24,7 @@ use crate::storage::Transaction;
2424
use crate::types::value::DataValue;
2525
use crate::types::LogicalType;
2626
use itertools::Itertools;
27-
use sqlparser::ast::{ColumnDef, ColumnOption, ObjectName, TableConstraint};
27+
use sqlparser::ast::{ColumnDef, ColumnOption, Expr, IndexColumn, ObjectName, TableConstraint};
2828
use std::collections::HashSet;
2929
use std::sync::Arc;
3030

@@ -40,8 +40,9 @@ impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A>
4040
let table_name: Arc<str> = lower_case_name(name)?.into();
4141

4242
if !is_valid_identifier(&table_name) {
43-
return Err(DatabaseError::InvalidTable(
44-
"illegal table naming".to_string(),
43+
return Err(attach_span_if_absent(
44+
DatabaseError::invalid_table("illegal table naming".to_string()),
45+
name,
4546
));
4647
}
4748
{
@@ -53,8 +54,9 @@ impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A>
5354
return Err(DatabaseError::DuplicateColumn(col_name.clone()));
5455
}
5556
if !is_valid_identifier(col_name) {
56-
return Err(DatabaseError::InvalidColumn(
57-
"illegal column naming".to_string(),
57+
return Err(attach_span_if_absent(
58+
DatabaseError::invalid_column("illegal column naming".to_string()),
59+
col,
5860
));
5961
}
6062
}
@@ -66,27 +68,15 @@ impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A>
6668
.try_collect()?;
6769
for constraint in constraints {
6870
match constraint {
69-
TableConstraint::Unique {
70-
columns: column_names,
71-
is_primary,
72-
..
73-
} => {
74-
for (i, column_name) in column_names
75-
.iter()
76-
.map(|ident| ident.value.to_lowercase())
77-
.enumerate()
78-
{
79-
if let Some(column) = columns
80-
.iter_mut()
81-
.find(|column| column.name() == column_name)
82-
{
83-
if *is_primary {
84-
column.desc_mut().set_primary(Some(i));
85-
} else {
86-
column.desc_mut().set_unique(true);
87-
}
88-
}
89-
}
71+
TableConstraint::PrimaryKey(primary) => {
72+
Self::bind_constraint(&mut columns, &primary.columns, |i, desc| {
73+
desc.set_primary(Some(i))
74+
})?;
75+
}
76+
TableConstraint::Unique(unique) => {
77+
Self::bind_constraint(&mut columns, &unique.columns, |_, desc| {
78+
desc.set_unique()
79+
})?;
9080
}
9181
constraint => {
9282
return Err(DatabaseError::UnsupportedStmt(format!(
@@ -97,8 +87,11 @@ impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A>
9787
}
9888

9989
if columns.iter().filter(|col| col.desc().is_primary()).count() == 0 {
100-
return Err(DatabaseError::InvalidTable(
101-
"the primary key field must exist and have at least one".to_string(),
90+
return Err(attach_span_if_absent(
91+
DatabaseError::invalid_table(
92+
"the primary key field must exist and have at least one".to_string(),
93+
),
94+
name,
10295
));
10396
}
10497

@@ -112,6 +105,29 @@ impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A>
112105
))
113106
}
114107

108+
fn bind_constraint<F: Fn(usize, &mut ColumnDesc)>(
109+
table_columns: &mut [ColumnCatalog],
110+
exprs: &[IndexColumn],
111+
fn_constraint: F,
112+
) -> Result<(), DatabaseError> {
113+
for (i, index_column) in exprs.iter().enumerate() {
114+
let Expr::Identifier(ident) = &index_column.column.expr else {
115+
return Err(DatabaseError::UnsupportedStmt(
116+
"only identifier columns are supported in `PRIMARY KEY/UNIQUE`".to_string(),
117+
));
118+
};
119+
let column_name = ident.value.to_lowercase();
120+
121+
if let Some(column) = table_columns
122+
.iter_mut()
123+
.find(|column| column.name() == column_name)
124+
{
125+
fn_constraint(i, column.desc_mut())
126+
}
127+
}
128+
Ok(())
129+
}
130+
115131
pub fn bind_column(
116132
&mut self,
117133
column_def: &ColumnDef,
@@ -130,16 +146,13 @@ impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A>
130146
match &option_def.option {
131147
ColumnOption::Null => nullable = true,
132148
ColumnOption::NotNull => nullable = false,
133-
ColumnOption::Unique { is_primary, .. } => {
134-
if *is_primary {
135-
column_desc.set_primary(column_index);
136-
nullable = false;
137-
// Skip other options when using primary key
138-
break;
139-
} else {
140-
column_desc.set_unique(true);
141-
}
149+
ColumnOption::PrimaryKey(_) => {
150+
column_desc.set_primary(column_index);
151+
nullable = false;
152+
// Skip other options when using primary key
153+
break;
142154
}
155+
ColumnOption::Unique(_) => column_desc.set_unique(),
143156
ColumnOption::Default(expr) => {
144157
let mut expr = self.bind_expr(expr)?;
145158

src/binder/create_view.rs

Lines changed: 39 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use crate::planner::{Childrens, LogicalPlan};
2323
use crate::storage::Transaction;
2424
use crate::types::value::DataValue;
2525
use itertools::Itertools;
26-
use sqlparser::ast::{Ident, ObjectName, Query};
26+
use sqlparser::ast::{ObjectName, Query, ViewColumnDef};
2727
use std::sync::Arc;
2828
use ulid::Ulid;
2929

@@ -32,41 +32,55 @@ impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A>
3232
&mut self,
3333
or_replace: &bool,
3434
name: &ObjectName,
35-
columns: &[Ident],
35+
columns: &[ViewColumnDef],
3636
query: &Query,
3737
) -> Result<LogicalPlan, DatabaseError> {
38+
fn projection_exprs(
39+
view_name: &Arc<str>,
40+
mapping_schema: &[ColumnRef],
41+
column_names: impl Iterator<Item = String>,
42+
) -> Vec<ScalarExpression> {
43+
column_names
44+
.enumerate()
45+
.map(|(i, column_name)| {
46+
let mapping_column = &mapping_schema[i];
47+
let mut column = ColumnCatalog::new(
48+
column_name,
49+
mapping_column.nullable(),
50+
mapping_column.desc().clone(),
51+
);
52+
column.set_ref_table(view_name.clone(), Ulid::new(), true);
53+
54+
ScalarExpression::Alias {
55+
expr: Box::new(ScalarExpression::column_expr(mapping_column.clone())),
56+
alias: AliasType::Expr(Box::new(ScalarExpression::column_expr(
57+
ColumnRef::from(column),
58+
))),
59+
}
60+
})
61+
.collect_vec()
62+
}
63+
3864
let view_name: Arc<str> = lower_case_name(name)?.into();
3965
let mut plan = self.bind_query(query)?;
4066

4167
let mapping_schema = plan.output_schema();
4268

43-
let exprs = if columns.is_empty() {
44-
Box::new(
69+
let exprs: Vec<ScalarExpression> = if columns.is_empty() {
70+
projection_exprs(
71+
&view_name,
72+
mapping_schema,
4573
mapping_schema
4674
.iter()
4775
.map(|column| column.name().to_string()),
48-
) as Box<dyn Iterator<Item = String>>
76+
)
4977
} else {
50-
Box::new(columns.iter().map(lower_ident)) as Box<dyn Iterator<Item = String>>
51-
}
52-
.enumerate()
53-
.map(|(i, column_name)| {
54-
let mapping_column = &mapping_schema[i];
55-
let mut column = ColumnCatalog::new(
56-
column_name,
57-
mapping_column.nullable(),
58-
mapping_column.desc().clone(),
59-
);
60-
column.set_ref_table(view_name.clone(), Ulid::new(), true);
61-
62-
ScalarExpression::Alias {
63-
expr: Box::new(ScalarExpression::column_expr(mapping_column.clone())),
64-
alias: AliasType::Expr(Box::new(ScalarExpression::column_expr(ColumnRef::from(
65-
column,
66-
)))),
67-
}
68-
})
69-
.collect_vec();
78+
projection_exprs(
79+
&view_name,
80+
mapping_schema,
81+
columns.iter().map(|column| lower_ident(&column.name)),
82+
)
83+
};
7084
plan = self.bind_project(plan, exprs)?;
7185

7286
Ok(LogicalPlan::new(

src/binder/delete.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A>
3535
let mut table_alias = None;
3636
let mut alias_idents = None;
3737

38-
if let Some(TableAlias { name, columns }) = alias {
38+
if let Some(TableAlias { name, columns, .. }) = alias {
3939
table_alias = Some(name.value.to_lowercase().into());
4040
alias_idents = Some(columns);
4141
}

0 commit comments

Comments
 (0)