
Commit 009120e

luoyuxiawuchong authored and committed
[lake/paimon] Fix union read timestamp type issue (#1128)
1 parent ccfb15d commit 009120e

4 files changed: +107 −3 lines changed

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/reader/PaimonSnapshotAndLogSplitScanner.java

Lines changed: 4 additions & 1 deletion
@@ -33,6 +33,7 @@
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.RowType;
 
 import javax.annotation.Nullable;
 
@@ -64,6 +65,7 @@ public class PaimonSnapshotAndLogSplitScanner implements BatchScanner {
 
     private final LogScanner logScanner;
     private final long stoppingOffset;
+    private final RowType paimonRowType;
 
     private boolean logScanFinished;
     private SortMergeReader currentSortMergeReader;
@@ -76,6 +78,7 @@ public PaimonSnapshotAndLogSplitScanner(
             PaimonSnapshotAndFlussLogSplit snapshotAndFlussLogSplit,
             @Nullable int[] projectedFields) {
         this.pkIndexes = flussTable.getTableInfo().getSchema().getPrimaryKeyIndexes();
+        this.paimonRowType = fileStoreTable.rowType();
         int[] newProjectedFields = getNeedProjectFields(flussTable, projectedFields);
         this.tableRead =
                 fileStoreTable.newReadBuilder().withProjection(newProjectedFields).newRead();
@@ -190,7 +193,7 @@ private SortMergeReader createSortMergeReader() throws IOException {
     private void pollLogRecords(Duration timeout) {
        ScanRecords scanRecords = logScanner.poll(timeout);
        for (ScanRecord scanRecord : scanRecords) {
-            InternalRow paimonRow = new ScanRecordWrapper(scanRecord);
+            InternalRow paimonRow = new ScanRecordWrapper(scanRecord, paimonRowType);
             boolean isDelete =
                     scanRecord.getChangeType() == ChangeType.DELETE
                             || scanRecord.getChangeType() == ChangeType.UPDATE_BEFORE;

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/reader/ScanRecordWrapper.java

Lines changed: 33 additions & 2 deletions
@@ -18,24 +18,30 @@
 
 import com.alibaba.fluss.client.table.scanner.ScanRecord;
 import com.alibaba.fluss.record.ChangeType;
+import com.alibaba.fluss.row.TimestampLtz;
+import com.alibaba.fluss.row.TimestampNtz;
 
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.InternalArray;
 import org.apache.paimon.data.InternalMap;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
 
 /** A wrapper of {@link ScanRecord} which bridges {@link ScanRecord} to Paimon's internal row. */
 public class ScanRecordWrapper implements InternalRow {
 
     private final ChangeType changeType;
     private final com.alibaba.fluss.row.InternalRow flussRow;
+    private final RowType rowType;
 
-    public ScanRecordWrapper(ScanRecord scanRecord) {
+    public ScanRecordWrapper(ScanRecord scanRecord, RowType rowType) {
         this.changeType = scanRecord.getChangeType();
         this.flussRow = scanRecord.getRow();
+        this.rowType = rowType;
     }
 
     @Override
@@ -117,7 +123,32 @@ public Decimal getDecimal(int pos, int precision, int scale) {
 
     @Override
     public Timestamp getTimestamp(int pos, int precision) {
-        return Timestamp.fromInstant(flussRow.getTimestampLtz(pos, precision).toInstant());
+        DataType paimonTimestampType = rowType.getTypeAt(pos);
+        switch (paimonTimestampType.getTypeRoot()) {
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                if (TimestampNtz.isCompact(precision)) {
+                    return Timestamp.fromEpochMillis(
+                            flussRow.getTimestampNtz(pos, precision).getMillisecond());
+                } else {
+                    TimestampNtz timestampNtz = flussRow.getTimestampNtz(pos, precision);
+                    return Timestamp.fromEpochMillis(
+                            timestampNtz.getMillisecond(), timestampNtz.getNanoOfMillisecond());
+                }
+
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                if (TimestampLtz.isCompact(precision)) {
+                    return Timestamp.fromEpochMillis(
+                            flussRow.getTimestampLtz(pos, precision).getEpochMillisecond());
+                } else {
+                    TimestampLtz timestampLtz = flussRow.getTimestampLtz(pos, precision);
+                    return Timestamp.fromEpochMillis(
+                            timestampLtz.getEpochMillisecond(),
+                            timestampLtz.getNanoOfMillisecond());
+                }
+            default:
+                throw new UnsupportedOperationException(
+                        "Unsupported data type to get timestamp: " + paimonTimestampType);
+        }
     }
 
     @Override
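
Why the branching: before this change, getTimestamp always went through flussRow.getTimestampLtz, so Paimon TIMESTAMP (without time zone) columns were decoded with the wrong accessor during union reads. The isCompact split mirrors the two physical encodings Fluss uses: a compact timestamp is presumably just an epoch-millisecond long (by analogy with Flink's TimestampData, compact likely means precision <= 3 — an assumption, not confirmed by this diff), while higher precisions carry an extra nano-of-millisecond component that must be forwarded to Paimon. A minimal sketch of the target conversion, using only Paimon's Timestamp and values taken from the new test below:

import org.apache.paimon.data.Timestamp;

public class TimestampConversionSketch {
    public static void main(String[] args) {
        // Compact path: epoch milliseconds only, no sub-millisecond digits.
        Timestamp compact = Timestamp.fromEpochMillis(1698235273182L);
        System.out.println(compact.toLocalDateTime()); // 2023-10-25T12:01:13.182

        // Non-compact path: epoch milliseconds plus a nano-of-millisecond part
        // (5000 ns = 5 µs), as used in testUnionReadWithTimeStamp.
        Timestamp precise = Timestamp.fromEpochMillis(1698235273182L, 5000);
        System.out.println(precise.toLocalDateTime()); // 2023-10-25T12:01:13.182005
    }
}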

fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java

Lines changed: 56 additions & 0 deletions
@@ -23,6 +23,8 @@
 import com.alibaba.fluss.metadata.TableDescriptor;
 import com.alibaba.fluss.metadata.TablePath;
 import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.row.TimestampLtz;
+import com.alibaba.fluss.row.TimestampNtz;
 import com.alibaba.fluss.server.replica.Replica;
 import com.alibaba.fluss.types.DataTypes;
 
@@ -31,6 +33,7 @@
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CollectionUtil;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
@@ -39,6 +42,7 @@
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
@@ -202,6 +206,58 @@ void testPrimaryKeyTable(boolean isPartitioned) throws Exception {
         assertThat(rows.toString()).isEqualTo(sortedRows(expectedRows).toString());
     }
 
+    @Test
+    void testUnionReadWithTimeStamp() throws Exception {
+        // first of all, start tiering
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        String tableName = "pk_table_with_timestamp";
+        TablePath t1 = TablePath.of(DEFAULT_DB, tableName);
+        long tableId =
+                createPrimaryKeyTable(
+                        t1,
+                        1,
+                        Arrays.asList(
+                                new Schema.Column("c1", DataTypes.INT()),
+                                new Schema.Column("c2", DataTypes.TIMESTAMP_LTZ()),
+                                new Schema.Column("c3", DataTypes.TIMESTAMP())));
+        // write some rows
+        List<InternalRow> rows =
+                Arrays.asList(
+                        row(
+                                1,
+                                TimestampLtz.fromEpochMillis(1698235273182L, 5000),
+                                TimestampNtz.fromMillis(1698235273183L, 6000)),
+                        row(
+                                2,
+                                TimestampLtz.fromEpochMillis(1698235273200L, 5000),
+                                TimestampNtz.fromMillis(1698235273201L, 6000)));
+
+        writeRows(t1, rows, false);
+
+        // wait until records have been synced
+        waitUtilBucketSynced(t1, tableId, 1, false);
+
+        // stop lake tiering service
+        jobClient.cancel().get();
+
+        // write a row again
+        rows =
+                Collections.singletonList(
+                        row(
+                                2,
+                                TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+                                TimestampNtz.fromMillis(1698235273501L, 8000)));
+        writeRows(t1, rows, false);
+
+        // now, query the result; it must union lake snapshot and log
+        List<String> result =
+                toSortedRows(batchTEnv.executeSql("select * from " + tableName), false);
+        assertThat(result.toString())
+                .isEqualTo(
+                        "[+I[1, 2023-10-25T12:01:13.182005Z, 2023-10-25T12:01:13.183006], +I[2, 2023-10-25T12:01:13.400007Z, 2023-10-25T12:01:13.501008]]");
+    }
+
     private List<Row> paddingPartition(TablePath tablePath, List<Row> rows) {
         List<Row> paddingPartitionRows = new ArrayList<>();
         for (String partition : waitUntilPartitions(tablePath).values()) {
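
The expected strings in the assertion follow mechanically from the inputs: the second argument of fromEpochMillis/fromMillis is a nano-of-millisecond count, so 5000–8000 ns surface as the trailing 005–008 digits of the microsecond field. A quick sanity check with plain java.time, independent of Fluss and Paimon:

import java.time.Instant;

public class ExpectedValueCheck {
    public static void main(String[] args) {
        // Row 2 after the second write: 1698235273400 ms + 7000 ns (= 7 µs).
        Instant ltz = Instant.ofEpochMilli(1698235273400L).plusNanos(7000);
        System.out.println(ltz); // 2023-10-25T12:01:13.400007Z
    }
}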

fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java

Lines changed: 14 additions & 0 deletions
@@ -284,6 +284,20 @@ protected long createLogTable(TablePath tablePath, int bucketNum, boolean isPart
         return createTable(tablePath, tableBuilder.build());
     }
 
+    protected long createPrimaryKeyTable(
+            TablePath tablePath, int bucketNum, List<Schema.Column> columns) throws Exception {
+        Schema.Builder schemaBuilder =
+                Schema.newBuilder().fromColumns(columns).primaryKey(columns.get(0).getName());
+
+        TableDescriptor.Builder tableBuilder =
+                TableDescriptor.builder()
+                        .distributedBy(bucketNum)
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")
+                        .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500));
+        tableBuilder.schema(schemaBuilder.build());
+        return createTable(tablePath, tableBuilder.build());
+    }
+
     protected long createPkTable(TablePath tablePath) throws Exception {
         return createPkTable(tablePath, 1);
     }
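
A usage note on the new helper: the first column in the list becomes the primary key, and the table is created with datalake tiering enabled at 500 ms freshness. The call from the new test above, repeated for reference:

// From testUnionReadWithTimeStamp: "c1" (the first column) becomes the primary key.
long tableId =
        createPrimaryKeyTable(
                TablePath.of(DEFAULT_DB, "pk_table_with_timestamp"),
                1, // bucket count
                Arrays.asList(
                        new Schema.Column("c1", DataTypes.INT()),
                        new Schema.Column("c2", DataTypes.TIMESTAMP_LTZ()),
                        new Schema.Column("c3", DataTypes.TIMESTAMP())));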
