Skip to content

Commit fccedca

Browse files
committed
fix: pgto array adapt
1 parent ff01049 commit fccedca

File tree

1 file changed

+39
-23
lines changed

1 file changed

+39
-23
lines changed

connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/AbstractWalLogMiner.java

Lines changed: 39 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ public abstract class AbstractWalLogMiner {
3939
protected int recordSize;
4040
protected List<String> tableList;
4141
protected boolean filterSchema;
42+
private Map<String, String> pureDataTypeMap;
4243
private Map<String, String> dataTypeMap;
4344
protected final AtomicReference<Throwable> threadException = new AtomicReference<>();
4445
protected final PostgresCDCSQLParser sqlParser = new PostgresCDCSQLParser();
@@ -58,11 +59,13 @@ public AbstractWalLogMiner watch(List<String> tableList, KVReadOnlyMap<TapTable>
5859
withSchema = false;
5960
this.tableList = tableList;
6061
filterSchema = tableList.size() > 50;
62+
this.pureDataTypeMap = new ConcurrentHashMap<>();
6163
this.dataTypeMap = new ConcurrentHashMap<>();
6264
tableList.forEach(tableName -> {
6365
TapTable table = tableMap.get(tableName);
6466
if (EmptyKit.isNotNull(table)) {
65-
dataTypeMap.putAll(table.getNameFieldMap().entrySet().stream().collect(Collectors.toMap(v -> tableName + "." + v.getKey(), e -> Optional.ofNullable(e.getValue().getPureDataType()).orElse(e.getValue().getDataType()))));
67+
pureDataTypeMap.putAll(table.getNameFieldMap().entrySet().stream().collect(Collectors.toMap(v -> tableName + "." + v.getKey(), e -> Optional.ofNullable(e.getValue().getPureDataType()).orElse(e.getValue().getDataType()))));
68+
dataTypeMap.putAll(table.getNameFieldMap().entrySet().stream().collect(Collectors.toMap(v -> tableName + "." + v.getKey(), e -> e.getValue().getDataType())));
6669
}
6770
});
6871
tableList.addAll(getSubPartitionTables(tableMap, tableList));
@@ -73,12 +76,14 @@ public AbstractWalLogMiner watch(Map<String, List<String>> schemaTableMap, KVRea
7376
withSchema = true;
7477
this.schemaTableMap = schemaTableMap;
7578
filterSchema = schemaTableMap.entrySet().stream().reduce(0, (a, b) -> a + b.getValue().size(), Integer::sum) > 50;
79+
this.pureDataTypeMap = new ConcurrentHashMap<>();
7680
this.dataTypeMap = new ConcurrentHashMap<>();
7781
schemaTableMap.forEach((schema, tables) -> {
7882
tables.forEach(tableName -> {
7983
TapTable table = tableMap.get(schema + "." + tableName);
8084
if (EmptyKit.isNotNull(table)) {
81-
dataTypeMap.putAll(table.getNameFieldMap().entrySet().stream().collect(Collectors.toMap(v -> schema + "." + tableName + "." + v.getKey(), e -> e.getValue().getPureDataType())));
85+
pureDataTypeMap.putAll(table.getNameFieldMap().entrySet().stream().collect(Collectors.toMap(v -> tableName + "." + v.getKey(), e -> Optional.ofNullable(e.getValue().getPureDataType()).orElse(e.getValue().getDataType()))));
86+
dataTypeMap.putAll(table.getNameFieldMap().entrySet().stream().collect(Collectors.toMap(v -> tableName + "." + v.getKey(), e -> e.getValue().getDataType())));
8287
}
8388
});
8489
tables.addAll(getSubPartitionTables(tableMap, schema, tables));
@@ -167,10 +172,27 @@ protected void parseKeyAndValue(String tableName, Map.Entry<String, Object> stri
167172
return;
168173
}
169174
String key = tableName + "." + stringObjectEntry.getKey();
175+
String pureDataType = pureDataTypeMap.get(key);
170176
String dataType = dataTypeMap.get(key);
171-
if (EmptyKit.isNull(dataType)) {
177+
if (EmptyKit.isNull(pureDataType)) {
172178
return;
173179
}
180+
switch (pureDataType) {
181+
case "ARRAY":
182+
String arrayString = String.valueOf(value);
183+
List<Object> array = new ArrayList<>();
184+
Arrays.stream(arrayString.substring(1, arrayString.length() - 1).split(",")).forEach(v -> {
185+
array.add(parseType(v, StringKit.removeParentheses(dataType.replace("array", "").trim())));
186+
});
187+
stringObjectEntry.setValue(array);
188+
break;
189+
default:
190+
stringObjectEntry.setValue(parseType(value, pureDataType));
191+
break;
192+
}
193+
}
194+
195+
private Object parseType(Object value, String dataType) {
174196
switch (dataType) {
175197
case "smallint":
176198
case "integer":
@@ -179,19 +201,15 @@ protected void parseKeyAndValue(String tableName, Map.Entry<String, Object> stri
179201
case "money":
180202
case "real":
181203
case "double precision":
182-
stringObjectEntry.setValue(new BigDecimal((String) value));
183-
break;
204+
return new BigDecimal((String) value);
184205
case "bit":
185206
if (value instanceof String && ((String) value).length() == 1) {
186-
stringObjectEntry.setValue("1".equals(value));
207+
return "1".equals(value);
187208
}
188-
break;
189209
case "bytea":
190-
stringObjectEntry.setValue(StringKit.toByteArray(((String) value).substring(2)));
191-
break;
210+
return StringKit.toByteArray(String.valueOf(value).substring(2));
192211
case "date":
193-
stringObjectEntry.setValue(LocalDate.parse((String) value).atStartOfDay());
194-
break;
212+
return LocalDate.parse((String) value).atStartOfDay();
195213
case "interval":
196214
String[] intervalArray = ((String) value).split(" ");
197215
StringBuilder stringBuilder = new StringBuilder("P");
@@ -222,27 +240,23 @@ protected void parseKeyAndValue(String tableName, Map.Entry<String, Object> stri
222240
break;
223241
}
224242
}
225-
stringObjectEntry.setValue(stringBuilder.toString());
226-
break;
243+
return stringBuilder.toString();
227244
case "timestamp without time zone":
228245
case "timestamp":
229-
stringObjectEntry.setValue(Timestamp.valueOf((String) value).toLocalDateTime().minusHours(postgresConfig.getZoneOffsetHour()));
230-
break;
246+
return Timestamp.valueOf((String) value).toLocalDateTime().minusHours(postgresConfig.getZoneOffsetHour());
231247
case "timestamp with time zone":
232248
String timestamp = ((String) value).substring(0, ((String) value).length() - 3);
233249
String timezone = ((String) value).substring(((String) value).length() - 3);
234-
stringObjectEntry.setValue(Timestamp.valueOf(timestamp).toLocalDateTime().atZone(TimeZone.getTimeZone("GMT" + timezone + ":00").toZoneId()));
235-
break;
250+
return Timestamp.valueOf(timestamp).toLocalDateTime().atZone(TimeZone.getTimeZone("GMT" + timezone + ":00").toZoneId());
236251
case "time without time zone":
237252
case "time":
238-
stringObjectEntry.setValue(LocalTime.parse((String) value).atDate(LocalDate.ofYearDay(1970, 1)).minusHours(postgresConfig.getZoneOffsetHour()));
239-
break;
253+
return LocalTime.parse((String) value).atDate(LocalDate.ofYearDay(1970, 1)).minusHours(postgresConfig.getZoneOffsetHour());
240254
case "time with time zone":
241255
String time = ((String) value).substring(0, ((String) value).length() - 3);
242256
String zone = ((String) value).substring(((String) value).length() - 3);
243-
stringObjectEntry.setValue(LocalTime.parse(time).atDate(LocalDate.ofYearDay(1970, 1)).atZone(TimeZone.getTimeZone("GMT" + zone + ":00").toZoneId()));
244-
break;
257+
return LocalTime.parse(time).atDate(LocalDate.ofYearDay(1970, 1)).atZone(TimeZone.getTimeZone("GMT" + zone + ":00").toZoneId());
245258
}
259+
return value;
246260
}
247261

248262
protected static final String WALMINER_STOP = "select walminer_stop()";
@@ -268,7 +282,8 @@ private List<String> getSubPartitionTables(KVReadOnlyMap<TapTable> tableMap, Lis
268282
.filter(n -> !tables.contains(n))
269283
.collect(Collectors.toList());
270284
subTableNames.forEach(t -> tableMap.get(table).getNameFieldMap().forEach((k, field) -> {
271-
dataTypeMap.put(t + "." + k, field.getPureDataType());
285+
pureDataTypeMap.put(t + "." + k, field.getPureDataType());
286+
dataTypeMap.put(t + "." + k, field.getDataType());
272287
}));
273288
subPartitionTableNames.addAll(subTableNames);
274289
}
@@ -298,7 +313,8 @@ private List<String> getSubPartitionTables(KVReadOnlyMap<TapTable> tableMap, Str
298313
.filter(n -> !tables.contains(n))
299314
.collect(Collectors.toList());
300315
subTableNames.forEach(t -> tableMap.get(schema + "." + table).getNameFieldMap().forEach((k, field) -> {
301-
dataTypeMap.put(schema + "." + t + "." + k, field.getPureDataType());
316+
pureDataTypeMap.put(schema + "." + t + "." + k, field.getPureDataType());
317+
dataTypeMap.put(t + "." + k, field.getDataType());
302318
}));
303319
subPartitionTableNames.addAll(subTableNames);
304320
}

0 commit comments

Comments
 (0)