Skip to content

Commit 9f1e590

Browse files
[Feature][Connector-V2] Suppor Time type in paimon connector (#8880)
1 parent f8c47fb commit 9f1e590

File tree

22 files changed

+145
-48
lines changed

22 files changed

+145
-48
lines changed

Diff for: docs/en/connector-v2/source/Paimon.md

+1
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ The field data types currently supported by where conditions are as follows:
7373
* double
7474
* date
7575
* timestamp
76+
* time
7677

7778
### paimon.hadoop.conf [string]
7879

Diff for: docs/zh/connector-v2/source/Paimon.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,8 @@ Paimon 的 catalog uri,仅当 catalog_type 为 hive 时需要
7474
* float
7575
* double
7676
* date
77-
* timestamp
77+
* timestamp
78+
* time
7879

7980
### paimon.hadoop.conf [string]
8081

Diff for: seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/TimeUtils.java

+4
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ public static LocalTime parse(String time, Formatter formatter) {
3838
return LocalTime.parse(time, FORMATTER_MAP.get(formatter));
3939
}
4040

41+
public static LocalTime parse(String dateTime) {
42+
return LocalTime.parse(dateTime, FORMATTER_MAP.get(matchTimeFormatter(dateTime)));
43+
}
44+
4145
public static final Pattern[] PATTERN_ARRAY =
4246
new Pattern[] {
4347
Pattern.compile("\\d{2}:\\d{2}:\\d{2}"),

Diff for: seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/schema/UpdatedDataFields.java

+11
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ public class UpdatedDataFields {
4343
private static final List<DataTypeRoot> TIMESTAMP_TYPES =
4444
Arrays.asList(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);
4545

46+
private static final List<DataTypeRoot> TIME_TYPES =
47+
Arrays.asList(DataTypeRoot.TIME_WITHOUT_TIME_ZONE);
48+
4649
public static ConvertAction canConvert(DataType oldType, DataType newType) {
4750
if (oldType.equalsIgnoreNullable(newType)) {
4851
return ConvertAction.CONVERT;
@@ -93,6 +96,14 @@ public static ConvertAction canConvert(DataType oldType, DataType newType) {
9396
: ConvertAction.IGNORE;
9497
}
9598

99+
oldIdx = TIME_TYPES.indexOf(oldType.getTypeRoot());
100+
newIdx = TIME_TYPES.indexOf(newType.getTypeRoot());
101+
if (oldIdx >= 0 && newIdx >= 0) {
102+
return DataTypeChecks.getPrecision(oldType) <= DataTypeChecks.getPrecision(newType)
103+
? ConvertAction.CONVERT
104+
: ConvertAction.IGNORE;
105+
}
106+
96107
return ConvertAction.EXCEPTION;
97108
}
98109

Diff for: seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.paimon.source.converter;
1919

20+
import org.apache.seatunnel.common.utils.DateUtils;
21+
import org.apache.seatunnel.common.utils.TimeUtils;
22+
2023
import org.apache.commons.lang3.StringUtils;
2124
import org.apache.paimon.data.BinaryString;
2225
import org.apache.paimon.data.Decimal;
@@ -266,8 +269,9 @@ private static Object convertValueByPaimonDataType(
266269
case DOUBLE:
267270
return Double.parseDouble(strValue);
268271
case DATE:
269-
return DateTimeUtils.toInternal(
270-
org.apache.seatunnel.common.utils.DateUtils.parse(strValue));
272+
return DateTimeUtils.toInternal(DateUtils.parse(strValue));
273+
case TIME_WITHOUT_TIME_ZONE:
274+
return DateTimeUtils.toInternal(TimeUtils.parse(strValue));
271275
case TIMESTAMP_WITHOUT_TIME_ZONE:
272276
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
273277
return Timestamp.fromLocalDateTime(

Diff for: seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java

+10
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import java.math.BigDecimal;
5454
import java.time.LocalDate;
5555
import java.time.LocalDateTime;
56+
import java.time.LocalTime;
5657
import java.util.HashMap;
5758
import java.util.List;
5859
import java.util.Map;
@@ -310,6 +311,10 @@ public static SeaTunnelRow convert(
310311
convertArrayType(
311312
fieldName, paimonArray, seatunnelArray.getElementType());
312313
break;
314+
case TIME:
315+
int timeInt = rowData.getInt(i);
316+
objects[i] = DateTimeUtils.toLocalTime(timeInt);
317+
break;
313318
case MAP:
314319
MapType<?, ?> mapType = (MapType<?, ?>) fieldType;
315320
InternalMap map = rowData.getMap(i);
@@ -434,6 +439,11 @@ public static InternalRow reconvert(
434439
binaryWriter.writeTimestamp(
435440
i, Timestamp.fromLocalDateTime(datetime), precision);
436441
break;
442+
case TIME:
443+
LocalTime time = (LocalTime) seaTunnelRow.getField(i);
444+
BinaryWriter.createValueSetter(DataTypes.TIME())
445+
.setValue(binaryWriter, i, DateTimeUtils.toInternal(time));
446+
break;
437447
case MAP:
438448
MapType<?, ?> mapType = (MapType<?, ?>) seaTunnelRowType.getFieldType(i);
439449
SeaTunnelDataType<?> keyType = mapType.getKeyType();

Diff for: seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -178,8 +178,7 @@ public static Column convert(BasicTypeDefine<DataType> typeDefine) {
178178
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
179179
LocalZonedTimestampType localZonedTimestampType =
180180
(LocalZonedTimestampType) dataType;
181-
seaTunnelDataType =
182-
paimonToSeaTunnelTypeVisitor.visit((LocalZonedTimestampType) dataType);
181+
seaTunnelDataType = paimonToSeaTunnelTypeVisitor.visit(localZonedTimestampType);
183182
physicalColumnBuilder.scale(localZonedTimestampType.getPrecision());
184183
break;
185184
case ARRAY:
@@ -228,6 +227,10 @@ public static RowType reconvert(SeaTunnelRowType seaTunnelRowType, TableSchema t
228227
DataField dataField = SchemaUtil.getDataField(fields, fieldName);
229228
dataType = new TimestampType(((TimestampType) dataField.type()).getPrecision());
230229
}
230+
if (typeRoot.equals(DataTypeRoot.TIME_WITHOUT_TIME_ZONE)) {
231+
DataField dataField = SchemaUtil.getDataField(fields, fieldName);
232+
dataType = new TimeType(((TimeType) dataField.type()).getPrecision());
233+
}
231234
DataField dataField = new DataField(i, fieldName, dataType);
232235
dataFields[i] = dataField;
233236
}
@@ -517,6 +520,11 @@ public SeaTunnelDataType<?> visit(TimestampType timestampType) {
517520
return LocalTimeType.LOCAL_DATE_TIME_TYPE;
518521
}
519522

523+
@Override
524+
public SeaTunnelDataType<?> visit(TimeType timeType) {
525+
return LocalTimeType.LOCAL_TIME_TYPE;
526+
}
527+
520528
@Override
521529
public SeaTunnelDataType<?> visit(LocalZonedTimestampType localZonedTimestampType) {
522530
return LocalTimeType.LOCAL_DATE_TIME_TYPE;

Diff for: seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogTest.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,15 @@ public void before() {
181181
(Long) null,
182182
false,
183183
null,
184-
"c_timestamp"));
184+
"c_timestamp"))
185+
.column(
186+
PhysicalColumn.of(
187+
"c_time",
188+
LocalTimeType.LOCAL_TIME_TYPE,
189+
(Long) null,
190+
false,
191+
null,
192+
"c_time"));
185193
}
186194

187195
@Test

Diff for: seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/schema/UpdatedDataFieldsTest.java

+18
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.paimon.types.FloatType;
2424
import org.apache.paimon.types.IntType;
2525
import org.apache.paimon.types.SmallIntType;
26+
import org.apache.paimon.types.TimeType;
2627
import org.apache.paimon.types.TimestampType;
2728
import org.apache.paimon.types.VarCharType;
2829

@@ -98,4 +99,21 @@ public void testCanConvertTimestamp() {
9899

99100
Assertions.assertEquals(UpdatedDataFields.ConvertAction.EXCEPTION, convertAction);
100101
}
102+
103+
@Test
104+
public void testCanConvertTime() {
105+
TimeType oldType = new TimeType(true, 3);
106+
TimeType biggerLengthTimestamp = new TimeType(true, 5);
107+
TimeType smallerLengthTimestamp = new TimeType(true, 2);
108+
VarCharType varCharType = new VarCharType();
109+
110+
UpdatedDataFields.ConvertAction convertAction;
111+
convertAction = UpdatedDataFields.canConvert(oldType, biggerLengthTimestamp);
112+
Assertions.assertEquals(UpdatedDataFields.ConvertAction.CONVERT, convertAction);
113+
convertAction = UpdatedDataFields.canConvert(oldType, smallerLengthTimestamp);
114+
Assertions.assertEquals(UpdatedDataFields.ConvertAction.IGNORE, convertAction);
115+
convertAction = UpdatedDataFields.canConvert(oldType, varCharType);
116+
117+
Assertions.assertEquals(UpdatedDataFields.ConvertAction.EXCEPTION, convertAction);
118+
}
101119
}

Diff for: seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonConverterTest.java

+9-3
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.paimon.types.IntType;
3333
import org.apache.paimon.types.RowType;
3434
import org.apache.paimon.types.SmallIntType;
35+
import org.apache.paimon.types.TimeType;
3536
import org.apache.paimon.types.TimestampType;
3637
import org.apache.paimon.types.TinyIntType;
3738
import org.apache.paimon.types.VarBinaryType;
@@ -46,6 +47,7 @@
4647
import java.math.BigDecimal;
4748
import java.time.LocalDate;
4849
import java.time.LocalDateTime;
50+
import java.time.LocalTime;
4951
import java.util.Arrays;
5052

5153
import static org.apache.seatunnel.connectors.seatunnel.paimon.source.converter.SqlToPaimonPredicateConverter.convertToPlainSelect;
@@ -77,7 +79,8 @@ public void setUp() {
7779
new DataField(9, "float_col", new FloatType()),
7880
new DataField(10, "double_col", new DoubleType()),
7981
new DataField(11, "date_col", new DateType()),
80-
new DataField(12, "timestamp_col", new TimestampType())));
82+
new DataField(12, "timestamp_col", new TimestampType()),
83+
new DataField(13, "time_col", new TimeType())));
8184

8285
fieldNames = rowType.getFieldNames().toArray(new String[0]);
8386
}
@@ -97,7 +100,8 @@ public void testConvertSqlWhereToPaimonPredicate() {
97100
+ "float_col = 5.5 AND "
98101
+ "double_col = 6.6 AND "
99102
+ "date_col = '2022-01-01' AND "
100-
+ "timestamp_col = '2022-01-01T12:00:00.123'";
103+
+ "timestamp_col = '2022-01-01T12:00:00.123' AND "
104+
+ "time_col = '12:00:00.123'";
101105

102106
PlainSelect plainSelect = convertToPlainSelect(query);
103107
Predicate predicate =
@@ -125,7 +129,9 @@ public void testConvertSqlWhereToPaimonPredicate() {
125129
builder.equal(
126130
12,
127131
Timestamp.fromLocalDateTime(
128-
LocalDateTime.parse("2022-01-01T12:00:00.123"))));
132+
LocalDateTime.parse("2022-01-01T12:00:00.123"))),
133+
builder.equal(
134+
13, DateTimeUtils.toInternal(LocalTime.parse("12:00:00.123"))));
129135

130136
assertEquals(expectedPredicate.toString(), predicate.toString());
131137
}

Diff for: seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java

+14-6
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import java.math.BigDecimal;
5858
import java.time.LocalDate;
5959
import java.time.LocalDateTime;
60+
import java.time.LocalTime;
6061
import java.util.Arrays;
6162
import java.util.Collections;
6263
import java.util.HashMap;
@@ -90,7 +91,8 @@ public class RowConverterTest {
9091
"c_date",
9192
"c_timestamp",
9293
"c_map",
93-
"c_array"
94+
"c_array",
95+
"c_time"
9496
};
9597

9698
public static final SeaTunnelDataType<?>[] seaTunnelDataTypes = {
@@ -107,7 +109,8 @@ public class RowConverterTest {
107109
LocalTimeType.LOCAL_DATE_TYPE,
108110
LocalTimeType.LOCAL_DATE_TIME_TYPE,
109111
new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE),
110-
ArrayType.STRING_ARRAY_TYPE
112+
ArrayType.STRING_ARRAY_TYPE,
113+
LocalTimeType.LOCAL_TIME_TYPE
111114
};
112115

113116
public static final List<String> KEY_NAME_LIST = Arrays.asList("c_tinyint");
@@ -129,7 +132,8 @@ public TableSchema getTableSchema(int decimalPrecision, int decimalScale) {
129132
DataTypes.DATE(),
130133
DataTypes.TIMESTAMP(),
131134
DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()),
132-
DataTypes.ARRAY(DataTypes.STRING())
135+
DataTypes.ARRAY(DataTypes.STRING()),
136+
DataTypes.TIME()
133137
},
134138
new String[] {
135139
"c_tinyint",
@@ -145,7 +149,8 @@ public TableSchema getTableSchema(int decimalPrecision, int decimalScale) {
145149
"c_date",
146150
"c_timestamp",
147151
"c_map",
148-
"c_array"
152+
"c_array",
153+
"c_time",
149154
});
150155

151156
return new TableSchema(
@@ -172,11 +177,12 @@ public void generateTestData() {
172177
byte[] bytes = new byte[] {1, 2, 3, 4};
173178
boolean booleanValue = false;
174179
LocalDate date = LocalDate.of(1996, 3, 16);
180+
LocalTime time = LocalTime.of(12, 0, 0);
175181
LocalDateTime timestamp = LocalDateTime.of(1996, 3, 16, 4, 16, 20);
176182
Map<String, String> map = new HashMap<>();
177183
map.put("name", "paimon");
178184
String[] strings = new String[] {"paimon", "seatunnel"};
179-
Object[] objects = new Object[14];
185+
Object[] objects = new Object[15];
180186
objects[0] = tinyint;
181187
objects[1] = smallint;
182188
objects[2] = intNum;
@@ -191,8 +197,9 @@ public void generateTestData() {
191197
objects[11] = timestamp;
192198
objects[12] = map;
193199
objects[13] = strings;
200+
objects[14] = time;
194201
seaTunnelRow = new SeaTunnelRow(objects);
195-
BinaryRow binaryRow = new BinaryRow(14);
202+
BinaryRow binaryRow = new BinaryRow(15);
196203
BinaryRowWriter binaryRowWriter = new BinaryRowWriter(binaryRow);
197204
binaryRowWriter.writeByte(0, tinyint);
198205
binaryRowWriter.writeShort(1, smallint);
@@ -234,6 +241,7 @@ public void generateTestData() {
234241
binaryArrayWriter2.complete();
235242
binaryRowWriter.writeArray(
236243
13, binaryArray2, new InternalArraySerializer(DataTypes.STRING()));
244+
binaryRowWriter.writeInt(14, DateTimeUtils.toInternal(time));
237245
internalRow = binaryRow;
238246
}
239247

Diff for: seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ public class RowTypeConverterTest {
7171
DataTypes.BOOLEAN(),
7272
DataTypes.DATE(),
7373
DataTypes.TIMESTAMP(),
74+
DataTypes.TIME(),
7475
DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()),
7576
DataTypes.ARRAY(DataTypes.STRING())
7677
},
@@ -87,6 +88,7 @@ public class RowTypeConverterTest {
8788
"c_boolean",
8889
"c_date",
8990
"c_timestamp",
91+
"c_time",
9092
"c_map",
9193
"c_array"
9294
});
@@ -110,6 +112,7 @@ public void before() {
110112
"c_boolean",
111113
"c_date",
112114
"c_timestamp",
115+
"c_time",
113116
"c_map",
114117
"c_array"
115118
},
@@ -126,6 +129,7 @@ public void before() {
126129
BasicType.BOOLEAN_TYPE,
127130
LocalTimeType.LOCAL_DATE_TYPE,
128131
LocalTimeType.LOCAL_DATE_TIME_TYPE,
132+
LocalTimeType.LOCAL_TIME_TYPE,
129133
new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE),
130134
ArrayType.STRING_ARRAY_TYPE
131135
});
@@ -149,9 +153,10 @@ public void before() {
149153
new DataField(9, "c_boolean", DataTypes.BOOLEAN()),
150154
new DataField(10, "c_date", DataTypes.DATE()),
151155
new DataField(11, "c_timestamp", DataTypes.TIMESTAMP(6)),
156+
new DataField(12, "c_time", DataTypes.TIME()),
152157
new DataField(
153-
12, "c_map", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
154-
new DataField(13, "c_array", DataTypes.ARRAY(DataTypes.STRING())));
158+
13, "c_map", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
159+
new DataField(14, "c_array", DataTypes.ARRAY(DataTypes.STRING())));
155160

156161
tableSchema =
157162
new TableSchema(

Diff for: seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonRecordWithFullType.java

+1
Original file line numberDiff line numberDiff line change
@@ -46,4 +46,5 @@ public class PaimonRecordWithFullType {
4646
public BinaryString c_bytes;
4747
public int c_date;
4848
public Timestamp c_timestamp;
49+
public int c_time;
4950
}

Diff for: seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkDynamicBucketIT.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -481,7 +481,8 @@ private List<PaimonRecordWithFullType> loadPaimonDataWithFullType(FileStoreTable
481481
row.getDecimal(10, 30, 8),
482482
row.getString(11),
483483
row.getInt(12),
484-
row.getTimestamp(13, 6));
484+
row.getTimestamp(13, 6),
485+
row.getInt(14));
485486
result.add(paimonRecordWithFullType);
486487
});
487488
} catch (IOException e) {

Diff for: seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonStreamReadIT.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,8 @@ protected List<PaimonRecordWithFullType> loadPaimonDataWithFullType(
141141
row.getDecimal(10, 30, 8),
142142
row.getString(11),
143143
row.getInt(12),
144-
row.getTimestamp(13, 6));
144+
row.getTimestamp(13, 6),
145+
row.getInt(14));
145146
result.add(paimonRecordWithFullType);
146147
});
147148
} catch (IOException e) {

Diff for: seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case1.conf

+1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ source {
4040
c_bytes = bytes
4141
c_date = date
4242
c_timestamp = timestamp
43+
c_time = time
4344
}
4445
primaryKey {
4546
name = "pk_id"

0 commit comments

Comments
 (0)