1313// limitations under the License.
1414
1515use std:: collections:: HashMap ;
16+ use std:: sync:: Arc ;
1617
1718use databend_common_ast:: ast:: Expr ;
1819use databend_common_ast:: ast:: Identifier ;
@@ -21,59 +22,156 @@ use databend_common_exception::ErrorCode;
2122use databend_common_exception:: Result ;
2223use databend_common_expression:: Constant ;
2324use databend_common_expression:: ConstantFolder ;
25+ use databend_common_expression:: DataBlock ;
2426use databend_common_expression:: Scalar ;
2527use databend_common_functions:: BUILTIN_FUNCTIONS ;
2628
2729use crate :: ScalarBinder ;
2830use crate :: ScalarExpr ;
31+ use crate :: planner:: QueryExecutor ;
2932use crate :: plans:: ConstantExpr ;
3033
34+ /// Check if an AST expression contains a subquery
35+ fn contains_subquery ( expr : & Expr ) -> bool {
36+ match expr {
37+ Expr :: Subquery { .. } => true ,
38+ Expr :: InSubquery { .. } => true ,
39+ Expr :: Exists { .. } => true ,
40+ Expr :: Cast { expr, .. } => contains_subquery ( expr) ,
41+ Expr :: TryCast { expr, .. } => contains_subquery ( expr) ,
42+ Expr :: FunctionCall { func, .. } => func. args . iter ( ) . any ( contains_subquery) ,
43+ Expr :: BinaryOp { left, right, .. } => contains_subquery ( left) || contains_subquery ( right) ,
44+ Expr :: UnaryOp { expr, .. } => contains_subquery ( expr) ,
45+ Expr :: IsNull { expr, .. } => contains_subquery ( expr) ,
46+ Expr :: IsDistinctFrom { left, right, .. } => {
47+ contains_subquery ( left) || contains_subquery ( right)
48+ }
49+ Expr :: InList { expr, list, .. } => {
50+ contains_subquery ( expr) || list. iter ( ) . any ( contains_subquery)
51+ }
52+ Expr :: Between {
53+ expr, low, high, ..
54+ } => contains_subquery ( expr) || contains_subquery ( low) || contains_subquery ( high) ,
55+ Expr :: Case {
56+ operand,
57+ conditions,
58+ results,
59+ else_result,
60+ ..
61+ } => {
62+ operand. as_ref ( ) . is_some_and ( |e| contains_subquery ( e) )
63+ || conditions. iter ( ) . any ( contains_subquery)
64+ || results. iter ( ) . any ( contains_subquery)
65+ || else_result. as_ref ( ) . is_some_and ( |e| contains_subquery ( e) )
66+ }
67+ Expr :: MapAccess { expr, .. } => contains_subquery ( expr) ,
68+ Expr :: Array { exprs, .. } => exprs. iter ( ) . any ( contains_subquery) ,
69+ Expr :: Tuple { exprs, .. } => exprs. iter ( ) . any ( contains_subquery) ,
70+ _ => false ,
71+ }
72+ }
73+
74+ /// Execute a subquery and extract a single scalar value from the result
75+ fn execute_subquery_for_scalar (
76+ subquery_executor : & Arc < dyn QueryExecutor > ,
77+ expr : & Expr ,
78+ ) -> Result < Scalar > {
79+ let sql = expr. to_string ( ) ;
80+ let data_blocks = databend_common_base:: runtime:: block_on ( async {
81+ subquery_executor
82+ . execute_query_with_sql_string ( & format ! ( "SELECT {}" , sql) )
83+ . await
84+ } ) ?;
85+
86+ if data_blocks. is_empty ( ) {
87+ return Ok ( Scalar :: Null ) ;
88+ }
89+
90+ let block = DataBlock :: concat ( & data_blocks) ?;
91+
92+ if block. num_columns ( ) != 1 {
93+ return Err ( ErrorCode :: SemanticError (
94+ "Scalar subquery in table function argument must return exactly one column" . to_string ( ) ,
95+ ) ) ;
96+ }
97+
98+ if block. num_rows ( ) > 1 {
99+ return Err ( ErrorCode :: SemanticError (
100+ "Scalar subquery in table function argument returned more than one row" . to_string ( ) ,
101+ ) ) ;
102+ }
103+
104+ if block. num_rows ( ) == 0 {
105+ return Ok ( Scalar :: Null ) ;
106+ }
107+
108+ let column = & block. columns ( ) [ 0 ] ;
109+ let col_value = column. value ( ) ;
110+ Ok ( col_value. index ( 0 ) . unwrap ( ) . to_owned ( ) )
111+ }
112+
113+ /// Try to fold an expression to a constant scalar value.
114+ /// If constant folding fails and the expression contains a subquery,
115+ /// execute the subquery and return the result.
116+ fn try_fold_to_scalar (
117+ scalar : ScalarExpr ,
118+ ast_expr : & Expr ,
119+ scalar_binder : & ScalarBinder < ' _ > ,
120+ subquery_executor : & Option < Arc < dyn QueryExecutor > > ,
121+ ) -> Result < Scalar > {
122+ let expr = scalar. as_expr ( ) ?;
123+ let ( expr, _) = ConstantFolder :: fold ( & expr, & scalar_binder. get_func_ctx ( ) ?, & BUILTIN_FUNCTIONS ) ;
124+
125+ match expr. into_constant ( ) {
126+ Ok ( Constant { scalar, .. } ) => Ok ( scalar) ,
127+ Err ( _) => {
128+ if contains_subquery ( ast_expr) {
129+ if let Some ( executor) = subquery_executor {
130+ return execute_subquery_for_scalar ( executor, ast_expr) ;
131+ }
132+ return Err ( ErrorCode :: Internal (
133+ "Subquery executor is not available for evaluating table function arguments"
134+ . to_string ( ) ,
135+ ) ) ;
136+ }
137+ Err ( ErrorCode :: Unimplemented ( format ! (
138+ "Unsupported table argument type: {:?}" ,
139+ scalar
140+ ) ) )
141+ }
142+ }
143+ }
144+
31145pub fn bind_table_args (
32146 scalar_binder : & mut ScalarBinder < ' _ > ,
33147 params : & [ Expr ] ,
34148 named_params : & [ ( Identifier , Expr ) ] ,
149+ subquery_executor : & Option < Arc < dyn QueryExecutor > > ,
35150) -> Result < TableArgs > {
36151 let mut args = Vec :: with_capacity ( params. len ( ) ) ;
37152 for arg in params. iter ( ) {
38- args. push ( scalar_binder. bind ( arg) ?. 0 ) ;
153+ args. push ( ( scalar_binder. bind ( arg) ?. 0 , arg ) ) ;
39154 }
40155
41156 let mut named_args = Vec :: with_capacity ( named_params. len ( ) ) ;
42157 for ( name, arg) in named_params. iter ( ) {
43- named_args. push ( ( name. clone ( ) , scalar_binder. bind ( arg) ?. 0 ) ) ;
158+ named_args. push ( ( name. clone ( ) , scalar_binder. bind ( arg) ?. 0 , arg ) ) ;
44159 }
45160
46161 let positioned_args = args
47162 . into_iter ( )
48- . map ( |scalar| {
49- let expr = scalar. as_expr ( ) ?;
50- let ( expr, _) =
51- ConstantFolder :: fold ( & expr, & scalar_binder. get_func_ctx ( ) ?, & BUILTIN_FUNCTIONS ) ;
52- match expr. into_constant ( ) {
53- Ok ( Constant { scalar, .. } ) => Ok ( scalar) ,
54- Err ( _) => Err ( ErrorCode :: Unimplemented ( format ! (
55- "Unsupported table argument type: {:?}" ,
56- scalar
57- ) ) ) ,
58- }
163+ . map ( |( scalar, ast_expr) | {
164+ try_fold_to_scalar ( scalar, ast_expr, scalar_binder, subquery_executor)
59165 } )
60166 . collect :: < Result < Vec < _ > > > ( ) ?;
61167
62168 let named_args: HashMap < String , Scalar > = named_args
63169 . into_iter ( )
64- . map ( |( name, scalar) | match scalar {
170+ . map ( |( name, scalar, ast_expr ) | match scalar {
65171 ScalarExpr :: ConstantExpr ( ConstantExpr { value, .. } ) => Ok ( ( name. name . clone ( ) , value) ) ,
66172 _ => {
67- let expr = scalar. as_expr ( ) ?;
68- let ( expr, _) =
69- ConstantFolder :: fold ( & expr, & scalar_binder. get_func_ctx ( ) ?, & BUILTIN_FUNCTIONS ) ;
70- match expr. into_constant ( ) {
71- Ok ( Constant { scalar, .. } ) => Ok ( ( name. name . clone ( ) , scalar) ) ,
72- Err ( _) => Err ( ErrorCode :: Unimplemented ( format ! (
73- "Unsupported table argument type: {:?}" ,
74- scalar
75- ) ) ) ,
76- }
173+ let value = try_fold_to_scalar ( scalar, ast_expr, scalar_binder, subquery_executor) ?;
174+ Ok ( ( name. name . clone ( ) , value) )
77175 }
78176 } )
79177 . collect :: < Result < HashMap < _ , _ > > > ( ) ?;
0 commit comments