4646import io .confluent .ksql .parser .tree .AssertValues ;
4747import io .confluent .ksql .parser .tree .CreateAsSelect ;
4848import io .confluent .ksql .parser .tree .CreateSource ;
49+ import io .confluent .ksql .parser .tree .DefineVariable ;
4950import io .confluent .ksql .parser .tree .DropStatement ;
5051import io .confluent .ksql .parser .tree .InsertValues ;
5152import io .confluent .ksql .parser .tree .SetProperty ;
53+ import io .confluent .ksql .parser .tree .UndefineVariable ;
5254import io .confluent .ksql .parser .tree .UnsetProperty ;
5355import io .confluent .ksql .properties .PropertyOverrider ;
5456import io .confluent .ksql .query .QueryId ;
@@ -124,6 +126,7 @@ public class SqlTestExecutor implements Closeable {
124126 private KafkaTopicClient topicClient ;
125127 private Path tmpFolder ;
126128 private final Map <String , Object > overrides ;
129+ private final Map <String , String > variables ;
127130 private final Map <QueryId , DriverAndProperties > drivers ;
128131
129132 // populated during execution to handle the expected exception
@@ -204,6 +207,7 @@ public void onDeregister(final QueryMetadata query) {
204207 this .formatInjector = new DefaultFormatInjector ();
205208 this .topicClient = requireNonNull (topicClient , "topicClient" );
206209 this .overrides = new HashMap <>();
210+ this .variables = new HashMap <>();
207211 this .drivers = drivers ;
208212 this .tmpFolder = requireNonNull (tmpFolder , "tmpFolder" );
209213 }
@@ -242,7 +246,7 @@ private void doAssert(final AssertStatement statement) {
242246 }
243247
244248 private void execute (final ParsedStatement parsedStatement ) {
245- final PreparedStatement <?> engineStatement = engine .prepare (parsedStatement );
249+ final PreparedStatement <?> engineStatement = engine .prepare (parsedStatement , variables );
246250 final ConfiguredStatement <?> configured = ConfiguredStatement
247251 .of (engineStatement , SessionConfig .of (config , overrides ));
248252
@@ -257,6 +261,12 @@ private void execute(final ParsedStatement parsedStatement) {
257261 } else if (engineStatement .getStatement () instanceof UnsetProperty ) {
258262 PropertyOverrider .unset ((ConfiguredStatement <UnsetProperty >) configured , overrides );
259263 return ;
264+ } else if (engineStatement .getStatement () instanceof DefineVariable variableStatement ) {
265+ variables .put (variableStatement .getVariableName (), variableStatement .getVariableValue ());
266+ return ;
267+ } else if (engineStatement .getStatement () instanceof UndefineVariable variableStatement ) {
268+ variables .remove (variableStatement .getVariableName ());
269+ return ;
260270 }
261271
262272 final ConfiguredStatement <?> injected = formatInjector .inject (configured );
0 commit comments