Skip to content

Commit 48968fb

Browse files
yzeng1618zengyi
andauthored
[Fix][Connector-V2][Databend] Ensure CDC final merge on committer close (#10349)
Co-authored-by: zengyi <zengyi@chinatelecom.cn>
1 parent 4be926a commit 48968fb

File tree

1 file changed

+45
-8
lines changed

1 file changed

+45
-8
lines changed

seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSinkAggregatedCommitter.java

Lines changed: 45 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ public class DatabendSinkAggregatedCommitter
5959

6060
private Connection connection;
6161
private boolean isCdcMode;
62+
private volatile boolean aborted;
6263
// Store catalog table to access schema information
6364
private CatalogTable catalogTable;
6465

@@ -130,14 +131,21 @@ public List<DatabendSinkAggregatedCommitInfo> commit(
130131
List<DatabendSinkAggregatedCommitInfo> aggregatedCommitInfos) throws IOException {
131132
// Perform final merge operation in CDC mode only when necessary
132133
if (isCdcMode) {
133-
performMerge(aggregatedCommitInfos);
134+
if (log.isDebugEnabled()) {
135+
log.debug(
136+
"[Instance {}] Committing aggregatedCommitInfos size: {}",
137+
instanceId,
138+
aggregatedCommitInfos == null ? 0 : aggregatedCommitInfos.size());
139+
}
140+
performMerge();
134141
}
135142

136143
// Return empty list as there's no need to retry
137144
return new ArrayList<>();
138145
}
139146

140-
private void performMerge(List<DatabendSinkAggregatedCommitInfo> aggregatedCommitInfos) {
147+
/** Perform merge from CDC stream to target table. */
148+
private void performMerge() {
141149
// Merge all the data from raw table to target table
142150
String mergeSql = generateMergeSql();
143151
log.info("[Instance {}] Executing MERGE INTO statement: {}", instanceId, mergeSql);
@@ -214,6 +222,7 @@ public DatabendSinkAggregatedCommitInfo combine(List<DatabendSinkCommitterInfo>
214222
@Override
215223
public void abort(List<DatabendSinkAggregatedCommitInfo> aggregatedCommitInfos)
216224
throws IOException {
225+
aborted = true;
217226
// In case of abort, we might want to clean up the raw table and stream
218227
log.info("[Instance {}] Aborting Databend sink operations", instanceId);
219228
try {
@@ -235,16 +244,44 @@ public void abort(List<DatabendSinkAggregatedCommitInfo> aggregatedCommitInfos)
235244

236245
@Override
237246
public void close() throws IOException {
247+
Exception closeException = null;
238248
try {
239-
if (connection != null && !connection.isClosed()) {
240-
connection.close();
249+
if (!aborted && isCdcMode && connection != null && !connection.isClosed()) {
250+
try {
251+
log.info("[Instance {}] Performing final merge before closing", instanceId);
252+
performMerge();
253+
} catch (Exception mergeEx) {
254+
log.error(
255+
"[Instance {}] Final merge failed, will still close connection: {}",
256+
instanceId,
257+
mergeEx.getMessage(),
258+
mergeEx);
259+
}
241260
}
242-
} catch (SQLException e) {
261+
} catch (Exception e) {
262+
closeException = e;
263+
} finally {
264+
if (connection != null) {
265+
try {
266+
connection.close();
267+
} catch (SQLException e) {
268+
if (closeException != null) {
269+
closeException.addSuppressed(e);
270+
} else {
271+
closeException = e;
272+
}
273+
}
274+
}
275+
}
276+
277+
if (closeException != null) {
243278
throw new DatabendConnectorException(
244279
DatabendConnectorErrorCode.CONNECT_FAILED,
245-
"[Instance {}] Failed to close connection in DatabendSinkAggregatedCommitter: "
246-
+ e.getMessage(),
247-
e);
280+
"[Instance "
281+
+ instanceId
282+
+ "] Failed to close connection in DatabendSinkAggregatedCommitter: "
283+
+ closeException.getMessage(),
284+
closeException);
248285
}
249286
}
250287
}

0 commit comments

Comments
 (0)