
Commit feecfc7
[lake] Fix zk lake snapshot node compatibility issue
1 parent de2d9a0

File tree
13 files changed: +197 -63 lines changed

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java
Lines changed: 13 additions & 1 deletion

@@ -17,6 +17,7 @@

 package org.apache.fluss.flink.tiering.committer;

+import org.apache.fluss.annotation.VisibleForTesting;
 import org.apache.fluss.client.metadata.MetadataUpdater;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
@@ -37,6 +38,10 @@
 /** Committer to commit {@link FlussTableLakeSnapshot} of lake to Fluss. */
 public class FlussTableLakeSnapshotCommitter implements AutoCloseable {

+    // current version for commit lake snapshot is 2,
+    // coordinator should use v2 to serialize lake snapshot
+    private static final int CURRENT_VERSION_FOR_COMMIT_LAKE_SNAPSHOT = 2;
+
     private final Configuration flussConf;

     private CoordinatorGateway coordinatorGateway;
@@ -63,6 +68,7 @@ void commit(FlussTableLakeSnapshot flussTableLakeSnapshot) throws IOException {
         try {
             CommitLakeTableSnapshotRequest request =
                     toCommitLakeTableSnapshotRequest(flussTableLakeSnapshot);
+            request.setLakeSnapshotSerializationVersion(CURRENT_VERSION_FOR_COMMIT_LAKE_SNAPSHOT);
             coordinatorGateway.commitLakeTableSnapshot(request).get();
         } catch (Exception e) {
             throw new IOException(
@@ -84,7 +90,8 @@ public void commit(long tableId, long snapshotId, Map<TableBucket, Long> logEndO
         commit(flussTableLakeSnapshot);
     }

-    private CommitLakeTableSnapshotRequest toCommitLakeTableSnapshotRequest(
+    @VisibleForTesting
+    static CommitLakeTableSnapshotRequest toCommitLakeTableSnapshotRequest(
             FlussTableLakeSnapshot flussTableLakeSnapshot) {
         CommitLakeTableSnapshotRequest commitLakeTableSnapshotRequest =
                 new CommitLakeTableSnapshotRequest();
@@ -106,6 +113,11 @@ private CommitLakeTableSnapshotRequest toCommitLakeTableSnapshotRequest(
         return commitLakeTableSnapshotRequest;
     }

+    @VisibleForTesting
+    CoordinatorGateway getCoordinatorGateway() {
+        return coordinatorGateway;
+    }
+
     @Override
     public void close() throws Exception {
         if (rpcClient != null) {
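This file carries the tiering-side change: every snapshot commit is now stamped with serialization version 2 before it reaches the coordinator. A minimal usage sketch, assuming an already-configured committer; "committer" and the id/offset values are placeholders, while the class and method names are taken from this diff:

    // Sketch of the tiering-side call path after this change.
    Map<TableBucket, Long> logEndOffsets = new HashMap<>();
    logEndOffsets.put(new TableBucket(tableId, 0), 100L);
    // commit() wraps the offsets in a FlussTableLakeSnapshot, builds the
    // CommitLakeTableSnapshotRequest, and stamps it with
    // CURRENT_VERSION_FOR_COMMIT_LAKE_SNAPSHOT (= 2) before calling the coordinator.
    committer.commit(tableId, snapshotId, logEndOffsets);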

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java
Lines changed: 85 additions & 29 deletions

@@ -21,18 +21,24 @@
 import org.apache.fluss.flink.utils.FlinkTestBase;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TablePath;
-
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.ZkData;
+import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot;
+import org.apache.fluss.server.zk.data.lake.LakeTableSnapshotJsonSerde;
+import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.fluss.utils.types.Tuple2;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;

-import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;

+import static org.apache.fluss.flink.tiering.committer.FlussTableLakeSnapshotCommitter.toCommitLakeTableSnapshotRequest;
 import static org.apache.fluss.record.TestData.DATA1_PARTITIONED_TABLE_DESCRIPTOR;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -61,44 +67,94 @@ void afterEach() throws Exception {
     void testCommit(boolean isPartitioned) throws Exception {
         TablePath tablePath =
                 TablePath.of("fluss", "test_commit" + (isPartitioned ? "_partitioned" : ""));
-        long tableId =
-                createTable(
-                        tablePath,
-                        isPartitioned
-                                ? DATA1_PARTITIONED_TABLE_DESCRIPTOR
-                                : DATA1_TABLE_DESCRIPTOR);
+        Tuple2<Long, Collection<Long>> tableIdAndPartitions = createTable(tablePath, isPartitioned);
+        long tableId = tableIdAndPartitions.f0;
+        Collection<Long> partitions = tableIdAndPartitions.f1;

-        List<String> partitions;
-        Map<String, Long> partitionNameAndIds = new HashMap<>();
-        if (!isPartitioned) {
-            FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId);
-            partitions = Collections.singletonList(null);
-        } else {
-            partitionNameAndIds = FLUSS_CLUSTER_EXTENSION.waitUntilPartitionAllReady(tablePath);
-            partitions = new ArrayList<>(partitionNameAndIds.keySet());
+        Map<TableBucket, Long> logEndOffsets = mockLogEndOffsets(tableId, partitions);
+
+        long snapshotId = 3;
+        // commit offsets
+        flussTableLakeSnapshotCommitter.commit(tableId, snapshotId, logEndOffsets);
+        LakeSnapshot lakeSnapshot = admin.getLatestLakeSnapshot(tablePath).get();
+        assertThat(lakeSnapshot.getSnapshotId()).isEqualTo(3);
+
+        // get and check the offsets
+        Map<TableBucket, Long> bucketLogOffsets = lakeSnapshot.getTableBucketsOffset();
+        assertThat(bucketLogOffsets).isEqualTo(logEndOffsets);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testCompatibilityWithoutSerializationVersion(boolean isPartitioned) throws Exception {
+        TablePath tablePath =
+                TablePath.of(
+                        "fluss",
+                        "test_lagacy_version_commit" + (isPartitioned ? "_partitioned" : ""));
+        Tuple2<Long, Collection<Long>> tableIdAndPartitions = createTable(tablePath, isPartitioned);
+        long tableId = tableIdAndPartitions.f0;
+        Collection<Long> partitions = tableIdAndPartitions.f1;
+
+        Map<TableBucket, Long> logEndOffsets = mockLogEndOffsets(tableId, partitions);
+
+        long snapshotId = 3;
+        // commit offsets
+        FlussTableLakeSnapshot flussTableLakeSnapshot =
+                new FlussTableLakeSnapshot(tableId, snapshotId);
+        for (Map.Entry<TableBucket, Long> entry : logEndOffsets.entrySet()) {
+            flussTableLakeSnapshot.addBucketOffset(entry.getKey(), entry.getValue());
         }

+        // not set commit lake snapshot version to mock old version behavior
+        flussTableLakeSnapshotCommitter
+                .getCoordinatorGateway()
+                .commitLakeTableSnapshot(toCommitLakeTableSnapshotRequest(flussTableLakeSnapshot))
+                .get();
+
+        // test deserialize with old version deserializer
+        ZooKeeperClient zkClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
+        // read the json node from lake table node
+        JsonNode jsonNode =
+                new ObjectMapper()
+                        .readTree(zkClient.getOrEmpty(ZkData.LakeTableZNode.path(tableId)).get());
+        LakeTableSnapshot lakeTableSnapshot =
+                LakeTableSnapshotJsonSerde.INSTANCE.deserializeVersion1(jsonNode);
+
+        // verify the deserialized lakeTableSnapshot
+        assertThat(lakeTableSnapshot.getSnapshotId()).isEqualTo(3);
+        assertThat(lakeTableSnapshot.getBucketLogEndOffset()).isEqualTo(logEndOffsets);
+    }
+
+    private Map<TableBucket, Long> mockLogEndOffsets(long tableId, Collection<Long> partitionsIds) {
         Map<TableBucket, Long> logEndOffsets = new HashMap<>();
         for (int bucket = 0; bucket < 3; bucket++) {
             long bucketOffset = bucket * bucket;
-            for (String partitionName : partitions) {
-                if (partitionName == null) {
+            for (Long partitionId : partitionsIds) {
+                if (partitionId == null) {
                     logEndOffsets.put(new TableBucket(tableId, bucket), bucketOffset);
                 } else {
-                    long partitionId = partitionNameAndIds.get(partitionName);
                     logEndOffsets.put(new TableBucket(tableId, partitionId, bucket), bucketOffset);
                 }
             }
         }
+        return logEndOffsets;
+    }

-        long snapshotId = 3;
-        // commit offsets
-        flussTableLakeSnapshotCommitter.commit(tableId, snapshotId, logEndOffsets);
-        LakeSnapshot lakeSnapshot = admin.getLatestLakeSnapshot(tablePath).get();
-        assertThat(lakeSnapshot.getSnapshotId()).isEqualTo(3);
-
-        // get and check the offsets
-        Map<TableBucket, Long> bucketLogOffsets = lakeSnapshot.getTableBucketsOffset();
-        assertThat(bucketLogOffsets).isEqualTo(logEndOffsets);
+    private Tuple2<Long, Collection<Long>> createTable(TablePath tablePath, boolean isPartitioned)
+            throws Exception {
+        long tableId =
+                createTable(
+                        tablePath,
+                        isPartitioned
+                                ? DATA1_PARTITIONED_TABLE_DESCRIPTOR
+                                : DATA1_TABLE_DESCRIPTOR);
+        Collection<Long> partitions;
+        if (!isPartitioned) {
+            partitions = Collections.singletonList(null);
+            FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId);
+        } else {
+            partitions = FLUSS_CLUSTER_EXTENSION.waitUntilPartitionAllReady(tablePath).values();
+        }
+        return new Tuple2<>(tableId, partitions);
     }
 }
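The new testCompatibilityWithoutSerializationVersion case exercises the legacy path end to end: it bypasses commit() and sends a request through the coordinator gateway without ever setting lake_snapshot_serialization_version, then reads the raw znode bytes back via LakeTableZNode.path(tableId) and asserts that LakeTableSnapshotJsonSerde.deserializeVersion1 still parses them, which is exactly what a reader that only understands the v1 format would need to do.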

fluss-rpc/src/main/proto/FlussApi.proto
Lines changed: 13 additions & 0 deletions

@@ -451,6 +451,19 @@ message NotifyRemoteLogOffsetsResponse {

 message CommitLakeTableSnapshotRequest {
     repeated PbLakeTableSnapshotInfo tables_req = 1;
+    // The version number for serializing lake_snapshot. This field tells the coordinator server
+    // which version to use when serializing the lake snapshot data.
+    //
+    // Legacy tiering services (before this field was introduced) do not set this field. This field
+    // is primarily used to handle compatibility during Fluss upgrades:
+    //
+    // - During upgrade: Fluss may use a new serialization format, but some tablet servers may not
+    //   have been upgraded yet and cannot deserialize the new format. The coordinator server can
+    //   check this field to determine which serialization format to use.
+    // - After upgrade: Once all Fluss components are upgraded, tiering services can be updated to
+    //   use the new format. The coordinator server will recognize this field and use the new
+    //   serialization method, and all tablet servers will be able to deserialize the new format.
+    optional int32 lake_snapshot_serialization_version = 2;
 }

 message PbLakeTableSnapshotInfo {
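Because the field is declared optional, it carries explicit presence on the wire: a request from a legacy tiering service simply omits it, and that absence is distinguishable from an explicit 0. The ServerRpcMessageUtils change below turns this presence check (hasLakeSnapshotSerializationVersion()) into a nullable Integer, so null reliably means a legacy sender.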

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
Lines changed: 8 additions & 6 deletions

@@ -93,13 +93,11 @@
 import org.apache.fluss.server.zk.data.lake.LakeTableHelper;
 import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot;
 import org.apache.fluss.utils.types.Tuple2;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
-
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -1260,10 +1258,14 @@ private void tryProcessCommitLakeTableSnapshot(
                                 + tableId
                                 + " not found in coordinator context.");
                     }
-
-                    // this involves IO operation (ZK), so we do it in ioExecutor
-                    lakeTableHelper.upsertLakeTable(
-                            tableId, tablePath, lakeTableSnapshotEntry.getValue());
+                    if (commitLakeTableSnapshotData.getSerializationVersion() == null) {
+                        lakeTableHelper.upsertLakeTableV1(
+                                tableId, lakeTableSnapshotEntry.getValue());
+                    } else {
+                        // this involves IO operation (ZK), so we do it in ioExecutor
+                        lakeTableHelper.upsertLakeTable(
+                                tableId, tablePath, lakeTableSnapshotEntry.getValue());
+                    }
                 } catch (Exception e) {
                     ApiError error = ApiError.fromThrowable(e);
                     tableResp.setError(error.error().code(), error.message());
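The dispatch here is deliberately conservative: a null version means the request may come from a legacy tiering service running while some tablet servers are still un-upgraded, so the coordinator keeps writing the v1 format that every reader understands. The body of upsertLakeTableV1 is not part of this commit; the following is only a hypothetical sketch of how LakeTableHelper might bridge to the new ZooKeeperClient flag (see the ZooKeeperClient change below):

    // Hypothetical sketch only; the real LakeTableHelper change is not shown in this commit.
    void upsertLakeTableV1(long tableId, LakeTableSnapshot snapshot) throws Exception {
        LakeTable lakeTable = toLakeTable(tableId, snapshot); // assumed conversion helper
        boolean exists = zkClient.getOrEmpty(ZkData.LakeTableZNode.path(tableId)).isPresent();
        // isLegacyVersion = true selects LakeTableZNode.encodeV1(...) on the write path
        zkClient.upsertLakeTable(tableId, lakeTable, /* isUpdate */ exists, /* isLegacyVersion */ true);
    }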

fluss-server/src/main/java/org/apache/fluss/server/entity/CommitLakeTableSnapshotData.java
Lines changed: 18 additions & 3 deletions

@@ -21,6 +21,7 @@
 import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotRequest;
 import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot;

+import javax.annotation.Nullable;
 import java.util.Map;
 import java.util.Objects;

@@ -30,11 +31,17 @@ public class CommitLakeTableSnapshotData {
     private final Map<Long, LakeTableSnapshot> lakeTableSnapshots;
     private final Map<TableBucket, Long> tableBucketsMaxTieredTimestamp;

+    // the serialization version for lake table snapshot, will be null
+    // before 0.8
+    private final Integer serializationVersion;
+
     public CommitLakeTableSnapshotData(
             Map<Long, LakeTableSnapshot> lakeTableSnapshots,
-            Map<TableBucket, Long> tableBucketsMaxTieredTimestamp) {
+            Map<TableBucket, Long> tableBucketsMaxTieredTimestamp,
+            @Nullable Integer serializationVersion) {
         this.lakeTableSnapshots = lakeTableSnapshots;
         this.tableBucketsMaxTieredTimestamp = tableBucketsMaxTieredTimestamp;
+        this.serializationVersion = serializationVersion;
     }

     public Map<Long, LakeTableSnapshot> getLakeTableSnapshot() {
@@ -45,6 +52,10 @@ public Map<TableBucket, Long> getTableBucketsMaxTieredTimestamp() {
         return tableBucketsMaxTieredTimestamp;
     }

+    public Integer getSerializationVersion() {
+        return serializationVersion;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -56,12 +67,14 @@ public boolean equals(Object o) {
         CommitLakeTableSnapshotData that = (CommitLakeTableSnapshotData) o;
         return Objects.equals(lakeTableSnapshots, that.lakeTableSnapshots)
                 && Objects.equals(
-                        tableBucketsMaxTieredTimestamp, that.tableBucketsMaxTieredTimestamp);
+                        tableBucketsMaxTieredTimestamp, that.tableBucketsMaxTieredTimestamp)
+                && Objects.equals(serializationVersion, that.serializationVersion);
     }

     @Override
     public int hashCode() {
-        return Objects.hash(lakeTableSnapshots, tableBucketsMaxTieredTimestamp);
+        return Objects.hash(
+                lakeTableSnapshots, tableBucketsMaxTieredTimestamp, serializationVersion);
     }

     @Override
@@ -71,6 +84,8 @@ public String toString() {
                 + lakeTableSnapshots
                 + ", tableBucketsMaxTieredTimestamp="
                 + tableBucketsMaxTieredTimestamp
+                + ", serializationVersion="
+                + serializationVersion
                 + '}';
     }
 }

fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
Lines changed: 7 additions & 2 deletions

@@ -170,7 +170,6 @@
 import org.apache.fluss.utils.json.JsonSerdeUtils;

 import javax.annotation.Nullable;
-
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -1574,7 +1573,13 @@ public static CommitLakeTableSnapshotData getCommitLakeTableSnapshotData(
             lakeTableInfoByTableId.put(
                     tableId, new LakeTableSnapshot(snapshotId, bucketLogEndOffset));
         }
-        return new CommitLakeTableSnapshotData(lakeTableInfoByTableId, tableBucketsMaxTimestamp);
+
+        Integer serializationVersion =
+                request.hasLakeSnapshotSerializationVersion()
+                        ? request.getLakeSnapshotSerializationVersion()
+                        : null;
+        return new CommitLakeTableSnapshotData(
+                lakeTableInfoByTableId, tableBucketsMaxTimestamp, serializationVersion);
     }

     public static PbNotifyLakeTableOffsetReqForBucket makeNotifyLakeTableOffsetForBucket(

fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
Lines changed: 8 additions & 4 deletions

@@ -84,13 +84,11 @@
 import org.apache.fluss.shaded.zookeeper3.org.apache.zookeeper.data.Stat;
 import org.apache.fluss.utils.ExceptionUtils;
 import org.apache.fluss.utils.types.Tuple2;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
-
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -1027,9 +1025,15 @@ public Optional<RemoteLogManifestHandle> getRemoteLogManifestHandle(TableBucket
     }

     /** Upsert the {@link LakeTable} to Zk Node. */
-    public void upsertLakeTable(long tableId, LakeTable lakeTable, boolean isUpdate)
+    public void upsertLakeTable(
+            long tableId, LakeTable lakeTable, boolean isUpdate, boolean isLegacyVersion)
             throws Exception {
-        byte[] zkData = LakeTableZNode.encode(lakeTable);
+        byte[] zkData;
+        if (isLegacyVersion) {
+            zkData = LakeTableZNode.encodeV1(tableId, lakeTable);
+        } else {
+            zkData = LakeTableZNode.encode(lakeTable);
+        }
         String zkPath = LakeTableZNode.path(tableId);
         if (isUpdate) {
             zkClient.setData().forPath(zkPath, zkData);
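Note that isLegacyVersion only changes how the payload bytes are encoded; both branches write to the same znode path from LakeTableZNode.path(tableId), so readers find the snapshot in the same place regardless of which format was written.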

fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java
Lines changed: 9 additions & 1 deletion

@@ -30,7 +30,7 @@
 import org.apache.fluss.utils.types.Tuple2;

 import javax.annotation.Nullable;
-
+import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Map;

@@ -589,6 +589,14 @@ public static String path(long tableId) {
         return TableIdZNode.path(tableId) + "/laketable";
     }

+    /**
+     * Encodes a {@link LakeTable} to JSON bytes using Version 1 format (legacy) for storage in
+     * ZooKeeper.
+     */
+    public static byte[] encodeV1(long tableId, LakeTable lakeTable) throws IOException {
+        return LakeTableJsonSerde.serializeV1(tableId, lakeTable);
+    }
+
     /**
      * Encodes a LakeTable to JSON bytes for storage in ZK.
     *
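This commit does not show the field layout of LakeTableJsonSerde or LakeTableSnapshotJsonSerde, but the v1/v2 split follows the usual versioned-JSON serde pattern: write a version marker alongside the payload and dispatch on it when reading, treating a missing marker as the oldest format. A self-contained, illustrative sketch of that general technique (the "version" and "snapshot_id" field names are hypothetical, not Fluss's actual schema):

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.node.ObjectNode;

    // Illustrative versioned-JSON serde, not the Fluss implementation.
    public final class VersionedSnapshotSerdeSketch {
        private static final ObjectMapper MAPPER = new ObjectMapper();

        static byte[] serialize(long snapshotId, int version) throws Exception {
            ObjectNode node = MAPPER.createObjectNode();
            node.put("version", version); // readers dispatch on this field
            node.put("snapshot_id", snapshotId);
            return MAPPER.writeValueAsBytes(node);
        }

        static long deserialize(byte[] bytes) throws Exception {
            JsonNode node = MAPPER.readTree(bytes);
            // a missing "version" is treated as the oldest format, mirroring how an
            // absent lake_snapshot_serialization_version selects the v1 write path
            int version = node.path("version").asInt(1);
            switch (version) {
                case 1:
                case 2:
                    return node.get("snapshot_id").asLong();
                default:
                    throw new IllegalStateException("Unknown version: " + version);
            }
        }
    }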
