diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java index 5bbb4d822cb..b724376551f 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java @@ -221,6 +221,10 @@ public void execute(Context context) throws Exception { } catch (Exception e) { LOG.error("Poll change stream records failed ", e); throw e; + } catch (Throwable t) { + // Handle error + LOG.error("Fatal error when polling change stream records: ", t); + throw new RuntimeException("Fatal error when polling change stream records", t); } finally { taskRunning = false; if (changeStreamCursor != null) {