Skip to content

Commit 9a51692

Browse files
committed
feat: TAP-6051 DB - Kafka Transformer
Closes TAP-6051
1 parent d50eff2 commit 9a51692

File tree

3 files changed

+29
-7
lines changed

3 files changed

+29
-7
lines changed

connectors/kafka-enhanced-connector/src/main/java/io/tapdata/kafka/schema_mode/OriginalSchemaMode.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,10 @@ public TapEvent toTapEvent(ConsumerRecord<?, ?> consumerRecord) {
6868
public List<ProducerRecord<Object, Object>> fromTapEvent(TapTable table, TapEvent tapEvent) {
6969
if (tapEvent instanceof TapInsertRecordEvent) {
7070
TapInsertRecordEvent insertRecordEvent = (TapInsertRecordEvent) tapEvent;
71-
String topic = KafkaUtils.pickTopic(kafkaService.getConfig(), "", "", table);
71+
String topic = topic(table, tapEvent);
7272
Map<String, Object> afterMap = insertRecordEvent.getAfter();
7373
Object key = afterMap.get(FIELD_KEY);
74-
Object value = afterMap.get(FIELD_VALUE);
74+
Object value = afterMap.containsKey(FIELD_KEY) ? afterMap.get(FIELD_VALUE) : afterMap;
7575
Integer partition = (Integer) afterMap.get(FIELD_PARTITION);
7676
Long ts = Optional.ofNullable(afterMap.get(FIELD_TIMESTAMP)).map(o -> {
7777
if (o instanceof String) {

connectors/kafka-enhanced-connector/src/main/java/io/tapdata/kafka/schema_mode/StandardSchemaMode.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public TapEvent toTapEvent(ConsumerRecord<?, ?> consumerRecord) {
7777

7878
@Override
7979
public List<ProducerRecord<Object, Object>> fromTapEvent(TapTable table, TapEvent tapEvent) {
80-
String topic = KafkaUtils.pickTopic(kafkaService.getConfig(), "", "", table);
80+
String topic = topic(table, tapEvent);
8181
Long ts = tapEvent.getTime();
8282

8383
Map<String, Object> data;

connectors/kafka-enhanced-connector/src/main/java/io/tapdata/kafka/serialization/StandardSerializer.java

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,7 @@
88
import io.tapdata.kafka.utils.StandardEventUtils;
99
import org.apache.kafka.common.serialization.Serializer;
1010

11-
import java.util.HashMap;
12-
import java.util.Map;
11+
import java.util.*;
1312

1413
/**
1514
* TapData 标准事件序列器
@@ -25,9 +24,9 @@ public byte[] serialize(String topic, TapEvent data) {
2524
StandardEventUtils.setTs(map, data.getTime());
2625
if (data instanceof TapBaseEvent) {
2726
TapBaseEvent recordEvent = (TapBaseEvent) data;
28-
StandardEventUtils.setOpTs(map, recordEvent.getReferenceTime());
27+
StandardEventUtils.setOpTs(map, null != recordEvent.getReferenceTime() ? recordEvent.getReferenceTime() : System.currentTimeMillis());
2928
StandardEventUtils.setTable(map, recordEvent.getTableId());
30-
StandardEventUtils.setNamespaces(map, recordEvent.getNamespaces());
29+
StandardEventUtils.setNamespaces(map, getNamespaces(recordEvent));
3130
} else {
3231
StandardEventUtils.setTable(map, topic);
3332
}
@@ -75,4 +74,27 @@ private void toUnknownMap(Map<String, Object> map, TapEvent recordEvent) {
7574
StandardEventUtils.setOp(map, EventOperation.DML_UNKNOWN);
7675
StandardEventUtils.setData(map, recordEvent);
7776
}
77+
78+
private List<String> getNamespaces(TapBaseEvent tapBaseEvent) {
79+
if (null == tapBaseEvent) {
80+
return null;
81+
}
82+
List<String> namespaces = tapBaseEvent.getNamespaces();
83+
if (null != namespaces) {
84+
return namespaces;
85+
}
86+
List<String> ns = new ArrayList<>();
87+
String database = tapBaseEvent.getDatabase();
88+
if (null != database) {
89+
ns.add(database);
90+
}
91+
String schema = tapBaseEvent.getSchema();
92+
if (null != schema) {
93+
ns.add(schema);
94+
}
95+
if (!ns.isEmpty()) {
96+
return ns;
97+
}
98+
return null;
99+
}
78100
}

0 commit comments

Comments
 (0)