Skip to content

Commit d1c18ad

Browse files
committed
fix: Add executeUpdate option to QueryHandler for DDL and UPDATE statements
Signed-off-by: Edmund Miller <[email protected]>
1 parent 0ce1d63 commit d1c18ad

File tree

2 files changed

+28
-5
lines changed

2 files changed

+28
-5
lines changed

plugins/nf-sqldb/src/main/nextflow/sql/ChannelSqlExtension.groovy

+2-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ class ChannelSqlExtension extends PluginExtensionPoint {
4747
db: CharSequence,
4848
emitColumns: Boolean,
4949
batchSize: Integer,
50-
batchDelay: Integer
50+
batchDelay: Integer,
51+
executeUpdate: Boolean
5152
]
5253

5354
private static final Map INSERT_PARAMS = [

plugins/nf-sqldb/src/main/nextflow/sql/QueryHandler.groovy

+26-4
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ class QueryHandler implements QueryOp<QueryHandler> {
7171
private Integer batchSize
7272
private long batchDelayMillis = 100
7373
private int queryCount
74+
private boolean executeUpdate = false
7475

7576
@Override
7677
QueryOp withStatement(String stm) {
@@ -97,6 +98,8 @@ class QueryHandler implements QueryOp<QueryHandler> {
9798
this.batchSize = opts.batchSize as Integer
9899
if( opts.batchDelay )
99100
this.batchDelayMillis = opts.batchDelay as long
101+
if( opts.executeUpdate )
102+
this.executeUpdate = opts.executeUpdate as boolean
100103
return this
101104
}
102105

@@ -156,10 +159,29 @@ class QueryHandler implements QueryOp<QueryHandler> {
156159
protected void query0(Connection conn) {
157160
try {
158161
try (Statement stm = conn.createStatement()) {
159-
try( def rs = stm.executeQuery(normalize(statement)) ) {
160-
if( emitColumns )
161-
emitColumns(rs)
162-
emitRowsAndClose(rs)
162+
final String normalizedStmt = normalize(statement)
163+
// Check if statement is a DDL or UPDATE statement that doesn't return a ResultSet
164+
boolean isUpdateOrDdl = executeUpdate ||
165+
normalizedStmt.toUpperCase().startsWith("CREATE ") ||
166+
normalizedStmt.toUpperCase().startsWith("ALTER ") ||
167+
normalizedStmt.toUpperCase().startsWith("DROP ") ||
168+
normalizedStmt.toUpperCase().startsWith("INSERT ") ||
169+
normalizedStmt.toUpperCase().startsWith("UPDATE ") ||
170+
normalizedStmt.toUpperCase().startsWith("DELETE ");
171+
172+
if (isUpdateOrDdl) {
173+
// Use executeUpdate for statements that don't return ResultSets
174+
stm.executeUpdate(normalizedStmt)
175+
// Since there's no ResultSet to emit, just close the channel
176+
target.bind(Channel.STOP)
177+
}
178+
else {
179+
// For SELECT and other queries that return ResultSets
180+
try (def rs = stm.executeQuery(normalizedStmt)) {
181+
if (emitColumns)
182+
emitColumns(rs)
183+
emitRowsAndClose(rs)
184+
}
163185
}
164186
}
165187
}

0 commit comments

Comments
 (0)