@@ -17,6 +17,7 @@

package org.apache.fluss.flink.tiering.committer;

import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.client.metadata.MetadataUpdater;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
@@ -37,6 +38,10 @@
/** Committer to commit {@link FlussTableLakeSnapshot} of lake to Fluss. */
public class FlussTableLakeSnapshotCommitter implements AutoCloseable {

// the current version for committing a lake snapshot is 2;
// the coordinator should use v2 to serialize the lake snapshot
private static final int CURRENT_VERSION_FOR_COMMIT_LAKE_SNAPSHOT = 2;

private final Configuration flussConf;

private CoordinatorGateway coordinatorGateway;
@@ -63,6 +68,7 @@ void commit(FlussTableLakeSnapshot flussTableLakeSnapshot) throws IOException {
try {
CommitLakeTableSnapshotRequest request =
toCommitLakeTableSnapshotRequest(flussTableLakeSnapshot);
request.setLakeSnapshotSerializationVersion(CURRENT_VERSION_FOR_COMMIT_LAKE_SNAPSHOT);
coordinatorGateway.commitLakeTableSnapshot(request).get();
} catch (Exception e) {
throw new IOException(
@@ -84,7 +90,8 @@ public void commit(long tableId, long snapshotId, Map<TableBucket, Long> logEndO
commit(flussTableLakeSnapshot);
}

private CommitLakeTableSnapshotRequest toCommitLakeTableSnapshotRequest(
@VisibleForTesting
static CommitLakeTableSnapshotRequest toCommitLakeTableSnapshotRequest(
FlussTableLakeSnapshot flussTableLakeSnapshot) {
CommitLakeTableSnapshotRequest commitLakeTableSnapshotRequest =
new CommitLakeTableSnapshotRequest();
@@ -106,6 +113,11 @@ private CommitLakeTableSnapshotRequest toCommitLakeTableSnapshotRequest(
return commitLakeTableSnapshotRequest;
}

@VisibleForTesting
CoordinatorGateway getCoordinatorGateway() {
return coordinatorGateway;
}

@Override
public void close() throws Exception {
if (rpcClient != null) {
@@ -21,18 +21,25 @@
import org.apache.fluss.flink.utils.FlinkTestBase;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.server.zk.ZooKeeperClient;
import org.apache.fluss.server.zk.data.ZkData;
import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot;
import org.apache.fluss.server.zk.data.lake.LakeTableSnapshotJsonSerde;
import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.fluss.utils.types.Tuple2;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.fluss.flink.tiering.committer.FlussTableLakeSnapshotCommitter.toCommitLakeTableSnapshotRequest;
import static org.apache.fluss.record.TestData.DATA1_PARTITIONED_TABLE_DESCRIPTOR;
import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
import static org.assertj.core.api.Assertions.assertThat;
@@ -61,44 +68,94 @@ void afterEach() throws Exception {
void testCommit(boolean isPartitioned) throws Exception {
TablePath tablePath =
TablePath.of("fluss", "test_commit" + (isPartitioned ? "_partitioned" : ""));
long tableId =
createTable(
tablePath,
isPartitioned
? DATA1_PARTITIONED_TABLE_DESCRIPTOR
: DATA1_TABLE_DESCRIPTOR);
Tuple2<Long, Collection<Long>> tableIdAndPartitions = createTable(tablePath, isPartitioned);
long tableId = tableIdAndPartitions.f0;
Collection<Long> partitions = tableIdAndPartitions.f1;

List<String> partitions;
Map<String, Long> partitionNameAndIds = new HashMap<>();
if (!isPartitioned) {
FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId);
partitions = Collections.singletonList(null);
} else {
partitionNameAndIds = FLUSS_CLUSTER_EXTENSION.waitUntilPartitionAllReady(tablePath);
partitions = new ArrayList<>(partitionNameAndIds.keySet());
Map<TableBucket, Long> logEndOffsets = mockLogEndOffsets(tableId, partitions);

long snapshotId = 3;
// commit offsets
flussTableLakeSnapshotCommitter.commit(tableId, snapshotId, logEndOffsets);
LakeSnapshot lakeSnapshot = admin.getLatestLakeSnapshot(tablePath).get();
assertThat(lakeSnapshot.getSnapshotId()).isEqualTo(3);

// get and check the offsets
Map<TableBucket, Long> bucketLogOffsets = lakeSnapshot.getTableBucketsOffset();
assertThat(bucketLogOffsets).isEqualTo(logEndOffsets);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void testCompatibilityWithoutSerializationVersion(boolean isPartitioned) throws Exception {
TablePath tablePath =
TablePath.of(
"fluss",
"test_legacy_version_commit" + (isPartitioned ? "_partitioned" : ""));
Tuple2<Long, Collection<Long>> tableIdAndPartitions = createTable(tablePath, isPartitioned);
long tableId = tableIdAndPartitions.f0;
Collection<Long> partitions = tableIdAndPartitions.f1;

Map<TableBucket, Long> logEndOffsets = mockLogEndOffsets(tableId, partitions);

long snapshotId = 3;
// commit offsets
FlussTableLakeSnapshot flussTableLakeSnapshot =
new FlussTableLakeSnapshot(tableId, snapshotId);
for (Map.Entry<TableBucket, Long> entry : logEndOffsets.entrySet()) {
flussTableLakeSnapshot.addBucketOffset(entry.getKey(), entry.getValue());
}

// do not set the commit lake snapshot version, to mock the old-version behavior
flussTableLakeSnapshotCommitter
.getCoordinatorGateway()
.commitLakeTableSnapshot(toCommitLakeTableSnapshotRequest(flussTableLakeSnapshot))
.get();

// test deserialization with the old-version deserializer
ZooKeeperClient zkClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
// read the json node from the lake table node
JsonNode jsonNode =
new ObjectMapper()
.readTree(zkClient.getOrEmpty(ZkData.LakeTableZNode.path(tableId)).get());
LakeTableSnapshot lakeTableSnapshot =
LakeTableSnapshotJsonSerde.INSTANCE.deserializeVersion1(jsonNode);

// verify the deserialized lakeTableSnapshot
assertThat(lakeTableSnapshot.getSnapshotId()).isEqualTo(3);
assertThat(lakeTableSnapshot.getBucketLogEndOffset()).isEqualTo(logEndOffsets);
}

private Map<TableBucket, Long> mockLogEndOffsets(long tableId, Collection<Long> partitionsIds) {
Map<TableBucket, Long> logEndOffsets = new HashMap<>();
for (int bucket = 0; bucket < 3; bucket++) {
long bucketOffset = bucket * bucket;
for (String partitionName : partitions) {
if (partitionName == null) {
for (Long partitionId : partitionsIds) {
if (partitionId == null) {
logEndOffsets.put(new TableBucket(tableId, bucket), bucketOffset);
} else {
long partitionId = partitionNameAndIds.get(partitionName);
logEndOffsets.put(new TableBucket(tableId, partitionId, bucket), bucketOffset);
}
}
}
return logEndOffsets;
}

long snapshotId = 3;
// commit offsets
flussTableLakeSnapshotCommitter.commit(tableId, snapshotId, logEndOffsets);
LakeSnapshot lakeSnapshot = admin.getLatestLakeSnapshot(tablePath).get();
assertThat(lakeSnapshot.getSnapshotId()).isEqualTo(3);

// get and check the offsets
Map<TableBucket, Long> bucketLogOffsets = lakeSnapshot.getTableBucketsOffset();
assertThat(bucketLogOffsets).isEqualTo(logEndOffsets);
private Tuple2<Long, Collection<Long>> createTable(TablePath tablePath, boolean isPartitioned)
throws Exception {
long tableId =
createTable(
tablePath,
isPartitioned
? DATA1_PARTITIONED_TABLE_DESCRIPTOR
: DATA1_TABLE_DESCRIPTOR);
Collection<Long> partitions;
if (!isPartitioned) {
partitions = Collections.singletonList(null);
FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId);
} else {
partitions = FLUSS_CLUSTER_EXTENSION.waitUntilPartitionAllReady(tablePath).values();
}
return new Tuple2<>(tableId, partitions);
}
}
13 changes: 13 additions & 0 deletions fluss-rpc/src/main/proto/FlussApi.proto
@@ -451,6 +451,19 @@ message NotifyRemoteLogOffsetsResponse {

message CommitLakeTableSnapshotRequest {
repeated PbLakeTableSnapshotInfo tables_req = 1;
// The version number for serializing lake_snapshot. This field tells the coordinator server
// which version to use when serializing the lake snapshot data.
//
// Legacy tiering services (before this field was introduced) do not set this field. This field
// is primarily used to handle compatibility during Fluss upgrades:
//
// - During upgrade: Fluss may use a new serialization format, but some tablet servers may not
// have been upgraded yet and cannot deserialize the new format. The coordinator server can
// check this field to determine which serialization format to use.
// - After upgrade: Once all Fluss components are upgraded, tiering services can be updated to
// use the new format. The coordinator server will recognize this field and use the new
// serialization method, and all tablet servers will be able to deserialize the new format.
optional int32 lake_snapshot_serialization_version = 2;
}

message PbLakeTableSnapshotInfo {
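Putting the pieces together, the intended use of the new field can be sketched as follows. This is a minimal illustration rather than code from this PR: the helper names are hypothetical, while the request and gateway accessors (setLakeSnapshotSerializationVersion, hasLakeSnapshotSerializationVersion, commitLakeTableSnapshot) are the ones the PR already uses.

// Tiering side: announce the serialization version this committer understands (v2).
static void commitWithVersion(CoordinatorGateway gateway, CommitLakeTableSnapshotRequest request)
        throws Exception {
    request.setLakeSnapshotSerializationVersion(2);
    gateway.commitLakeTableSnapshot(request).get();
}

// Coordinator side: an absent field means the request came from a legacy tiering service,
// so fall back to the v1 JSON layout that not-yet-upgraded tablet servers can still read.
static boolean useLegacyLakeSnapshotFormat(CommitLakeTableSnapshotRequest request) {
    return !request.hasLakeSnapshotSerializationVersion();
}

The coordinator-side check corresponds to the branching that tryProcessCommitLakeTableSnapshot performs below when choosing between upsertLakeTableV1 and upsertLakeTable.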
@@ -1260,10 +1260,14 @@ private void tryProcessCommitLakeTableSnapshot(
+ tableId
+ " not found in coordinator context.");
}

// this involves IO operation (ZK), so we do it in ioExecutor
lakeTableHelper.upsertLakeTable(
tableId, tablePath, lakeTableSnapshotEntry.getValue());
if (commitLakeTableSnapshotData.getSerializationVersion() == null) {
lakeTableHelper.upsertLakeTableV1(
tableId, lakeTableSnapshotEntry.getValue());
} else {
// this involves IO operation (ZK), so we do it in ioExecutor
lakeTableHelper.upsertLakeTable(
tableId, tablePath, lakeTableSnapshotEntry.getValue());
}
} catch (Exception e) {
ApiError error = ApiError.fromThrowable(e);
tableResp.setError(error.error().code(), error.message());
@@ -21,6 +21,8 @@
import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotRequest;
import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot;

import javax.annotation.Nullable;

import java.util.Map;
import java.util.Objects;

@@ -30,11 +32,17 @@ public class CommitLakeTableSnapshotData {
private final Map<Long, LakeTableSnapshot> lakeTableSnapshots;
private final Map<TableBucket, Long> tableBucketsMaxTieredTimestamp;

// the serialization version for the lake table snapshot; will be null
// for requests from tiering services before 0.8
private final Integer serializationVersion;

public CommitLakeTableSnapshotData(
Map<Long, LakeTableSnapshot> lakeTableSnapshots,
Map<TableBucket, Long> tableBucketsMaxTieredTimestamp) {
Map<TableBucket, Long> tableBucketsMaxTieredTimestamp,
@Nullable Integer serializationVersion) {
this.lakeTableSnapshots = lakeTableSnapshots;
this.tableBucketsMaxTieredTimestamp = tableBucketsMaxTieredTimestamp;
this.serializationVersion = serializationVersion;
}

public Map<Long, LakeTableSnapshot> getLakeTableSnapshot() {
@@ -45,6 +53,10 @@ public Map<TableBucket, Long> getTableBucketsMaxTieredTimestamp() {
return tableBucketsMaxTieredTimestamp;
}

public Integer getSerializationVersion() {
return serializationVersion;
}

@Override
public boolean equals(Object o) {
if (this == o) {
@@ -56,12 +68,14 @@ public boolean equals(Object o) {
CommitLakeTableSnapshotData that = (CommitLakeTableSnapshotData) o;
return Objects.equals(lakeTableSnapshots, that.lakeTableSnapshots)
&& Objects.equals(
tableBucketsMaxTieredTimestamp, that.tableBucketsMaxTieredTimestamp);
tableBucketsMaxTieredTimestamp, that.tableBucketsMaxTieredTimestamp)
&& Objects.equals(serializationVersion, that.serializationVersion);
}

@Override
public int hashCode() {
return Objects.hash(lakeTableSnapshots, tableBucketsMaxTieredTimestamp);
return Objects.hash(
lakeTableSnapshots, tableBucketsMaxTieredTimestamp, serializationVersion);
}

@Override
@@ -71,6 +85,8 @@ public String toString() {
+ lakeTableSnapshots
+ ", tableBucketsMaxTieredTimestamp="
+ tableBucketsMaxTieredTimestamp
+ ", serializationVersion="
+ serializationVersion
+ '}';
}
}
@@ -1574,7 +1574,13 @@ public static CommitLakeTableSnapshotData getCommitLakeTableSnapshotData(
lakeTableInfoByTableId.put(
tableId, new LakeTableSnapshot(snapshotId, bucketLogEndOffset));
}
return new CommitLakeTableSnapshotData(lakeTableInfoByTableId, tableBucketsMaxTimestamp);

Integer serializationVersion =
request.hasLakeSnapshotSerializationVersion()
? request.getLakeSnapshotSerializationVersion()
: null;
return new CommitLakeTableSnapshotData(
lakeTableInfoByTableId, tableBucketsMaxTimestamp, serializationVersion);
}

public static PbNotifyLakeTableOffsetReqForBucket makeNotifyLakeTableOffsetForBucket(
@@ -1027,9 +1027,15 @@ public Optional<RemoteLogManifestHandle> getRemoteLogManifestHandle(TableBucket
}

/** Upsert the {@link LakeTable} to Zk Node. */
public void upsertLakeTable(long tableId, LakeTable lakeTable, boolean isUpdate)
public void upsertLakeTable(
long tableId, LakeTable lakeTable, boolean isUpdate, boolean isLegacyVersion)
throws Exception {
byte[] zkData = LakeTableZNode.encode(lakeTable);
byte[] zkData;
if (isLegacyVersion) {
zkData = LakeTableZNode.encodeV1(tableId, lakeTable);
} else {
zkData = LakeTableZNode.encode(lakeTable);
}
String zkPath = LakeTableZNode.path(tableId);
if (isUpdate) {
zkClient.setData().forPath(zkPath, zkData);
@@ -31,6 +31,7 @@

import javax.annotation.Nullable;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;

@@ -589,6 +590,14 @@ public static String path(long tableId) {
return TableIdZNode.path(tableId) + "/laketable";
}

/**
* Encodes a {@link LakeTable} to JSON bytes using Version 1 format (legacy) for storage in
* ZooKeeper.
*/
public static byte[] encodeV1(long tableId, LakeTable lakeTable) throws IOException {
return LakeTableJsonSerde.serializeV1(tableId, lakeTable);
}

/**
* Encodes a LakeTable to JSON bytes for storage in ZK.
*
@@ -112,7 +112,7 @@ public List<LakeSnapshotMetadata> getLakeSnapshotMetadatas() {
*
* @return the LakeTableSnapshot
*/
public LakeTableSnapshot getLatestTableSnapshot() throws Exception {
public LakeTableSnapshot getLatestTableSnapshot() throws IOException {
if (lakeTableSnapshot != null) {
return lakeTableSnapshot;
}