Skip to content

Commit 47e3248

Browse files
authored
[lake] The bucket of PaimonSplit should be -1 for bucket-unaware Paimon table (#1732)
1 parent 058cf13 commit 47e3248

File tree

5 files changed

+63
-5
lines changed

5 files changed

+63
-5
lines changed

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplit.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,19 @@ public class PaimonSplit implements LakeSplit {
3232

3333
private final DataSplit dataSplit;
3434

35-
public PaimonSplit(DataSplit dataSplit) {
35+
private final boolean isBucketUnAware;
36+
37+
public PaimonSplit(DataSplit dataSplit, boolean isBucketUnAware) {
3638
this.dataSplit = dataSplit;
39+
this.isBucketUnAware = isBucketUnAware;
3740
}
3841

3942
@Override
4043
public int bucket() {
44+
if (isBucketUnAware) {
45+
// bucket-unaware table returns -1
46+
return -1;
47+
}
4148
return dataSplit.bucket();
4249
}
4350

@@ -61,4 +68,8 @@ public List<String> partition() {
6168
public DataSplit dataSplit() {
6269
return dataSplit;
6370
}
71+
72+
public boolean isBucketUnAware() {
73+
return isBucketUnAware;
74+
}
6475
}

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplitPlanner.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.paimon.catalog.CatalogFactory;
2929
import org.apache.paimon.options.Options;
3030
import org.apache.paimon.predicate.Predicate;
31+
import org.apache.paimon.table.BucketMode;
3132
import org.apache.paimon.table.FileStoreTable;
3233
import org.apache.paimon.table.source.DataSplit;
3334
import org.apache.paimon.table.source.InnerTableScan;
@@ -67,12 +68,14 @@ public List<PaimonSplit> plan() {
6768
try (Catalog catalog = getCatalog()) {
6869
FileStoreTable fileStoreTable = getTable(catalog, tablePath, snapshotId);
6970
InnerTableScan tableScan = fileStoreTable.newScan();
71+
boolean isBucketUnAware = fileStoreTable.bucketMode() == BucketMode.BUCKET_UNAWARE;
72+
7073
if (predicate != null) {
7174
tableScan = tableScan.withFilter(predicate);
7275
}
7376
for (Split split : tableScan.plan().splits()) {
7477
DataSplit dataSplit = (DataSplit) split;
75-
splits.add(new PaimonSplit(dataSplit));
78+
splits.add(new PaimonSplit(dataSplit, isBucketUnAware));
7679
}
7780
}
7881
return splits;

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplitSerializer.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,17 @@
2626

2727
import java.io.ByteArrayInputStream;
2828
import java.io.ByteArrayOutputStream;
29+
import java.io.DataInputStream;
2930
import java.io.IOException;
3031

3132
/** Serializer for paimon split. */
3233
public class PaimonSplitSerializer implements SimpleVersionedSerializer<PaimonSplit> {
3334

35+
private static final int VERSION_1 = 1;
36+
3437
@Override
3538
public int getVersion() {
36-
return 1;
39+
return VERSION_1;
3740
}
3841

3942
@Override
@@ -42,6 +45,7 @@ public byte[] serialize(PaimonSplit paimonSplit) throws IOException {
4245
DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
4346
DataSplit dataSplit = paimonSplit.dataSplit();
4447
InstantiationUtil.serializeObject(view, dataSplit);
48+
view.writeBoolean(paimonSplit.isBucketUnAware());
4549
return out.toByteArray();
4650
}
4751

@@ -51,9 +55,16 @@ public PaimonSplit deserialize(int version, byte[] serialized) throws IOExceptio
5155
DataSplit dataSplit;
5256
try {
5357
dataSplit = InstantiationUtil.deserializeObject(in, getClass().getClassLoader());
58+
59+
if (version == VERSION_1) {
60+
DataInputStream dis = new DataInputStream(in);
61+
boolean isBucketUnAware = dis.readBoolean();
62+
return new PaimonSplit(dataSplit, isBucketUnAware);
63+
} else {
64+
throw new IOException("Unsupported PaimonSplit serialization version: " + version);
65+
}
5466
} catch (ClassNotFoundException e) {
55-
throw new IOException(e);
67+
throw new IOException("Failed to deserialize PaimonSplit", e);
5668
}
57-
return new PaimonSplit(dataSplit);
5869
}
5970
}

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSplitPlannerTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.paimon.Snapshot;
2828
import org.apache.paimon.data.BinaryString;
2929
import org.apache.paimon.data.GenericRow;
30+
import org.apache.paimon.data.InternalRow;
3031
import org.apache.paimon.schema.Schema;
3132
import org.apache.paimon.table.FileStoreTable;
3233
import org.apache.paimon.table.Table;
@@ -78,4 +79,35 @@ void testPlan() throws Exception {
7879
.map(PaimonSplit::dataSplit)
7980
.collect(Collectors.toList()));
8081
}
82+
83+
@Test
84+
void testPlannerCreatesCorrectSplitsForLogTableWithoutBucketKey() throws Exception {
85+
// test log table without bucket key
86+
String tableName = "planner_log_table_without_bucket_key";
87+
TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
88+
89+
Schema.Builder builder =
90+
Schema.newBuilder()
91+
.column("id", DataTypes.INT())
92+
.column("message", DataTypes.STRING());
93+
createTable(tablePath, builder.build());
94+
Table table = getTable(tablePath);
95+
96+
List<InternalRow> records =
97+
Arrays.asList(
98+
GenericRow.of(1, BinaryString.fromString("msg1")),
99+
GenericRow.of(2, BinaryString.fromString("msg2")));
100+
writeRecord(tablePath, records);
101+
102+
Snapshot snapshot = table.latestSnapshot().get();
103+
104+
LakeSource<PaimonSplit> lakeSource = lakeStorage.createLakeSource(tablePath);
105+
List<PaimonSplit> paimonSplits = lakeSource.createPlanner(snapshot::id).plan();
106+
107+
assertThat(paimonSplits).isNotEmpty();
108+
for (PaimonSplit split : paimonSplits) {
109+
assertThat(split.isBucketUnAware()).isTrue();
110+
assertThat(split.bucket()).isEqualTo(-1);
111+
}
112+
}
81113
}

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSplitSerializerTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ void testSerializeAndDeserialize() throws Exception {
7272
PaimonSplit deserialized = serializer.deserialize(serializer.getVersion(), serialized);
7373

7474
assertThat(deserialized.dataSplit()).isEqualTo(originalPaimonSplit.dataSplit());
75+
assertThat(deserialized.isBucketUnAware()).isEqualTo(originalPaimonSplit.isBucketUnAware());
7576
}
7677

7778
@Test

0 commit comments

Comments
 (0)