|
16 | 16 | import io.tapdata.entity.schema.TapIndex; |
17 | 17 | import io.tapdata.entity.schema.TapIndexField; |
18 | 18 | import io.tapdata.entity.schema.TapTable; |
| 19 | +import io.tapdata.entity.schema.value.*; |
19 | 20 | import io.tapdata.entity.simplify.TapSimplify; |
20 | 21 | import io.tapdata.entity.simplify.pretty.BiClassHandlers; |
21 | 22 | import io.tapdata.entity.utils.DataMap; |
|
28 | 29 | import io.tapdata.pdk.apis.entity.WriteListResult; |
29 | 30 | import io.tapdata.pdk.apis.functions.ConnectorFunctions; |
30 | 31 |
|
31 | | -import java.sql.Connection; |
32 | | -import java.sql.SQLException; |
| 32 | +import java.sql.*; |
| 33 | +import java.time.Instant; |
| 34 | +import java.time.ZoneId; |
| 35 | +import java.time.ZoneOffset; |
33 | 36 | import java.util.List; |
| 37 | +import java.util.Map; |
| 38 | +import java.util.TimeZone; |
34 | 39 | import java.util.concurrent.atomic.AtomicInteger; |
35 | 40 | import java.util.function.Consumer; |
36 | 41 | import java.util.stream.Collectors; |
@@ -143,6 +148,21 @@ public void registerCapabilities(ConnectorFunctions connectorFunctions, TapCodec |
143 | 148 | connectorFunctions.supportDropFieldFunction(this::fieldDDLHandler); |
144 | 149 | connectorFunctions.supportGetTableNamesFunction(this::getTableNames); |
145 | 150 | connectorFunctions.supportExecuteCommandFunction((a, b, c) -> SqlExecuteCommandFunction.executeCommand(a, b, () -> snowflakeJdbcContext.getConnection(), this::isAlive, c)); |
| 151 | + |
| 152 | + codecRegistry.registerFromTapValue(TapRawValue.class, "TEXT", tapRawValue -> { |
| 153 | + if (tapRawValue != null && tapRawValue.getValue() != null) return toJson(tapRawValue.getValue()); |
| 154 | + return "null"; |
| 155 | + }); |
| 156 | + codecRegistry.registerFromTapValue(TapTimeValue.class, tapTimeValue -> tapTimeValue.getValue().toTimeStr()); |
| 157 | + codecRegistry.registerFromTapValue(TapDateTimeValue.class, tapDateTimeValue -> { |
| 158 | + if (EmptyKit.isNotNull(tapDateTimeValue.getValue().getTimeZone())) { |
| 159 | + return tapDateTimeValue.getValue().toTimestamp(); |
| 160 | + } else { |
| 161 | + return formatTapDateTime(tapDateTimeValue.getValue(), "yyyy-MM-dd HH:mm:ss.SSSSSS"); |
| 162 | + } |
| 163 | + }); |
| 164 | + codecRegistry.registerFromTapValue(TapDateValue.class, tapDateValue -> tapDateValue.getValue().toSqlDate()); |
| 165 | + codecRegistry.registerFromTapValue(TapYearValue.class, "TEXT(4)", TapValue::getOriginValue); |
146 | 166 | } |
147 | 167 |
|
148 | 168 | @Override |
@@ -188,5 +208,23 @@ private void writeRecord(TapConnectorContext connectorContext, List<TapRecordEve |
188 | 208 | .setTapLogger(tapLogger) |
189 | 209 | .write(tapRecordEvents, writeListResultConsumer, this::isAlive); |
190 | 210 | } |
| 211 | + |
| 212 | + @Override |
| 213 | + protected void processDataMap(DataMap dataMap, TapTable tapTable) { |
| 214 | + for (Map.Entry<String, Object> entry : dataMap.entrySet()) { |
| 215 | + Object value = entry.getValue(); |
| 216 | + if (value instanceof Timestamp) { |
| 217 | + if (!tapTable.getNameFieldMap().get(entry.getKey()).getDataType().startsWith("TIMESTAMP_TZ")) { |
| 218 | + entry.setValue(((Timestamp) value).toLocalDateTime().minusHours(snowflakeConfig.getZoneOffsetHour())); |
| 219 | + } else { |
| 220 | + entry.setValue(((Timestamp) value).toLocalDateTime().minusHours(TimeZone.getDefault().getRawOffset() / 3600000).atZone(ZoneOffset.UTC)); |
| 221 | + } |
| 222 | + } else if (value instanceof Date) { |
| 223 | + entry.setValue(Instant.ofEpochMilli(((Date) value).getTime()).atZone(ZoneId.systemDefault()).toLocalDateTime()); |
| 224 | + } else if (value instanceof Time) { |
| 225 | + entry.setValue(Instant.ofEpochMilli(((Time) value).getTime()).atZone(ZoneId.systemDefault()).toLocalDateTime().minusHours(snowflakeConfig.getZoneOffsetHour())); |
| 226 | + } |
| 227 | + } |
| 228 | + } |
191 | 229 | } |
192 | 230 |
|
0 commit comments