Skip to content

Commit 409a955

Browse files
authored
[hotfix] Paimon should always write bucket0 for bucket-unaware log table (#1661)
1 parent d863bb4 commit 409a955

File tree

2 files changed

+21
-9
lines changed

2 files changed

+21
-9
lines changed

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/append/AppendOnlyWriter.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.fluss.record.LogRecord;
2323

2424
import org.apache.paimon.data.InternalRow;
25+
import org.apache.paimon.table.BucketMode;
2526
import org.apache.paimon.table.FileStoreTable;
2627
import org.apache.paimon.table.sink.TableWriteImpl;
2728

@@ -34,6 +35,8 @@
3435
/** A {@link RecordWriter} to write to Paimon's append-only table. */
3536
public class AppendOnlyWriter extends RecordWriter<InternalRow> {
3637

38+
private final FileStoreTable fileStoreTable;
39+
3740
public AppendOnlyWriter(
3841
FileStoreTable fileStoreTable,
3942
TableBucket tableBucket,
@@ -48,6 +51,7 @@ public AppendOnlyWriter(
4851
tableBucket,
4952
partition,
5053
partitionKeys); // Pass to parent
54+
this.fileStoreTable = fileStoreTable;
5155
}
5256

5357
@Override
@@ -56,6 +60,11 @@ public void write(LogRecord record) throws Exception {
5660
// hacky, call internal method tableWrite.getWrite() to support
5761
// to write to given partition, otherwise, it'll always extract a partition from Paimon row
5862
// which may be costly
59-
tableWrite.getWrite().write(partition, bucket, flussRecordAsPaimonRow);
63+
int writtenBucket = bucket;
64+
// in bucket-unaware mode, we must write with bucket = 0 to follow Paimon best practice
65+
if (fileStoreTable.store().bucketMode() == BucketMode.BUCKET_UNAWARE) {
66+
writtenBucket = 0;
67+
}
68+
tableWrite.getWrite().write(partition, writtenBucket, flussRecordAsPaimonRow);
6069
}
6170
}

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
import org.apache.paimon.catalog.CatalogFactory;
4242
import org.apache.paimon.catalog.Identifier;
4343
import org.apache.paimon.data.InternalRow;
44-
import org.apache.paimon.io.DataFileMeta;
4544
import org.apache.paimon.options.Options;
4645
import org.apache.paimon.schema.Schema;
4746
import org.apache.paimon.table.FileStoreTable;
@@ -80,7 +79,6 @@
8079
import static org.apache.fluss.record.ChangeType.INSERT;
8180
import static org.apache.fluss.record.ChangeType.UPDATE_AFTER;
8281
import static org.apache.fluss.record.ChangeType.UPDATE_BEFORE;
83-
import static org.apache.fluss.utils.Preconditions.checkState;
8482
import static org.assertj.core.api.Assertions.assertThat;
8583

8684
/** The UT for tiering to Paimon via {@link PaimonLakeTieringFactory}. */
@@ -624,12 +622,17 @@ private CloseableIterator<InternalRow> getPaimonRows(
624622
// for log table, we can't filter by bucket directly, filter file by __bucket column
625623
for (Split split : readBuilder.newScan().plan().splits()) {
626624
DataSplit dataSplit = (DataSplit) split;
627-
List<DataFileMeta> dataFileMetas = dataSplit.dataFiles();
628-
checkState(dataFileMetas.size() == 1);
629-
DataFileMeta dataFileMeta = dataFileMetas.get(0);
630-
// filter by __bucket column
631-
if (dataFileMeta.valueStats().maxValues().getInt(3) == bucket
632-
&& dataFileMeta.valueStats().minValues().getInt(3) == bucket) {
625+
// bucket is always 0
626+
assertThat(dataSplit.bucket()).isEqualTo(0);
627+
// filter by __bucket column, removing any data file that doesn't belong to this bucket
628+
dataSplit
629+
.dataFiles()
630+
.removeIf(
631+
dataFileMeta ->
632+
!(dataFileMeta.valueStats().maxValues().getInt(3) == bucket
633+
&& dataFileMeta.valueStats().minValues().getInt(3)
634+
== bucket));
635+
if (!dataSplit.dataFiles().isEmpty()) {
633636
splits.add(split);
634637
}
635638
}

0 commit comments

Comments
 (0)