Commit 3815e9e

[paimon] Fix union read paimon issue (#2170)
1 parent 6321586 commit 3815e9e

10 files changed: +162 -63 lines

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitReaderGenerator.java

Lines changed: 10 additions & 2 deletions
@@ -55,8 +55,16 @@ public void addSplit(SourceSplitBase split, Queue<SourceSplitBase> boundedSplits
         if (split instanceof LakeSnapshotSplit) {
             boundedSplits.add(split);
         } else if (split instanceof LakeSnapshotAndFlussLogSplit) {
-            // lake split not finished, add to it
-            if (!((LakeSnapshotAndFlussLogSplit) split).isLakeSplitFinished()) {
+            LakeSnapshotAndFlussLogSplit lakeSnapshotAndFlussLogSplit =
+                    (LakeSnapshotAndFlussLogSplit) split;
+            boolean isStreaming = ((LakeSnapshotAndFlussLogSplit) split).isStreaming();
+            // if is streaming and lake split not finished, add to it
+            if (isStreaming) {
+                if (!lakeSnapshotAndFlussLogSplit.isLakeSplitFinished()) {
+                    boundedSplits.add(split);
+                }
+            } else {
+                // otherwise, in batch mode, always add it
                 boundedSplits.add(split);
             }
         } else {

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/LakeSnapshotAndLogSplitScanner.java

Lines changed: 18 additions & 15 deletions
@@ -42,7 +42,6 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -62,7 +61,6 @@ public class LakeSnapshotAndLogSplitScanner implements BatchScanner {
     // the indexes of primary key in emitted row by paimon and fluss
     private int[] keyIndexesInRow;
     @Nullable private int[] adjustProjectedFields;
-    private final int[] newProjectedFields;
 
     // the sorted logs in memory, mapping from key -> value
     private Map<InternalRow, KeyValueRow> logRows;
@@ -81,7 +79,7 @@ public LakeSnapshotAndLogSplitScanner(
         this.pkIndexes = table.getTableInfo().getSchema().getPrimaryKeyIndexes();
         this.lakeSnapshotSplitAndFlussLogSplit = lakeSnapshotAndFlussLogSplit;
         this.lakeSource = lakeSource;
-        this.newProjectedFields = getNeedProjectFields(table, projectedFields);
+        int[] newProjectedFields = getNeedProjectFields(table, projectedFields);
 
         this.logScanner = table.newScan().project(newProjectedFields).createLogScanner();
         this.lakeSource.withProject(
@@ -109,7 +107,9 @@ public LakeSnapshotAndLogSplitScanner(
                                         "StoppingOffset is null for split: "
                                                 + lakeSnapshotAndFlussLogSplit));
 
-        this.logScanFinished = lakeSnapshotAndFlussLogSplit.getStartingOffset() >= stoppingOffset;
+        this.logScanFinished =
+                lakeSnapshotAndFlussLogSplit.getStartingOffset() >= stoppingOffset
+                        || stoppingOffset <= 0;
     }
 
     private int[] getNeedProjectFields(Table flussTable, @Nullable int[] projectedFields) {
@@ -192,23 +192,26 @@ public CloseableIterator<InternalRow> pollBatch(Duration timeout) throws IOExcep
             return currentSortMergeReader.readBatch();
         } else {
             if (lakeRecordIterators.isEmpty()) {
+                List<RecordReader> recordReaders = new ArrayList<>();
                 if (lakeSnapshotSplitAndFlussLogSplit.getLakeSplits() == null
                         || lakeSnapshotSplitAndFlussLogSplit.getLakeSplits().isEmpty()) {
-                    lakeRecordIterators = Collections.emptyList();
-                    logRows = new LinkedHashMap<>();
+                    // pass null split to get rowComparator
+                    recordReaders.add(lakeSource.createRecordReader(() -> null));
                 } else {
                     for (LakeSplit lakeSplit : lakeSnapshotSplitAndFlussLogSplit.getLakeSplits()) {
-                        RecordReader reader = lakeSource.createRecordReader(() -> lakeSplit);
-                        if (reader instanceof SortedRecordReader) {
-                            rowComparator = ((SortedRecordReader) reader).order();
-                        } else {
-                            throw new UnsupportedOperationException(
-                                    "lake records must instance of sorted view.");
-                        }
-                        lakeRecordIterators.add(reader.read());
+                        recordReaders.add(lakeSource.createRecordReader(() -> lakeSplit));
+                    }
+                }
+                for (RecordReader reader : recordReaders) {
+                    if (reader instanceof SortedRecordReader) {
+                        rowComparator = ((SortedRecordReader) reader).order();
+                    } else {
+                        throw new UnsupportedOperationException(
+                                "lake records must instance of sorted view.");
                     }
-                    logRows = new TreeMap<>(rowComparator);
+                    lakeRecordIterators.add(reader.read());
                 }
+                logRows = new TreeMap<>(rowComparator);
             }
             pollLogRecords(timeout);
             return CloseableIterator.wrap(Collections.emptyIterator());
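For context: the hunk above now obtains the row comparator even when the split carries no tiered lake files, by creating one reader from a null split, and then keys the in-memory log buffer by that order. A minimal standalone sketch of the idea with stand-in types (SortedReader, createReader, and the row layout below are illustrative, not the project's API):

import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;

/** Stand-in for a reader that knows the sort order of the rows it produces. */
interface SortedReader {
    Comparator<List<Object>> order();
}

class ComparatorFromEmptySplitDemo {

    // Hypothetical factory: a "null split" still yields a reader, just with no rows to read.
    static SortedReader createReader(/* nullable */ Object split) {
        // this reader's rows are ordered by the first (key) column
        return () -> Comparator.comparing((List<Object> row) -> (Integer) row.get(0));
    }

    public static void main(String[] args) {
        // even with no lake splits, obtain the comparator from a reader built on a null split
        Comparator<List<Object>> rowComparator = createReader(null).order();

        // buffer changelog rows keyed by that order, so they merge with lake rows by key
        TreeMap<List<Object>, String> logRows = new TreeMap<>(rowComparator);
        logRows.put(List.of(2, "v2"), "+U");
        logRows.put(List.of(1, "v1"), "+I");
        System.out.println(logRows.firstKey()); // [1, v1], smallest key first
    }
}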

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/SortMergeReader.java

Lines changed: 26 additions & 4 deletions
@@ -183,7 +183,10 @@ private SortMergeRows sortMergeWithChangeLog(InternalRow lakeSnapshotRow) {
         // we should emit the log record firsts; and still need to iterator changelog to find
         // the first change log greater than the snapshot record
         List<InternalRow> emitRows = new ArrayList<>();
-        emitRows.add(logKeyValueRow.valueRow());
+        // only emit the log record if it's not a delete operation
+        if (!logKeyValueRow.isDelete()) {
+            emitRows.add(logKeyValueRow.valueRow());
+        }
         boolean shouldEmitSnapshotRecord = true;
         while (changeLogIterator.hasNext()) {
             // get the next log record
@@ -203,7 +206,9 @@ private SortMergeRows sortMergeWithChangeLog(InternalRow lakeSnapshotRow) {
             } else if (compareResult > 0) {
                 // snapshot record > the log record
                 // the log record should be emitted
-                emitRows.add(logKeyValueRow.valueRow());
+                if (!logKeyValueRow.isDelete()) {
+                    emitRows.add(logKeyValueRow.valueRow());
+                }
             } else {
                 // log record == snapshot record
                 // the log record should be emitted if is not delete, but the snapshot record
@@ -282,12 +287,14 @@ public void close() {
 
     private static class ChangeLogIteratorWrapper implements CloseableIterator<InternalRow> {
         private CloseableIterator<KeyValueRow> changeLogRecordIterator;
+        private KeyValueRow nextReturnRow;
 
         public ChangeLogIteratorWrapper() {}
 
         public ChangeLogIteratorWrapper replace(
                 CloseableIterator<KeyValueRow> changeLogRecordIterator) {
             this.changeLogRecordIterator = changeLogRecordIterator;
+            this.nextReturnRow = null;
             return this;
         }
 
@@ -300,12 +307,27 @@ public void close() {
 
         @Override
         public boolean hasNext() {
-            return changeLogRecordIterator != null && changeLogRecordIterator.hasNext();
+            if (nextReturnRow != null) {
+                return true;
+            }
+            while (changeLogRecordIterator != null && changeLogRecordIterator.hasNext()) {
+                KeyValueRow row = changeLogRecordIterator.next();
+                if (!row.isDelete()) {
+                    nextReturnRow = row;
+                    return true;
+                }
+            }
+            return false;
         }
 
         @Override
         public InternalRow next() {
-            return changeLogRecordIterator.next().valueRow();
+            if (nextReturnRow == null) {
+                throw new NoSuchElementException();
+            }
+            KeyValueRow row = nextReturnRow;
+            nextReturnRow = null; // Clear cache after consuming
+            return row.valueRow();
         }
     }
 
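For context: the rewritten hasNext()/next() pair above is the usual peek-ahead filtering-iterator pattern, here used to skip delete changelog rows. A minimal standalone sketch of that pattern, assuming a simplified KeyValueRow with only a value and an isDelete flag (illustrative names, not the project's classes):

import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Minimal stand-in for a changelog row: a value plus a delete flag. */
record KeyValueRow(String value, boolean isDelete) {}

/** Iterator that lazily skips delete rows by caching the next live row. */
final class SkipDeletesIterator implements Iterator<String> {
    private final Iterator<KeyValueRow> delegate;
    private KeyValueRow nextReturnRow; // found by hasNext(), consumed by next()

    SkipDeletesIterator(Iterator<KeyValueRow> delegate) {
        this.delegate = delegate;
    }

    @Override
    public boolean hasNext() {
        if (nextReturnRow != null) {
            return true;
        }
        while (delegate.hasNext()) {
            KeyValueRow row = delegate.next();
            if (!row.isDelete()) { // skip delete rows entirely
                nextReturnRow = row;
                return true;
            }
        }
        return false;
    }

    @Override
    public String next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        String value = nextReturnRow.value();
        nextReturnRow = null; // clear the cache after consuming
        return value;
    }
}

class SkipDeletesDemo {
    public static void main(String[] args) {
        List<KeyValueRow> rows =
                List.of(
                        new KeyValueRow("a", false),
                        new KeyValueRow("b", true), // delete, should be skipped
                        new KeyValueRow("c", false));
        Iterator<String> it = new SkipDeletesIterator(rows.iterator());
        while (it.hasNext()) {
            System.out.println(it.next()); // prints a then c
        }
    }
}

The cached field plays the same role as nextReturnRow in the diff: hasNext() advances and caches, next() consumes and clears.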

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java

Lines changed: 11 additions & 3 deletions
@@ -628,8 +628,16 @@ private List<SourceSplitBase> generateHybridLakeFlussSplits() {
                             stoppingOffsetsInitializer,
                             tableInfo.getNumBuckets(),
                             this::listPartitions);
-            pendingHybridLakeFlussSplits = lakeSplitGenerator.generateHybridLakeFlussSplits();
-            return pendingHybridLakeFlussSplits;
+            List<SourceSplitBase> generatedSplits =
+                    lakeSplitGenerator.generateHybridLakeFlussSplits();
+            if (generatedSplits == null) {
+                // no hybrid lake splits, set the pending splits to empty list
+                pendingHybridLakeFlussSplits = Collections.emptyList();
+                return null;
+            } else {
+                pendingHybridLakeFlussSplits = generatedSplits;
+                return generatedSplits;
+            }
         } catch (Exception e) {
             throw new FlinkRuntimeException("Failed to generate hybrid lake fluss splits", e);
         }
@@ -680,6 +688,7 @@ private void handleSplitsAdd(List<SourceSplitBase> splits, Throwable t) {
                     t);
             }
         }
+        doHandleSplitsAdd(splits);
         if (isPartitioned) {
             if (!streaming || scanPartitionDiscoveryIntervalMs <= 0) {
                 // if not streaming or partition discovery is disabled
@@ -691,7 +700,6 @@ private void handleSplitsAdd(List<SourceSplitBase> splits, Throwable t) {
                 // so, noMoreNewPartitionSplits should be set to true
                 noMoreNewSplits = true;
             }
-            doHandleSplitsAdd(splits);
         }
 
     private void doHandleSplitsAdd(List<SourceSplitBase> splits) {

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/SourceEnumeratorState.java

Lines changed: 2 additions & 0 deletions
@@ -88,6 +88,8 @@ public String toString() {
                 + assignedBuckets
                 + ", assignedPartitions="
                 + assignedPartitions
+                + ", remainingHybridLakeFlussSplits="
+                + remainingHybridLakeFlussSplits
                 + '}';
     }
 }

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonRecordReader.java

Lines changed: 12 additions & 8 deletions
@@ -48,12 +48,11 @@ public class PaimonRecordReader implements RecordReader {
 
     protected PaimonRowAsFlussRecordIterator iterator;
     protected @Nullable int[][] project;
-    protected @Nullable Predicate predicate;
     protected RowType paimonRowType;
 
     public PaimonRecordReader(
             FileStoreTable fileStoreTable,
-            PaimonSplit split,
+            @Nullable PaimonSplit split,
             @Nullable int[][] project,
             @Nullable Predicate predicate)
             throws IOException {
@@ -69,12 +68,17 @@ public PaimonRecordReader(
 
         TableRead tableRead = readBuilder.newRead().executeFilter();
         paimonRowType = readBuilder.readType();
-
-        org.apache.paimon.reader.RecordReader<InternalRow> recordReader =
-                tableRead.createReader(split.dataSplit());
-        iterator =
-                new PaimonRecordReader.PaimonRowAsFlussRecordIterator(
-                        recordReader.toCloseableIterator(), paimonRowType);
+        if (split == null) {
+            iterator =
+                    new PaimonRecordReader.PaimonRowAsFlussRecordIterator(
+                            org.apache.paimon.utils.CloseableIterator.empty(), paimonRowType);
+        } else {
+            org.apache.paimon.reader.RecordReader<InternalRow> recordReader =
+                    tableRead.createReader(split.dataSplit());
+            iterator =
+                    new PaimonRecordReader.PaimonRowAsFlussRecordIterator(
+                            recordReader.toCloseableIterator(), paimonRowType);
+        }
     }
 
     @Override
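For context: the constructor change above falls back to an empty iterator when no split is given, so the reader can still be constructed just to expose schema information (and, in the sorted subclass, the key comparator). A minimal sketch of that fallback using a hypothetical reader type rather than the real Paimon classes:

import java.util.Collections;
import java.util.Iterator;
import java.util.List;

/** Hypothetical reader that tolerates a missing split by exposing an empty iterator. */
final class SketchRecordReader {
    private final List<String> schema;       // stands in for the Paimon read RowType
    private final Iterator<String> iterator; // rows of the split, or empty if none

    SketchRecordReader(List<String> schema, /* nullable */ List<String> splitRows) {
        this.schema = schema;
        this.iterator =
                splitRows == null
                        ? Collections.<String>emptyIterator() // no data, schema still usable
                        : splitRows.iterator();
    }

    List<String> schema() {
        return schema;
    }

    Iterator<String> read() {
        return iterator;
    }

    public static void main(String[] args) {
        // a reader created without a split still answers schema questions
        SketchRecordReader reader = new SketchRecordReader(List.of("id", "v1", "v2"), null);
        System.out.println(reader.schema());         // [id, v1, v2]
        System.out.println(reader.read().hasNext()); // false
    }
}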

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSortedRecordReader.java

Lines changed: 4 additions & 4 deletions
@@ -38,16 +38,16 @@ public class PaimonSortedRecordReader extends PaimonRecordReader implements Sort
 
     public PaimonSortedRecordReader(
             FileStoreTable fileStoreTable,
-            PaimonSplit split,
+            // a temporary fix to pass null split to get the order comparator
+            @Nullable PaimonSplit split,
             @Nullable int[][] project,
             @Nullable Predicate predicate)
             throws IOException {
         super(fileStoreTable, split, project, predicate);
         RowType pkKeyType =
                 new RowType(
-                        PrimaryKeyTableUtils.PrimaryKeyFieldsExtractor.EXTRACTOR.keyFields(
-                                fileStoreTable.schema()));
-
+                        PrimaryKeyTableUtils.addKeyNamePrefix(
+                                fileStoreTable.schema().primaryKeysFields()));
         this.comparator =
                 toFlussRowComparator(paimonRowType, new KeyComparatorSupplier(pkKeyType).get());
     }

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java

Lines changed: 52 additions & 4 deletions
@@ -23,7 +23,9 @@
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.BinaryString;
 import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.GenericRow;
 import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.row.TimestampLtz;
 import org.apache.fluss.row.TimestampNtz;
@@ -39,6 +41,7 @@
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.CollectionUtil;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
@@ -89,7 +92,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception {
         waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
 
         // check the status of replica after synced
-        assertReplicaStatus(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned, bucketLogEndOffset);
+        assertReplicaStatus(bucketLogEndOffset);
 
         // will read paimon snapshot, won't merge log since it's empty
         List<String> resultEmptyLog =
@@ -380,6 +383,51 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception {
         assertThat(projectRows2.toString()).isEqualTo(sortedRows(expectedProjectRows2).toString());
     }
 
+    @Test
+    void testUnionReadWhenSomeBucketNotTiered() throws Exception {
+        // first of all, start tiering
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        String tableName = "pk_table_union_read_some_bucket_not_tiered";
+        TablePath t1 = TablePath.of(DEFAULT_DB, tableName);
+        int bucketNum = 3;
+        // create table & write initial data
+        long tableId = createSimplePkTable(t1, bucketNum, false, true);
+
+        writeRows(
+                t1,
+                Arrays.asList(
+                        GenericRow.of(
+                                1, BinaryString.fromString("v11"), BinaryString.fromString("v12")),
+                        GenericRow.of(
+                                2, BinaryString.fromString("v21"), BinaryString.fromString("v22"))),
+                false);
+
+        Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
+        bucketLogEndOffset.put(new TableBucket(tableId, 1), 1L);
+        bucketLogEndOffset.put(new TableBucket(tableId, 2), 1L);
+
+        // wait until records have been synced
+        waitUntilBucketsSynced(bucketLogEndOffset.keySet());
+
+        // check the status of replica after synced
+        assertReplicaStatus(bucketLogEndOffset);
+
+        jobClient.cancel().get();
+        writeRows(
+                t1,
+                Arrays.asList(
+                        GenericRow.of(
+                                0, BinaryString.fromString("v01"), BinaryString.fromString("v02")),
+                        GenericRow.of(
+                                3, BinaryString.fromString("v31"), BinaryString.fromString("v32"))),
+                false);
+
+        List<String> result = toSortedRows(batchTEnv.executeSql("select * from " + tableName));
+        assertThat(result.toString())
+                .isEqualTo("[+I[0, v01, v02], +I[1, v11, v12], +I[2, v21, v22], +I[3, v31, v32]]");
+    }
+
     @ParameterizedTest
     @ValueSource(booleans = {false, true})
     void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception {
@@ -398,7 +446,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception {
         waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
 
         // check the status of replica after synced
-        assertReplicaStatus(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned, bucketLogEndOffset);
+        assertReplicaStatus(bucketLogEndOffset);
 
         // will read paimon snapshot, should only +I since no change log
         List<Row> expectedRows = new ArrayList<>();
@@ -628,7 +676,7 @@ void testUnionReadPrimaryKeyTableFailover(boolean isPartitioned) throws Exceptio
                 bucketLogEndOffset);
 
         // check the status of replica after synced
-        assertReplicaStatus(table1, tableId, DEFAULT_BUCKET_NUM, isPartitioned, bucketLogEndOffset);
+        assertReplicaStatus(bucketLogEndOffset);
 
         // create result table
         createSimplePkTable(resultTable, DEFAULT_BUCKET_NUM, isPartitioned, false);
@@ -895,7 +943,7 @@ protected long createPkTable(
         if (isPartitioned) {
             tableBuilder.property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, true);
             tableBuilder.partitionedBy(partitionKeys);
-            schemaBuilder.primaryKey(primaryKey, partitionKeys);
+            schemaBuilder.primaryKey(partitionKeys, primaryKey);
             tableBuilder.property(
                     ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT, AutoPartitionTimeUnit.YEAR);
         } else {
