Skip to content

Commit 29dc982

Browse files
committed
fix tests
1 parent 7fe4cf8 commit 29dc982

File tree

2 files changed

+26
-9
lines changed

2 files changed

+26
-9
lines changed

fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
/** To wrap Fluss {@link LogRecord} as paimon {@link InternalRow}. */
3737
public class FlussRecordAsPaimonRow implements InternalRow {
3838

39+
// Lake table for paimon will append three system columns: __bucket, __offset,__timestamp
40+
private static final int LAKE_PAIMON_SYSTEM_COLUMNS = 3;
3941
private final RowType tableTowType;
4042
private final int bucket;
4143
private LogRecord logRecord;
@@ -52,7 +54,7 @@ public void setFlussRecord(LogRecord logRecord) {
5254
this.internalRow = logRecord.getRow();
5355
this.originRowFieldCount = internalRow.getFieldCount();
5456
checkState(
55-
originRowFieldCount == tableTowType.getFieldCount(),
57+
originRowFieldCount == tableTowType.getFieldCount() - LAKE_PAIMON_SYSTEM_COLUMNS,
5658
"The paimon table fields count must equals to LogRecord's fields count.");
5759
}
5860

@@ -61,7 +63,7 @@ public int getFieldCount() {
6163
return
6264
// business (including partitions) + system (three system fields: bucket, offset,
6365
// timestamp)
64-
originRowFieldCount + 3;
66+
originRowFieldCount + LAKE_PAIMON_SYSTEM_COLUMNS;
6567
}
6668

6769
@Override

fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,15 @@ void testLogTableRecordAllTypes() {
5757
new org.apache.paimon.types.VarCharType(),
5858
new org.apache.paimon.types.DecimalType(5, 2),
5959
new org.apache.paimon.types.DecimalType(20, 0),
60-
new org.apache.paimon.types.LocalZonedTimestampType(3),
61-
new org.apache.paimon.types.TimestampType(3),
60+
new org.apache.paimon.types.LocalZonedTimestampType(6),
61+
new org.apache.paimon.types.TimestampType(6),
6262
new org.apache.paimon.types.BinaryType(),
63-
new org.apache.paimon.types.VarCharType());
63+
new org.apache.paimon.types.VarCharType(),
64+
// append three system columns: __bucket, __offset,__timestamp
65+
new org.apache.paimon.types.IntType(),
66+
new org.apache.paimon.types.BigIntType(),
67+
new org.apache.paimon.types.LocalZonedTimestampType(3));
68+
6469
FlussRecordAsPaimonRow flussRecordAsPaimonRow =
6570
new FlussRecordAsPaimonRow(tableBucket, tableRowType);
6671
long logOffset = 0;
@@ -76,8 +81,8 @@ void testLogTableRecordAllTypes() {
7681
genericRow.setField(7, BinaryString.fromString("string"));
7782
genericRow.setField(8, Decimal.fromUnscaledLong(9, 5, 2));
7883
genericRow.setField(9, Decimal.fromBigDecimal(new BigDecimal(10), 20, 0));
79-
genericRow.setField(10, TimestampLtz.fromEpochMillis(1698235273182L));
80-
genericRow.setField(11, TimestampNtz.fromMillis(1698235273182L, 45678));
84+
genericRow.setField(10, TimestampLtz.fromEpochMillis(1698235273182L, 5678));
85+
genericRow.setField(11, TimestampNtz.fromMillis(1698235273182L, 5678));
8186
genericRow.setField(12, new byte[] {1, 2, 3, 4});
8287
genericRow.setField(13, null);
8388
LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow);
@@ -96,10 +101,14 @@ void testLogTableRecordAllTypes() {
96101
.isEqualTo(new BigDecimal("0.09"));
97102
assertThat(flussRecordAsPaimonRow.getDecimal(9, 20, 0).toBigDecimal())
98103
.isEqualTo(new BigDecimal(10));
99-
assertThat(flussRecordAsPaimonRow.getTimestamp(10, 3).getMillisecond())
104+
assertThat(flussRecordAsPaimonRow.getTimestamp(10, 6).getMillisecond())
100105
.isEqualTo(1698235273182L);
106+
assertThat(flussRecordAsPaimonRow.getTimestamp(10, 6).getNanoOfMillisecond())
107+
.isEqualTo(5678);
101108
assertThat(flussRecordAsPaimonRow.getTimestamp(11, 6).getMillisecond())
102109
.isEqualTo(1698235273182L);
110+
assertThat(flussRecordAsPaimonRow.getTimestamp(11, 6).getNanoOfMillisecond())
111+
.isEqualTo(5678);
103112
assertThat(flussRecordAsPaimonRow.getBinary(12)).isEqualTo(new byte[] {1, 2, 3, 4});
104113
assertThat(flussRecordAsPaimonRow.isNullAt(13)).isTrue();
105114

@@ -118,7 +127,13 @@ void testLogTableRecordAllTypes() {
118127
@Test
119128
void testPrimaryKeyTableRecord() {
120129
int tableBucket = 0;
121-
RowType tableRowType = RowType.of(new org.apache.paimon.types.BooleanType());
130+
RowType tableRowType =
131+
RowType.of(
132+
new org.apache.paimon.types.BooleanType(),
133+
// append three system columns: __bucket, __offset,__timestamp
134+
new org.apache.paimon.types.IntType(),
135+
new org.apache.paimon.types.BigIntType(),
136+
new org.apache.paimon.types.LocalZonedTimestampType(3));
122137
FlussRecordAsPaimonRow flussRecordAsPaimonRow =
123138
new FlussRecordAsPaimonRow(tableBucket, tableRowType);
124139
long logOffset = 0;

0 commit comments

Comments
 (0)