Skip to content

Commit 524433c

Browse files
committed
fix: kafka empty topic bug
1 parent 8a81f2d commit 524433c

File tree

1 file changed

+10
-5
lines changed

1 file changed

+10
-5
lines changed

connectors/kafka-connector/src/main/java/io/tapdata/connector/kafka/KafkaConnector.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,12 @@ public void registerCapabilities(ConnectorFunctions connectorFunctions, TapCodec
154154
}
155155

156156
private CreateTableOptions createTableV2(TapConnectorContext tapConnectorContext, TapCreateTableEvent tapCreateTableEvent) throws Throwable {
157-
String tableId = tapCreateTableEvent.getTableId();
157+
String tableId;
158+
if (EmptyKit.isBlank(kafkaConfig.getTopicName())) {
159+
tableId = tapCreateTableEvent.getTableId();
160+
} else {
161+
tableId = kafkaConfig.getTopicName();
162+
}
158163
CreateTableOptions createTableOptions = new CreateTableOptions();
159164
// if (!this.isSchemaRegister) {
160165
Integer replicasSize = Optional.ofNullable(kafkaConfig.getReplicasSize()).orElse(1);
@@ -317,10 +322,10 @@ private void streamRead(TapConnectorContext nodeContext, List<String> tableList,
317322
}
318323

319324
private Object timestampToStreamOffset(TapConnectorContext connectorContext, Long offsetStartTime) {
320-
if (null == offsetStartTime) {
321-
return System.currentTimeMillis();
322-
}
323-
return offsetStartTime;
325+
if (null == offsetStartTime) {
326+
return System.currentTimeMillis();
327+
}
328+
return offsetStartTime;
324329
}
325330

326331
private void checkConnection(TapConnectionContext connectionContext, List<String> items, Consumer<ConnectionCheckItem> consumer) {

0 commit comments

Comments
 (0)