Skip to content

Commit baa53d9

Browse files
authored
[lake] LakeSourceSplit should be backward compatible (#1703)
1 parent 164542f commit baa53d9

File tree

3 files changed

+70
-14
lines changed

3 files changed

+70
-14
lines changed

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitSerializer.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public LakeSplitSerializer(SimpleVersionedSerializer<LakeSplit> sourceSplitSeria
4747
}
4848

4949
public void serialize(DataOutputSerializer out, SourceSplitBase split) throws IOException {
50+
out.writeInt(sourceSplitSerializer.getVersion());
5051
if (split instanceof LakeSnapshotSplit) {
5152
LakeSnapshotSplit lakeSplit = (LakeSnapshotSplit) split;
5253
out.writeInt(lakeSplit.getSplitIndex());
@@ -91,13 +92,12 @@ public SourceSplitBase deserialize(
9192
@Nullable String partition,
9293
DataInputDeserializer input)
9394
throws IOException {
95+
int version = input.readInt();
9496
if (splitKind == LAKE_SNAPSHOT_SPLIT_KIND) {
9597
int splitIndex = input.readInt();
9698
byte[] serializeBytes = new byte[input.readInt()];
9799
input.read(serializeBytes);
98-
LakeSplit lakeSplit =
99-
sourceSplitSerializer.deserialize(
100-
sourceSplitSerializer.getVersion(), serializeBytes);
100+
LakeSplit lakeSplit = sourceSplitSerializer.deserialize(version, serializeBytes);
101101
return new LakeSnapshotSplit(tableBucket, partition, lakeSplit, splitIndex);
102102
} else if (splitKind == LAKE_SNAPSHOT_FLUSS_LOG_SPLIT_KIND) {
103103
List<LakeSplit> lakeSplits = null;
@@ -107,9 +107,7 @@ public SourceSplitBase deserialize(
107107
for (int i = 0; i < lakeSplitSize; i++) {
108108
byte[] serializeBytes = new byte[input.readInt()];
109109
input.read(serializeBytes);
110-
lakeSplits.add(
111-
sourceSplitSerializer.deserialize(
112-
sourceSplitSerializer.getVersion(), serializeBytes));
110+
lakeSplits.add(sourceSplitSerializer.deserialize(version, serializeBytes));
113111
}
114112
}
115113
long startingOffset = input.readLong();

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/FlussSourceEnumeratorStateSerializer.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -142,12 +142,12 @@ private void serializeRemainingHybridLakeFlussSplits(
142142
out.writeBoolean(true);
143143
out.writeInt(remainingHybridLakeFlussSplits.size());
144144
SourceSplitSerializer sourceSplitSerializer = new SourceSplitSerializer(lakeSource);
145+
out.writeInt(sourceSplitSerializer.getVersion());
145146
for (SourceSplitBase split : remainingHybridLakeFlussSplits) {
146147
byte[] serializeBytes = sourceSplitSerializer.serialize(split);
147148
out.writeInt(serializeBytes.length);
148149
out.write(serializeBytes);
149150
}
150-
151151
} else {
152152
// write that hybrid lake fluss splits is null
153153
out.writeBoolean(false);
@@ -161,13 +161,12 @@ private List<SourceSplitBase> deserializeRemainingHybridLakeFlussSplits(
161161
int numSplits = in.readInt();
162162
List<SourceSplitBase> splits = new ArrayList<>(numSplits);
163163
SourceSplitSerializer sourceSplitSerializer = new SourceSplitSerializer(lakeSource);
164+
int version = in.readInt();
164165
for (int i = 0; i < numSplits; i++) {
165166
int splitSizeInBytes = in.readInt();
166167
byte[] splitBytes = new byte[splitSizeInBytes];
167168
in.readFully(splitBytes);
168-
splits.add(
169-
sourceSplitSerializer.deserialize(
170-
sourceSplitSerializer.getVersion(), splitBytes));
169+
splits.add(sourceSplitSerializer.deserialize(version, splitBytes));
171170
}
172171
return splits;
173172
} else {

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/LakeSplitSerializerTest.java

Lines changed: 63 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,6 @@
4242
class LakeSplitSerializerTest {
4343
private static final byte LAKE_SNAPSHOT_SPLIT_KIND = -1;
4444

45-
private static final int SERIALIZER_VERSION = 3;
46-
4745
private static final byte[] TEST_DATA = "test-lake-split".getBytes();
4846

4947
private static final int STOPPING_OFFSET = 1024;
@@ -61,8 +59,9 @@ class LakeSplitSerializerTest {
6159
@Test
6260
void testSerializeAndDeserializeLakeSnapshotSplit() throws IOException {
6361
// Prepare test data
62+
int splitIndex = 1;
6463
LakeSnapshotSplit originalSplit =
65-
new LakeSnapshotSplit(tableBucket, "2025-08-18", LAKE_SPLIT, 1);
64+
new LakeSnapshotSplit(tableBucket, "2025-08-18", LAKE_SPLIT, splitIndex);
6665

6766
DataOutputSerializer output = new DataOutputSerializer(STOPPING_OFFSET);
6867
serializer.serialize(output, originalSplit);
@@ -80,6 +79,40 @@ void testSerializeAndDeserializeLakeSnapshotSplit() throws IOException {
8079
assertThat(tableBucket).isEqualTo(result.getTableBucket());
8180
assertThat("2025-08-18").isEqualTo(result.getPartitionName());
8281
assertThat(LAKE_SPLIT).isEqualTo(result.getLakeSplit());
82+
assertThat(splitIndex).isEqualTo(result.getSplitIndex());
83+
}
84+
85+
@Test
86+
void testSerializeAndDeserializeLakeSnapshotSplitBackwardCompatibility() throws IOException {
87+
SimpleVersionedSerializer<LakeSplit> sourceSplitSerializerV1 =
88+
new TestSimpleVersionedSerializer();
89+
SimpleVersionedSerializer<LakeSplit> sourceSplitSerializerV2 =
90+
new TestSimpleVersionedSerializerV2();
91+
LakeSplitSerializer serializerV1 = new LakeSplitSerializer(sourceSplitSerializerV1);
92+
LakeSplitSerializer serializerV2 = new LakeSplitSerializer(sourceSplitSerializerV2);
93+
94+
// Prepare test data
95+
int splitIndex = 1;
96+
LakeSnapshotSplit originalSplit =
97+
new LakeSnapshotSplit(tableBucket, "2025-08-18", LAKE_SPLIT, splitIndex);
98+
99+
DataOutputSerializer output = new DataOutputSerializer(STOPPING_OFFSET);
100+
serializerV1.serialize(output, originalSplit);
101+
102+
SourceSplitBase deserializedSplit =
103+
serializerV2.deserialize(
104+
LAKE_SNAPSHOT_SPLIT_KIND,
105+
tableBucket,
106+
"2025-08-18",
107+
new DataInputDeserializer(output.getCopyOfBuffer()));
108+
109+
assertThat(deserializedSplit instanceof LakeSnapshotSplit).isTrue();
110+
LakeSnapshotSplit result = (LakeSnapshotSplit) deserializedSplit;
111+
112+
assertThat(tableBucket).isEqualTo(result.getTableBucket());
113+
assertThat("2025-08-18").isEqualTo(result.getPartitionName());
114+
assertThat(LAKE_SPLIT).isEqualTo(result.getLakeSplit());
115+
assertThat(splitIndex).isEqualTo(result.getSplitIndex());
83116
}
84117

85118
@Test
@@ -137,6 +170,8 @@ void testDeserializeWithWrongSplitKind() throws IOException {
137170
private static class TestSimpleVersionedSerializer
138171
implements SimpleVersionedSerializer<LakeSplit> {
139172

173+
private static final int V1 = 1;
174+
140175
@Override
141176
public byte[] serialize(LakeSplit split) throws IOException {
142177
return TEST_DATA;
@@ -149,7 +184,31 @@ public LakeSplit deserialize(int version, byte[] serialized) throws IOException
149184

150185
@Override
151186
public int getVersion() {
152-
return SERIALIZER_VERSION;
187+
return V1;
188+
}
189+
}
190+
191+
private static class TestSimpleVersionedSerializerV2
192+
implements SimpleVersionedSerializer<LakeSplit> {
193+
194+
private static final int V2 = 2;
195+
196+
@Override
197+
public byte[] serialize(LakeSplit split) throws IOException {
198+
return TEST_DATA;
199+
}
200+
201+
@Override
202+
public LakeSplit deserialize(int version, byte[] serialized) throws IOException {
203+
if (version < V2) {
204+
return LAKE_SPLIT;
205+
}
206+
return new TestLakeSplit(0, Collections.singletonList("2025-08-19"));
207+
}
208+
209+
@Override
210+
public int getVersion() {
211+
return V2;
153212
}
154213
}
155214

0 commit comments

Comments
 (0)