Skip to content

Commit 501d34a

Browse files
committed
fix: Fix paimon writing issue
1. Manually specify update conditions, which will cause update/delete not working; 2. Using the date type as the primary key, it will cause the update/delete not working (cherry picked from commit 1ad9b71)
1 parent a613735 commit 501d34a

File tree

2 files changed

+134
-28
lines changed

2 files changed

+134
-28
lines changed

connectors/paimon-connector/pom.xml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@
1313
<artifactId>paimon-connector</artifactId>
1414

1515
<properties>
16-
<maven.compiler.source>11</maven.compiler.source>
17-
<maven.compiler.target>11</maven.compiler.target>
16+
<maven.compiler.source>8</maven.compiler.source>
17+
<maven.compiler.target>8</maven.compiler.target>
1818
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
19-
<java.version>11</java.version>
19+
<java.version>8</java.version>
2020
<paimon.version>1.2.0</paimon.version>
2121
<hadoop.version>3.3.6</hadoop.version>
2222
</properties>
@@ -274,8 +274,8 @@
274274
<artifactId>maven-compiler-plugin</artifactId>
275275
<version>3.7.0</version>
276276
<configuration>
277-
<source>8</source>
278-
<target>8</target>
277+
<source>${java.version}</source>
278+
<target>${java.version}</target>
279279
</configuration>
280280
</plugin>
281281
<!-- <plugin>-->

connectors/paimon-connector/src/main/java/io/tapdata/connector/paimon/service/PaimonService.java

Lines changed: 129 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,33 @@ public class PaimonService implements Closeable {
7070
// Flag to track if async commit is enabled
7171
private volatile boolean asyncCommitEnabled = false;
7272

73+
// ===== Paimon Field Cache for Performance =====
74+
// LRU cache for Paimon field mappings: Key = "database.tableName", Value = Map<fieldName, DataType>
75+
// Limit to 5 tables to avoid excessive memory usage
76+
private final Map<String, Map<String, DataType>> paimonFieldCache = Collections.synchronizedMap(
77+
new LinkedHashMap<String, Map<String, DataType>>(5, 0.75f, true) {
78+
private static final long serialVersionUID = 1L;
79+
80+
@Override
81+
protected boolean removeEldestEntry(Map.Entry<String, Map<String, DataType>> eldest) {
82+
return size() > 5;
83+
}
84+
}
85+
);
86+
87+
// LRU cache for field index mappings: Key = "database.tableName", Value = Map<fieldName, index>
88+
// Limit to 5 tables to avoid excessive memory usage
89+
private final Map<String, Map<String, Integer>> fieldIndexCache = Collections.synchronizedMap(
90+
new LinkedHashMap<String, Map<String, Integer>>(5, 0.75f, true) {
91+
private static final long serialVersionUID = 1L;
92+
93+
@Override
94+
protected boolean removeEldestEntry(Map.Entry<String, Map<String, Integer>> eldest) {
95+
return size() > 5;
96+
}
97+
}
98+
);
99+
73100
public PaimonService(PaimonConfig config) {
74101
this.config = config;
75102
}
@@ -885,6 +912,10 @@ private void cleanupAllResources() {
885912
lastCommitTime.clear();
886913
commitLocks.clear();
887914

915+
// Clear Paimon field cache
916+
paimonFieldCache.clear();
917+
fieldIndexCache.clear();
918+
888919
// Close old catalog if exists
889920
if (catalog != null) {
890921
try {
@@ -1040,7 +1071,7 @@ private void handleStreamInsert(TapInsertRecordEvent event, StreamTableWrite wri
10401071
String database = config.getDatabase();
10411072
Identifier identifier = Identifier.create(database, table.getName());
10421073
GenericRow row = convertToGenericRow(after, table, identifier);
1043-
int bucket = selectBucketForDynamic(after, table);
1074+
int bucket = selectBucketForDynamic(row, table);
10441075
writer.write(row, bucket);
10451076
}
10461077

@@ -1057,7 +1088,7 @@ private void handleStreamUpdate(TapUpdateRecordEvent event, StreamTableWrite wri
10571088
String database = config.getDatabase();
10581089
Identifier identifier = Identifier.create(database, table.getName());
10591090
GenericRow row = convertToGenericRow(after, table, identifier);
1060-
int bucket = selectBucketForDynamic(after, table);
1091+
int bucket = selectBucketForDynamic(row, table);
10611092
writer.write(row, bucket);
10621093
}
10631094

@@ -1076,42 +1107,109 @@ private void handleStreamDelete(TapDeleteRecordEvent event, StreamTableWrite wri
10761107
GenericRow row = convertToGenericRow(before, table, identifier);
10771108
// Set row kind to DELETE
10781109
row.setRowKind(org.apache.paimon.types.RowKind.DELETE);
1079-
int bucket = selectBucketForDynamic(before, table);
1110+
int bucket = selectBucketForDynamic(row, table);
10801111
writer.write(row, bucket);
10811112
}
10821113

10831114
/**
10841115
* Select deterministic bucket for dynamic-bucket tables.
10851116
* Use primary keys if present; otherwise hash all fields (sorted by name).
1117+
*
1118+
* Note: This method uses the converted GenericRow values to ensure consistent
1119+
* bucket selection across insert/update/delete operations, especially for
1120+
* Date/DateTime types that are converted to int/long values.
1121+
*
1122+
* @param row converted GenericRow with Paimon-compatible values
1123+
* @param table table definition
1124+
* @return bucket number
10861125
*/
1087-
private int selectBucketForDynamic(Map<String, Object> data, TapTable table) {
1126+
private int selectBucketForDynamic(GenericRow row, TapTable table) {
10881127
int hint = (config.getBucketCount() != null && config.getBucketCount() > 0) ? config.getBucketCount() : 4;
10891128
int hash = 0;
1090-
Collection<String> pks = table.primaryKeys();
1129+
Collection<String> pks = table.primaryKeys(true);
1130+
Map<String, TapField> fields = table.getNameFieldMap();
1131+
1132+
// Get or build field index mapping from cache
1133+
String cacheKey = table.getId();
1134+
Map<String, Integer> indexMap = getFieldIndexMap(cacheKey, fields);
1135+
10911136
if (pks != null && !pks.isEmpty()) {
1137+
// Use primary key fields for hashing
10921138
for (String key : pks) {
1093-
Object v = data.get(key);
1094-
hash = 31 * hash + (v == null ? 0 : v.hashCode());
1139+
Integer fieldIndex = indexMap.get(key);
1140+
if (fieldIndex != null && fieldIndex >= 0 && fieldIndex < row.getFieldCount()) {
1141+
Object v = row.getField(fieldIndex);
1142+
hash = 31 * hash + (v == null ? 0 : v.hashCode());
1143+
}
10951144
}
10961145
} else {
1097-
Map<String, TapField> fields = table.getNameFieldMap();
1146+
// Use all fields for hashing (sorted by name)
10981147
if (fields != null && !fields.isEmpty()) {
10991148
List<String> names = new ArrayList<>(fields.keySet());
11001149
Collections.sort(names);
11011150
for (String name : names) {
1102-
Object v = data.get(name);
1103-
hash = 31 * hash + (v == null ? 0 : v.hashCode());
1151+
Integer fieldIndex = indexMap.get(name);
1152+
if (fieldIndex != null && fieldIndex >= 0 && fieldIndex < row.getFieldCount()) {
1153+
Object v = row.getField(fieldIndex);
1154+
hash = 31 * hash + (v == null ? 0 : v.hashCode());
1155+
}
11041156
}
11051157
} else {
1106-
for (Map.Entry<String, Object> e : data.entrySet()) {
1107-
Object v = e.getValue();
1158+
// Fallback: hash all fields in order
1159+
for (int i = 0; i < row.getFieldCount(); i++) {
1160+
Object v = row.getField(i);
11081161
hash = 31 * hash + (v == null ? 0 : v.hashCode());
11091162
}
11101163
}
11111164
}
11121165
return Math.floorMod(hash, hint);
11131166
}
11141167

1168+
/**
1169+
* Get or build field index mapping from cache
1170+
*
1171+
* @param cacheKey cache key (table ID)
1172+
* @param fields field map
1173+
* @return map of field name to index
1174+
*/
1175+
private Map<String, Integer> getFieldIndexMap(String cacheKey, Map<String, TapField> fields) {
1176+
Map<String, Integer> indexMap = fieldIndexCache.get(cacheKey);
1177+
1178+
if (indexMap == null) {
1179+
// Cache miss - build field index mapping
1180+
indexMap = new HashMap<>(fields.size());
1181+
int index = 0;
1182+
for (String name : fields.keySet()) {
1183+
indexMap.put(name, index++);
1184+
}
1185+
1186+
// Store in cache
1187+
fieldIndexCache.put(cacheKey, indexMap);
1188+
}
1189+
1190+
return indexMap;
1191+
}
1192+
1193+
/**
1194+
* Get field index by field name (deprecated - use getFieldIndexMap instead)
1195+
*
1196+
* @param fieldName field name
1197+
* @param fields field map
1198+
* @return field index, or -1 if not found
1199+
* @deprecated Use getFieldIndexMap for better performance with caching
1200+
*/
1201+
@Deprecated
1202+
private int getFieldIndex(String fieldName, Map<String, TapField> fields) {
1203+
int index = 0;
1204+
for (String name : fields.keySet()) {
1205+
if (name.equals(fieldName)) {
1206+
return index;
1207+
}
1208+
index++;
1209+
}
1210+
return -1;
1211+
}
1212+
11151213
/**
11161214
* Convert map to GenericRow
11171215
*
@@ -1122,9 +1220,23 @@ private int selectBucketForDynamic(Map<String, Object> data, TapTable table) {
11221220
* @throws Exception if conversion fails
11231221
*/
11241222
private GenericRow convertToGenericRow(Map<String, Object> data, TapTable table, Identifier identifier) throws Exception {
1125-
// Get Paimon table to access actual field types
1126-
Table paimonTable = catalog.getTable(identifier);
1127-
List<DataField> paimonFields = paimonTable.rowType().getFields();
1223+
// Get or build field type mapping from cache
1224+
String cacheKey = identifier.getFullName();
1225+
Map<String, DataType> fieldTypeMap = paimonFieldCache.get(cacheKey);
1226+
1227+
if (fieldTypeMap == null) {
1228+
// Cache miss - build field type mapping
1229+
Table paimonTable = catalog.getTable(identifier);
1230+
List<DataField> paimonFields = paimonTable.rowType().getFields();
1231+
1232+
fieldTypeMap = new HashMap<>(paimonFields.size());
1233+
for (DataField paimonField : paimonFields) {
1234+
fieldTypeMap.put(paimonField.name(), paimonField.type());
1235+
}
1236+
1237+
// Store in cache
1238+
paimonFieldCache.put(cacheKey, fieldTypeMap);
1239+
}
11281240

11291241
Map<String, TapField> tapFields = table.getNameFieldMap();
11301242
int fieldCount = tapFields.size();
@@ -1135,14 +1247,8 @@ private GenericRow convertToGenericRow(Map<String, Object> data, TapTable table,
11351247
String fieldName = entry.getKey();
11361248
Object value = data.get(fieldName);
11371249

1138-
// Get corresponding Paimon field type
1139-
DataType paimonType = null;
1140-
for (DataField paimonField : paimonFields) {
1141-
if (paimonField.name().equals(fieldName)) {
1142-
paimonType = paimonField.type();
1143-
break;
1144-
}
1145-
}
1250+
// Get corresponding Paimon field type from cache
1251+
DataType paimonType = fieldTypeMap.get(fieldName);
11461252

11471253
// Convert value to Paimon-compatible type
11481254
values[index++] = convertValueToPaimonType(value, paimonType);

0 commit comments

Comments
 (0)