Skip to content

Commit 57bc5e9

Browse files
authored
[lake/paimon] Fix that tiering service for paimon missed timestamp type support
This closes #1121.
1 parent ec6991e commit 57bc5e9

File tree

5 files changed

+97
-19
lines changed

5 files changed

+97
-19
lines changed

fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,41 +18,52 @@
1818

1919
import com.alibaba.fluss.record.LogRecord;
2020
import com.alibaba.fluss.row.TimestampLtz;
21+
import com.alibaba.fluss.row.TimestampNtz;
2122

2223
import org.apache.paimon.data.BinaryString;
2324
import org.apache.paimon.data.Decimal;
2425
import org.apache.paimon.data.InternalArray;
2526
import org.apache.paimon.data.InternalMap;
2627
import org.apache.paimon.data.InternalRow;
2728
import org.apache.paimon.data.Timestamp;
29+
import org.apache.paimon.types.DataType;
2830
import org.apache.paimon.types.RowKind;
31+
import org.apache.paimon.types.RowType;
2932

3033
import static com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toRowKind;
34+
import static com.alibaba.fluss.utils.Preconditions.checkState;
3135

3236
/** To wrap Fluss {@link LogRecord} as paimon {@link InternalRow}. */
3337
public class FlussRecordAsPaimonRow implements InternalRow {
3438

39+
// Lake table for paimon will append three system columns: __bucket, __offset,__timestamp
40+
private static final int LAKE_PAIMON_SYSTEM_COLUMNS = 3;
41+
private final RowType tableTowType;
3542
private final int bucket;
3643
private LogRecord logRecord;
3744
private int originRowFieldCount;
3845
private com.alibaba.fluss.row.InternalRow internalRow;
3946

40-
public FlussRecordAsPaimonRow(int bucket) {
47+
public FlussRecordAsPaimonRow(int bucket, RowType tableTowType) {
4148
this.bucket = bucket;
49+
this.tableTowType = tableTowType;
4250
}
4351

4452
public void setFlussRecord(LogRecord logRecord) {
4553
this.logRecord = logRecord;
4654
this.internalRow = logRecord.getRow();
4755
this.originRowFieldCount = internalRow.getFieldCount();
56+
checkState(
57+
originRowFieldCount == tableTowType.getFieldCount() - LAKE_PAIMON_SYSTEM_COLUMNS,
58+
"The paimon table fields count must equals to LogRecord's fields count.");
4859
}
4960

5061
@Override
5162
public int getFieldCount() {
5263
return
5364
// business (including partitions) + system (three system fields: bucket, offset,
5465
// timestamp)
55-
originRowFieldCount + 3;
66+
originRowFieldCount + LAKE_PAIMON_SYSTEM_COLUMNS;
5667
}
5768

5869
@Override
@@ -142,13 +153,33 @@ public Timestamp getTimestamp(int pos, int precision) {
142153
if (pos == originRowFieldCount + 2) {
143154
return Timestamp.fromEpochMillis(logRecord.timestamp());
144155
}
145-
if (TimestampLtz.isCompact(precision)) {
146-
return Timestamp.fromEpochMillis(
147-
internalRow.getTimestampLtz(pos, precision).getEpochMillisecond());
148-
} else {
149-
TimestampLtz timestampLtz = internalRow.getTimestampLtz(pos, precision);
150-
return Timestamp.fromEpochMillis(
151-
timestampLtz.getEpochMillisecond(), timestampLtz.getNanoOfMillisecond());
156+
157+
DataType paimonTimestampType = tableTowType.getTypeAt(pos);
158+
159+
switch (paimonTimestampType.getTypeRoot()) {
160+
case TIMESTAMP_WITHOUT_TIME_ZONE:
161+
if (TimestampNtz.isCompact(precision)) {
162+
return Timestamp.fromEpochMillis(
163+
internalRow.getTimestampNtz(pos, precision).getMillisecond());
164+
} else {
165+
TimestampNtz timestampNtz = internalRow.getTimestampNtz(pos, precision);
166+
return Timestamp.fromEpochMillis(
167+
timestampNtz.getMillisecond(), timestampNtz.getNanoOfMillisecond());
168+
}
169+
170+
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
171+
if (TimestampLtz.isCompact(precision)) {
172+
return Timestamp.fromEpochMillis(
173+
internalRow.getTimestampLtz(pos, precision).getEpochMillisecond());
174+
} else {
175+
TimestampLtz timestampLtz = internalRow.getTimestampLtz(pos, precision);
176+
return Timestamp.fromEpochMillis(
177+
timestampLtz.getEpochMillisecond(),
178+
timestampLtz.getNanoOfMillisecond());
179+
}
180+
default:
181+
throw new UnsupportedOperationException(
182+
"Unsupported data type to get timestamp: " + paimonTimestampType);
152183
}
153184
}
154185

fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/RecordWriter.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.paimon.data.BinaryRow;
2323
import org.apache.paimon.table.sink.CommitMessage;
2424
import org.apache.paimon.table.sink.TableWriteImpl;
25+
import org.apache.paimon.types.RowType;
2526

2627
import javax.annotation.Nullable;
2728

@@ -34,19 +35,23 @@
3435
public abstract class RecordWriter<T> implements AutoCloseable {
3536

3637
protected final TableWriteImpl<T> tableWrite;
38+
protected final RowType tableRowType;
3739
protected final int bucket;
3840
@Nullable protected final BinaryRow partition;
3941
protected final FlussRecordAsPaimonRow flussRecordAsPaimonRow;
4042

4143
public RecordWriter(
4244
TableWriteImpl<T> tableWrite,
45+
RowType tableRowType,
4346
TableBucket tableBucket,
4447
@Nullable String partition,
4548
List<String> partitionKeys) {
4649
this.tableWrite = tableWrite;
50+
this.tableRowType = tableRowType;
4751
this.bucket = tableBucket.getBucket();
4852
this.partition = toPaimonPartitionBinaryRow(partitionKeys, partition);
49-
this.flussRecordAsPaimonRow = new FlussRecordAsPaimonRow(tableBucket.getBucket());
53+
this.flussRecordAsPaimonRow =
54+
new FlussRecordAsPaimonRow(tableBucket.getBucket(), tableRowType);
5055
}
5156

5257
public abstract void write(LogRecord record) throws Exception;

fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/append/AppendOnlyWriter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public AppendOnlyWriter(
4343
(TableWriteImpl<InternalRow>)
4444
// todo: set ioManager to support write-buffer-spillable
4545
fileStoreTable.newWrite(FLUSS_LAKE_TIERING_COMMIT_USER),
46+
fileStoreTable.rowType(),
4647
tableBucket,
4748
partition,
4849
partitionKeys); // Pass to parent

fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/mergetree/MergeTreeWriter.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,12 @@ public MergeTreeWriter(
4444
TableBucket tableBucket,
4545
@Nullable String partition,
4646
List<String> partitionKeys) {
47-
super(createTableWrite(fileStoreTable), tableBucket, partition, partitionKeys);
47+
super(
48+
createTableWrite(fileStoreTable),
49+
fileStoreTable.rowType(),
50+
tableBucket,
51+
partition,
52+
partitionKeys);
4853
this.rowKeyExtractor = fileStoreTable.createRowKeyExtractor();
4954
}
5055

fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java

Lines changed: 44 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,11 @@
2222
import com.alibaba.fluss.row.Decimal;
2323
import com.alibaba.fluss.row.GenericRow;
2424
import com.alibaba.fluss.row.TimestampLtz;
25+
import com.alibaba.fluss.row.TimestampNtz;
2526

2627
import org.apache.paimon.data.Timestamp;
2728
import org.apache.paimon.types.RowKind;
29+
import org.apache.paimon.types.RowType;
2830
import org.junit.jupiter.api.Test;
2931

3032
import java.math.BigDecimal;
@@ -42,8 +44,30 @@ class FlussRecordAsPaimonRowTest {
4244
@Test
4345
void testLogTableRecordAllTypes() {
4446
// Construct a FlussRecordAsPaimonRow instance
45-
int bucket = 0;
46-
FlussRecordAsPaimonRow flussRecordAsPaimonRow = new FlussRecordAsPaimonRow(bucket);
47+
int tableBucket = 0;
48+
RowType tableRowType =
49+
RowType.of(
50+
new org.apache.paimon.types.BooleanType(),
51+
new org.apache.paimon.types.TinyIntType(),
52+
new org.apache.paimon.types.SmallIntType(),
53+
new org.apache.paimon.types.IntType(),
54+
new org.apache.paimon.types.BigIntType(),
55+
new org.apache.paimon.types.FloatType(),
56+
new org.apache.paimon.types.DoubleType(),
57+
new org.apache.paimon.types.VarCharType(),
58+
new org.apache.paimon.types.DecimalType(5, 2),
59+
new org.apache.paimon.types.DecimalType(20, 0),
60+
new org.apache.paimon.types.LocalZonedTimestampType(6),
61+
new org.apache.paimon.types.TimestampType(6),
62+
new org.apache.paimon.types.BinaryType(),
63+
new org.apache.paimon.types.VarCharType(),
64+
// append three system columns: __bucket, __offset,__timestamp
65+
new org.apache.paimon.types.IntType(),
66+
new org.apache.paimon.types.BigIntType(),
67+
new org.apache.paimon.types.LocalZonedTimestampType(3));
68+
69+
FlussRecordAsPaimonRow flussRecordAsPaimonRow =
70+
new FlussRecordAsPaimonRow(tableBucket, tableRowType);
4771
long logOffset = 0;
4872
long timeStamp = System.currentTimeMillis();
4973
GenericRow genericRow = new GenericRow(14);
@@ -57,8 +81,8 @@ void testLogTableRecordAllTypes() {
5781
genericRow.setField(7, BinaryString.fromString("string"));
5882
genericRow.setField(8, Decimal.fromUnscaledLong(9, 5, 2));
5983
genericRow.setField(9, Decimal.fromBigDecimal(new BigDecimal(10), 20, 0));
60-
genericRow.setField(10, TimestampLtz.fromEpochMillis(1698235273182L));
61-
genericRow.setField(11, TimestampLtz.fromEpochMillis(1698235273182L, 45678));
84+
genericRow.setField(10, TimestampLtz.fromEpochMillis(1698235273182L, 5678));
85+
genericRow.setField(11, TimestampNtz.fromMillis(1698235273182L, 5678));
6286
genericRow.setField(12, new byte[] {1, 2, 3, 4});
6387
genericRow.setField(13, null);
6488
LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow);
@@ -77,15 +101,19 @@ void testLogTableRecordAllTypes() {
77101
.isEqualTo(new BigDecimal("0.09"));
78102
assertThat(flussRecordAsPaimonRow.getDecimal(9, 20, 0).toBigDecimal())
79103
.isEqualTo(new BigDecimal(10));
80-
assertThat(flussRecordAsPaimonRow.getTimestamp(10, 3).getMillisecond())
104+
assertThat(flussRecordAsPaimonRow.getTimestamp(10, 6).getMillisecond())
81105
.isEqualTo(1698235273182L);
106+
assertThat(flussRecordAsPaimonRow.getTimestamp(10, 6).getNanoOfMillisecond())
107+
.isEqualTo(5678);
82108
assertThat(flussRecordAsPaimonRow.getTimestamp(11, 6).getMillisecond())
83109
.isEqualTo(1698235273182L);
110+
assertThat(flussRecordAsPaimonRow.getTimestamp(11, 6).getNanoOfMillisecond())
111+
.isEqualTo(5678);
84112
assertThat(flussRecordAsPaimonRow.getBinary(12)).isEqualTo(new byte[] {1, 2, 3, 4});
85113
assertThat(flussRecordAsPaimonRow.isNullAt(13)).isTrue();
86114

87115
// verify FlussRecordAsPaimonRow system columns (no partition fields, so indices stay same)
88-
assertThat(flussRecordAsPaimonRow.getInt(14)).isEqualTo(bucket);
116+
assertThat(flussRecordAsPaimonRow.getInt(14)).isEqualTo(tableBucket);
89117
assertThat(flussRecordAsPaimonRow.getLong(15)).isEqualTo(logOffset);
90118
assertThat(flussRecordAsPaimonRow.getLong(16)).isEqualTo(timeStamp);
91119
assertThat(flussRecordAsPaimonRow.getTimestamp(16, 4))
@@ -98,8 +126,16 @@ void testLogTableRecordAllTypes() {
98126

99127
@Test
100128
void testPrimaryKeyTableRecord() {
101-
int bucket = 0;
102-
FlussRecordAsPaimonRow flussRecordAsPaimonRow = new FlussRecordAsPaimonRow(bucket);
129+
int tableBucket = 0;
130+
RowType tableRowType =
131+
RowType.of(
132+
new org.apache.paimon.types.BooleanType(),
133+
// append three system columns: __bucket, __offset,__timestamp
134+
new org.apache.paimon.types.IntType(),
135+
new org.apache.paimon.types.BigIntType(),
136+
new org.apache.paimon.types.LocalZonedTimestampType(3));
137+
FlussRecordAsPaimonRow flussRecordAsPaimonRow =
138+
new FlussRecordAsPaimonRow(tableBucket, tableRowType);
103139
long logOffset = 0;
104140
long timeStamp = System.currentTimeMillis();
105141
GenericRow genericRow = new GenericRow(1);

0 commit comments

Comments
 (0)