
Commit 7fe4cf8

[lake/paimon] Fix tiering service for paimon missing timestampNtz support
1 parent ec6991e · commit 7fe4cf8

File tree: 5 files changed, +77 -16 lines
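
In short: the tiering writer previously read every timestamp column through Fluss's TimestampLtz accessor, so Paimon TIMESTAMP (without time zone) columns were not handled. The commit threads the Paimon RowType into FlussRecordAsPaimonRow and dispatches on the column's type root. A condensed sketch of that dispatch (helper class and method names invented for illustration; the accessors are the ones used in the diff below):

import com.alibaba.fluss.row.InternalRow;
import com.alibaba.fluss.row.TimestampLtz;
import com.alibaba.fluss.row.TimestampNtz;

import org.apache.paimon.data.Timestamp;
import org.apache.paimon.types.DataType;

final class TimestampDispatchSketch {

    /** Convert one Fluss timestamp field to a Paimon Timestamp based on the Paimon column type. */
    static Timestamp toPaimonTimestamp(InternalRow row, int pos, int precision, DataType columnType) {
        switch (columnType.getTypeRoot()) {
            case TIMESTAMP_WITHOUT_TIME_ZONE: { // Paimon TIMESTAMP -> Fluss TimestampNtz
                TimestampNtz ntz = row.getTimestampNtz(pos, precision);
                return Timestamp.fromEpochMillis(ntz.getMillisecond(), ntz.getNanoOfMillisecond());
            }
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE: { // Paimon TIMESTAMP_LTZ -> Fluss TimestampLtz
                TimestampLtz ltz = row.getTimestampLtz(pos, precision);
                return Timestamp.fromEpochMillis(ltz.getEpochMillisecond(), ltz.getNanoOfMillisecond());
            }
            default:
                throw new UnsupportedOperationException("Not a timestamp column: " + columnType);
        }
    }
}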

fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java

Lines changed: 37 additions & 8 deletions
@@ -18,33 +18,42 @@

 import com.alibaba.fluss.record.LogRecord;
 import com.alibaba.fluss.row.TimestampLtz;
+import com.alibaba.fluss.row.TimestampNtz;

 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.InternalArray;
 import org.apache.paimon.data.InternalMap;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;

 import static com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toRowKind;
+import static com.alibaba.fluss.utils.Preconditions.checkState;

 /** To wrap Fluss {@link LogRecord} as paimon {@link InternalRow}. */
 public class FlussRecordAsPaimonRow implements InternalRow {

+    private final RowType tableTowType;
     private final int bucket;
     private LogRecord logRecord;
     private int originRowFieldCount;
     private com.alibaba.fluss.row.InternalRow internalRow;

-    public FlussRecordAsPaimonRow(int bucket) {
+    public FlussRecordAsPaimonRow(int bucket, RowType tableTowType) {
         this.bucket = bucket;
+        this.tableTowType = tableTowType;
     }

     public void setFlussRecord(LogRecord logRecord) {
         this.logRecord = logRecord;
         this.internalRow = logRecord.getRow();
         this.originRowFieldCount = internalRow.getFieldCount();
+        checkState(
+                originRowFieldCount == tableTowType.getFieldCount(),
+                "The paimon table fields count must equals to LogRecord's fields count.");
     }

     @Override
@@ -142,13 +151,33 @@ public Timestamp getTimestamp(int pos, int precision) {
         if (pos == originRowFieldCount + 2) {
             return Timestamp.fromEpochMillis(logRecord.timestamp());
         }
-        if (TimestampLtz.isCompact(precision)) {
-            return Timestamp.fromEpochMillis(
-                    internalRow.getTimestampLtz(pos, precision).getEpochMillisecond());
-        } else {
-            TimestampLtz timestampLtz = internalRow.getTimestampLtz(pos, precision);
-            return Timestamp.fromEpochMillis(
-                    timestampLtz.getEpochMillisecond(), timestampLtz.getNanoOfMillisecond());
+
+        DataType paimonTimestampType = tableTowType.getTypeAt(pos);
+
+        switch (paimonTimestampType.getTypeRoot()) {
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                if (TimestampNtz.isCompact(precision)) {
+                    return Timestamp.fromEpochMillis(
+                            internalRow.getTimestampNtz(pos, precision).getMillisecond());
+                } else {
+                    TimestampNtz timestampNtz = internalRow.getTimestampNtz(pos, precision);
+                    return Timestamp.fromEpochMillis(
+                            timestampNtz.getMillisecond(), timestampNtz.getNanoOfMillisecond());
+                }
+
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                if (TimestampLtz.isCompact(precision)) {
+                    return Timestamp.fromEpochMillis(
+                            internalRow.getTimestampLtz(pos, precision).getEpochMillisecond());
+                } else {
+                    TimestampLtz timestampLtz = internalRow.getTimestampLtz(pos, precision);
+                    return Timestamp.fromEpochMillis(
+                            timestampLtz.getEpochMillisecond(),
+                            timestampLtz.getNanoOfMillisecond());
+                }
+            default:
+                throw new UnsupportedOperationException(
+                        "Unsupported data type to get timestamp: " + paimonTimestampType);
         }
     }
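
For reference, the new TIMESTAMP_WITHOUT_TIME_ZONE branch can be exercised end to end by round-tripping a single Ntz column through the wrapper, much like the updated test further down. A minimal sketch (the test-method name is invented; the GenericRecord setup, the APPEND_ONLY static import, and the AssertJ imports are assumed to match the existing test file):

@Test
void timestampNtzColumnRoundTripSketch() {
    RowType rowType = RowType.of(new org.apache.paimon.types.TimestampType(3));
    FlussRecordAsPaimonRow paimonRow = new FlussRecordAsPaimonRow(/* bucket */ 0, rowType);

    GenericRow flussRow = new GenericRow(1);
    flussRow.setField(0, TimestampNtz.fromMillis(1698235273182L, 45678));
    paimonRow.setFlussRecord(
            new GenericRecord(/* offset */ 0L, System.currentTimeMillis(), APPEND_ONLY, flussRow));

    // Before this commit the column was always read via getTimestampLtz; it is now resolved
    // through getTimestampNtz because the Paimon column type is TIMESTAMP(3).
    Timestamp paimonTimestamp = paimonRow.getTimestamp(0, 3);
    assertThat(paimonTimestamp.getMillisecond()).isEqualTo(1698235273182L);
}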

fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/RecordWriter.java

Lines changed: 6 additions & 1 deletion
@@ -22,6 +22,7 @@
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.TableWriteImpl;
+import org.apache.paimon.types.RowType;

 import javax.annotation.Nullable;

@@ -34,19 +35,23 @@
 public abstract class RecordWriter<T> implements AutoCloseable {

     protected final TableWriteImpl<T> tableWrite;
+    protected final RowType tableRowType;
     protected final int bucket;
     @Nullable protected final BinaryRow partition;
     protected final FlussRecordAsPaimonRow flussRecordAsPaimonRow;

     public RecordWriter(
             TableWriteImpl<T> tableWrite,
+            RowType tableRowType,
             TableBucket tableBucket,
             @Nullable String partition,
             List<String> partitionKeys) {
         this.tableWrite = tableWrite;
+        this.tableRowType = tableRowType;
         this.bucket = tableBucket.getBucket();
         this.partition = toPaimonPartitionBinaryRow(partitionKeys, partition);
-        this.flussRecordAsPaimonRow = new FlussRecordAsPaimonRow(tableBucket.getBucket());
+        this.flussRecordAsPaimonRow =
+                new FlussRecordAsPaimonRow(tableBucket.getBucket(), tableRowType);
     }

     public abstract void write(LogRecord record) throws Exception;

fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/append/AppendOnlyWriter.java

Lines changed: 1 addition & 0 deletions
@@ -43,6 +43,7 @@ public AppendOnlyWriter(
                 (TableWriteImpl<InternalRow>)
                         // todo: set ioManager to support write-buffer-spillable
                         fileStoreTable.newWrite(FLUSS_LAKE_TIERING_COMMIT_USER),
+                fileStoreTable.rowType(),
                 tableBucket,
                 partition,
                 partitionKeys); // Pass to parent

fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/mergetree/MergeTreeWriter.java

Lines changed: 6 additions & 1 deletion
@@ -44,7 +44,12 @@ public MergeTreeWriter(
             TableBucket tableBucket,
             @Nullable String partition,
             List<String> partitionKeys) {
-        super(createTableWrite(fileStoreTable), tableBucket, partition, partitionKeys);
+        super(
+                createTableWrite(fileStoreTable),
+                fileStoreTable.rowType(),
+                tableBucket,
+                partition,
+                partitionKeys);
         this.rowKeyExtractor = fileStoreTable.createRowKeyExtractor();
     }

fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java

Lines changed: 27 additions & 6 deletions
@@ -22,9 +22,11 @@
 import com.alibaba.fluss.row.Decimal;
 import com.alibaba.fluss.row.GenericRow;
 import com.alibaba.fluss.row.TimestampLtz;
+import com.alibaba.fluss.row.TimestampNtz;

 import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
 import org.junit.jupiter.api.Test;

 import java.math.BigDecimal;
@@ -42,8 +44,25 @@ class FlussRecordAsPaimonRowTest {
     @Test
     void testLogTableRecordAllTypes() {
         // Construct a FlussRecordAsPaimonRow instance
-        int bucket = 0;
-        FlussRecordAsPaimonRow flussRecordAsPaimonRow = new FlussRecordAsPaimonRow(bucket);
+        int tableBucket = 0;
+        RowType tableRowType =
+                RowType.of(
+                        new org.apache.paimon.types.BooleanType(),
+                        new org.apache.paimon.types.TinyIntType(),
+                        new org.apache.paimon.types.SmallIntType(),
+                        new org.apache.paimon.types.IntType(),
+                        new org.apache.paimon.types.BigIntType(),
+                        new org.apache.paimon.types.FloatType(),
+                        new org.apache.paimon.types.DoubleType(),
+                        new org.apache.paimon.types.VarCharType(),
+                        new org.apache.paimon.types.DecimalType(5, 2),
+                        new org.apache.paimon.types.DecimalType(20, 0),
+                        new org.apache.paimon.types.LocalZonedTimestampType(3),
+                        new org.apache.paimon.types.TimestampType(3),
+                        new org.apache.paimon.types.BinaryType(),
+                        new org.apache.paimon.types.VarCharType());
+        FlussRecordAsPaimonRow flussRecordAsPaimonRow =
+                new FlussRecordAsPaimonRow(tableBucket, tableRowType);
         long logOffset = 0;
         long timeStamp = System.currentTimeMillis();
         GenericRow genericRow = new GenericRow(14);
@@ -58,7 +77,7 @@ void testLogTableRecordAllTypes() {
         genericRow.setField(8, Decimal.fromUnscaledLong(9, 5, 2));
         genericRow.setField(9, Decimal.fromBigDecimal(new BigDecimal(10), 20, 0));
         genericRow.setField(10, TimestampLtz.fromEpochMillis(1698235273182L));
-        genericRow.setField(11, TimestampLtz.fromEpochMillis(1698235273182L, 45678));
+        genericRow.setField(11, TimestampNtz.fromMillis(1698235273182L, 45678));
         genericRow.setField(12, new byte[] {1, 2, 3, 4});
         genericRow.setField(13, null);
         LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow);
@@ -85,7 +104,7 @@ void testLogTableRecordAllTypes() {
         assertThat(flussRecordAsPaimonRow.isNullAt(13)).isTrue();

         // verify FlussRecordAsPaimonRow system columns (no partition fields, so indices stay same)
-        assertThat(flussRecordAsPaimonRow.getInt(14)).isEqualTo(bucket);
+        assertThat(flussRecordAsPaimonRow.getInt(14)).isEqualTo(tableBucket);
         assertThat(flussRecordAsPaimonRow.getLong(15)).isEqualTo(logOffset);
         assertThat(flussRecordAsPaimonRow.getLong(16)).isEqualTo(timeStamp);
         assertThat(flussRecordAsPaimonRow.getTimestamp(16, 4))
@@ -98,8 +117,10 @@

     @Test
     void testPrimaryKeyTableRecord() {
-        int bucket = 0;
-        FlussRecordAsPaimonRow flussRecordAsPaimonRow = new FlussRecordAsPaimonRow(bucket);
+        int tableBucket = 0;
+        RowType tableRowType = RowType.of(new org.apache.paimon.types.BooleanType());
+        FlussRecordAsPaimonRow flussRecordAsPaimonRow =
+                new FlussRecordAsPaimonRow(tableBucket, tableRowType);
         long logOffset = 0;
         long timeStamp = System.currentTimeMillis();
         GenericRow genericRow = new GenericRow(1);
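
The checkState added to setFlussRecord is not covered by a dedicated assertion in this commit; a hedged sketch of how a schema mismatch would surface (test-method name invented, and it assumes Fluss's Preconditions.checkState fails with IllegalStateException):

@Test
void schemaMismatchSketch() {
    // Two Paimon columns but only one field in the Fluss row.
    RowType twoColumns =
            RowType.of(
                    new org.apache.paimon.types.BooleanType(),
                    new org.apache.paimon.types.IntType());
    FlussRecordAsPaimonRow paimonRow = new FlussRecordAsPaimonRow(0, twoColumns);

    GenericRow oneField = new GenericRow(1);
    oneField.setField(0, true);

    assertThatThrownBy(
                    () ->
                            paimonRow.setFlussRecord(
                                    new GenericRecord(
                                            0L, System.currentTimeMillis(), APPEND_ONLY, oneField)))
            .isInstanceOf(IllegalStateException.class); // assumed failure type
}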
