4444import org .apache .wayang .jdbc .operators .JdbcFilterOperator ;
4545import org .apache .wayang .jdbc .operators .JdbcJoinOperator ;
4646import org .apache .wayang .jdbc .operators .JdbcProjectionOperator ;
47+ import org .apache .wayang .jdbc .operators .JdbcTableSinkOperator ;
4748import org .apache .wayang .jdbc .operators .JdbcTableSource ;
4849import org .apache .wayang .jdbc .platform .JdbcPlatformTemplate ;
4950import org .apache .logging .log4j .LogManager ;
5657import java .sql .ResultSet ;
5758import java .sql .ResultSetMetaData ;
5859import java .sql .SQLException ;
60+ import java .sql .Statement ;
5961import java .util .ArrayList ;
6062import java .util .Collection ;
6163import java .util .Set ;
@@ -82,14 +84,96 @@ public JdbcExecutor(final JdbcPlatformTemplate platform, final Job job) {
8284
8385 @ Override
8486 public void execute (final ExecutionStage stage , final OptimizationContext optimizationContext , final ExecutionState executionState ) {
85- final Tuple2 <String , SqlQueryChannel .Instance > pair = JdbcExecutor .createSqlQuery (stage , optimizationContext , this );
86- final String query = pair .field0 ;
87- final SqlQueryChannel .Instance queryChannel = pair .field1 ;
87+ // Check if this stage ends with a sink operator
88+ final Collection <?> termTasks = stage .getTerminalTasks ();
89+ assert termTasks .size () == 1 : "Invalid JDBC stage: multiple terminal tasks are not currently supported." ;
90+ final ExecutionTask termTask = (ExecutionTask ) termTasks .toArray ()[0 ];
8891
89- queryChannel .setSqlQuery (query );
92+ if (termTask .getOperator () instanceof JdbcTableSinkOperator ) {
93+ // If it is a sink stage: compose and execute SQL directly within the database
94+ JdbcExecutor .executeSinkStage (stage , optimizationContext , this );
95+ } else {
96+ //If it is normal stage: compose SQL and store in channel for downstream consumption
97+ final Tuple2 <String , SqlQueryChannel .Instance > pair = JdbcExecutor .createSqlQuery (stage , optimizationContext , this );
98+ final String query = pair .field0 ;
99+ final SqlQueryChannel .Instance queryChannel = pair .field1 ;
100+ queryChannel .setSqlQuery (query );
101+ executionState .register (queryChannel );
102+ }
103+ }
104+
105+ /**
106+ * Handles execution stages that end with a {@link JdbcTableSinkOperator}.
107+ * Composes a SQL query from the stage's operators and executes it directly
108+ * on the database connection, keeping all data within the database.
109+ *
110+ * @param stage the execution stage ending with a sink
111+ * @param optimizationContext provides optimization information
112+ * @param jdbcExecutor the executor with the database connection
113+ */
114+ private static void executeSinkStage (final ExecutionStage stage ,
115+ final OptimizationContext optimizationContext ,
116+ final JdbcExecutor jdbcExecutor ) {
117+ final Collection <?> startTasks = stage .getStartTasks ();
118+ final Collection <?> termTasks = stage .getTerminalTasks ();
119+
120+ assert startTasks .size () == 1 : "Invalid JDBC stage: multiple sources are not currently supported" ;
121+ final ExecutionTask startTask = (ExecutionTask ) startTasks .toArray ()[0 ];
122+ assert termTasks .size () == 1 : "Invalid JDBC stage: multiple terminal tasks are not currently supported." ;
123+ final ExecutionTask termTask = (ExecutionTask ) termTasks .toArray ()[0 ];
124+ assert startTask .getOperator () instanceof TableSource
125+ : "Invalid JDBC stage: Start task has to be a TableSource" ;
126+ assert termTask .getOperator () instanceof JdbcTableSinkOperator
127+ : "Invalid JDBC stage: Terminal task has to be a JdbcTableSinkOperator" ;
90128
91- // Return the tipChannelInstance.
92- executionState .register (queryChannel );
129+ // Extract operators from the stage
130+ final JdbcTableSource tableOp = (JdbcTableSource ) startTask .getOperator ();
131+ final JdbcTableSinkOperator sinkOp = (JdbcTableSinkOperator ) termTask .getOperator ();
132+ final Collection <JdbcFilterOperator > filterTasks = new ArrayList <>(4 );
133+ JdbcProjectionOperator projectionTask = null ;
134+ final Collection <JdbcJoinOperator <?>> joinTasks = new ArrayList <>();
135+
136+ // Walk through intermediate operators, stopping at the sink
137+ ExecutionTask nextTask = JdbcExecutor .findJdbcExecutionOperatorTaskInStage (startTask , stage );
138+ while (nextTask != null && !(nextTask .getOperator () instanceof JdbcTableSinkOperator )) {
139+ if (nextTask .getOperator () instanceof final JdbcFilterOperator filterOperator ) {
140+ filterTasks .add (filterOperator );
141+ } else if (nextTask .getOperator () instanceof JdbcProjectionOperator projectionOperator ) {
142+ assert projectionTask == null ;
143+ projectionTask = projectionOperator ;
144+ } else if (nextTask .getOperator () instanceof JdbcJoinOperator joinOperator ) {
145+ joinTasks .add (joinOperator );
146+ } else {
147+ throw new WayangException (String .format ("Unsupported JDBC execution task %s" , nextTask .toString ()));
148+ }
149+ nextTask = JdbcExecutor .findJdbcExecutionOperatorTaskInStage (nextTask , stage );
150+ }
151+
152+ // Compose the SELECT query
153+ final StringBuilder selectQuery = createSqlString (jdbcExecutor , tableOp , filterTasks , projectionTask , joinTasks );
154+
155+ // Remove trailing semicolon from SELECT
156+ String selectSql = selectQuery .toString ();
157+ if (selectSql .endsWith (";" )) {
158+ selectSql = selectSql .substring (0 , selectSql .length () - 1 );
159+ }
160+
161+ // Get the sink's SQL clause
162+ final String sinkClause = sinkOp .createSqlClause (jdbcExecutor .connection , jdbcExecutor .functionCompiler );
163+
164+ // Execute on the database
165+ try (Statement stmt = jdbcExecutor .connection .createStatement ()) {
166+ // Handle overwrite: drop existing table first
167+ if ("overwrite" .equals (sinkOp .getMode ())) {
168+ stmt .execute ("DROP TABLE IF EXISTS " + sinkOp .getTableName ());
169+ }
170+ // Execute the composed query: CREATE TABLE x AS SELECT ... or INSERT INTO x SELECT ...
171+ final String fullSql = sinkClause + " " + selectSql + sinkOp .createSqlSuffix ();
172+ stmt .execute (fullSql );
173+ jdbcExecutor .logger .info ("Executed SQL sink: {}" , fullSql );
174+ } catch (SQLException e ) {
175+ throw new WayangException ("Failed to execute SQL sink on table: " + sinkOp .getTableName (), e );
176+ }
93177 }
94178
95179 /**
0 commit comments