|
16 | 16 | import io.airbyte.cdk.integrations.debezium.internals.DebeziumPropertiesManager; |
17 | 17 | import io.airbyte.cdk.integrations.debezium.internals.DebeziumRecordPublisher; |
18 | 18 | import io.airbyte.cdk.integrations.debezium.internals.DebeziumStateUtil; |
| 19 | +import io.airbyte.cdk.integrations.debezium.internals.RecordWaitTimeUtil; |
19 | 20 | import io.airbyte.cdk.integrations.debezium.internals.RelationalDbDebeziumPropertiesManager; |
20 | 21 | import io.airbyte.commons.json.Jsons; |
21 | 22 | import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; |
@@ -107,8 +108,15 @@ public static synchronized JsonNode constructInitialDebeziumState(final Properti |
107 | 108 | break; |
108 | 109 | } |
109 | 110 |
|
110 | | - if (Duration.between(engineStartTime, Instant.now()).compareTo(Duration.ofMinutes(5)) > 0) { |
111 | | - LOGGER.error("No record is returned even after {} seconds of waiting, closing the engine", 300); |
| 111 | + Duration initialWaitingDuration = Duration.ofMinutes(5L); |
| 112 | + // If initial waiting seconds is configured and it's greater than 5 minutes, use that value instead |
| 113 | + // of the default value |
| 114 | + final Duration configuredDuration = RecordWaitTimeUtil.getFirstRecordWaitTime(database.getSourceConfig()); |
| 115 | + if (configuredDuration.compareTo(initialWaitingDuration) > 0) { |
| 116 | + initialWaitingDuration = configuredDuration; |
| 117 | + } |
| 118 | + if (Duration.between(engineStartTime, Instant.now()).compareTo(initialWaitingDuration) > 0) { |
| 119 | + LOGGER.error("Schema history not constructed after {} seconds of waiting, closing the engine", initialWaitingDuration.getSeconds()); |
112 | 120 | publisher.close(); |
113 | 121 | throw new RuntimeException( |
114 | 122 | "Building schema history has timed out. Please consider increasing the debezium wait time in advanced options."); |
|
0 commit comments