Commit 747e91b

Commit message: minor fix
1 parent: 69bf339
14 files changed: +774 additions, -421 deletions
fluss-common/src/main/java/org/apache/fluss/utils/FlussPaths.java

Lines changed: 3 additions & 3 deletions
@@ -710,14 +710,14 @@ public static FsPath remoteLakeTableSnapshotDir(
      * <p>The path contract:
      *
      * <pre>
-     * {$remoteLakeTableSnapshotMetadataDir}/metadata/{uuid}.manifest
+     * {$remoteLakeTableSnapshotMetadataDir}/metadata/{UUID}.offsets
      * </pre>
      */
-    public static FsPath remoteLakeTableSnapshotManifestPath(
+    public static FsPath remoteLakeTableSnapshotOffsetPath(
             String remoteDataDir, TablePath tablePath, long tableId) {
         return new FsPath(
                 String.format(
-                        "%s/metadata/%s.manifest",
+                        "%s/metadata/%s.offsets",
                         remoteLakeTableSnapshotDir(remoteDataDir, tablePath, tableId),
                         UUID.randomUUID()));
     }
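
For illustration, a minimal runnable sketch of the renamed path contract. Only the "%s/metadata/%s.offsets" format string comes from the change above; the snapshot directory below is a made-up stand-in for the value returned by remoteLakeTableSnapshotDir(remoteDataDir, tablePath, tableId).

    import java.util.UUID;

    public class OffsetPathSketch {
        public static void main(String[] args) {
            // Hypothetical snapshot directory; the real one comes from
            // FlussPaths.remoteLakeTableSnapshotDir(remoteDataDir, tablePath, tableId).
            String snapshotDir = "oss://my-bucket/fluss/lake/fluss.db/my_table-42";
            // Same format string as remoteLakeTableSnapshotOffsetPath above.
            String offsetPath =
                    String.format("%s/metadata/%s.offsets", snapshotDir, UUID.randomUUID());
            System.out.println(offsetPath);
            // e.g. oss://my-bucket/fluss/lake/fluss.db/my_table-42/metadata/<uuid>.offsets
        }
    }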

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java

Lines changed: 40 additions & 17 deletions
@@ -17,6 +17,7 @@
 
 package org.apache.fluss.flink.tiering.committer;
 
+import org.apache.fluss.annotation.VisibleForTesting;
 import org.apache.fluss.client.metadata.MetadataUpdater;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
@@ -228,30 +229,52 @@ private CommitLakeTableSnapshotRequest toCommitLakeTableSnapshotRequest(
         // Add PbLakeTableSnapshotInfo for metrics reporting (to notify tablet servers about
         // synchronized log end offsets and max timestamps)
         if (!logEndOffsets.isEmpty()) {
-            PbLakeTableSnapshotInfo pbLakeTableSnapshotInfo =
-                    commitLakeTableSnapshotRequest.addTablesReq();
-            pbLakeTableSnapshotInfo.setTableId(tableId);
-            pbLakeTableSnapshotInfo.setSnapshotId(snapshotId);
-            for (Map.Entry<TableBucket, Long> logEndOffsetEntry : logEndOffsets.entrySet()) {
-                TableBucket tableBucket = logEndOffsetEntry.getKey();
-                PbLakeTableOffsetForBucket pbLakeTableOffsetForBucket =
-                        pbLakeTableSnapshotInfo.addBucketsReq();
+            commitLakeTableSnapshotRequest =
+                    addLogEndOffsets(
+                            commitLakeTableSnapshotRequest,
+                            tableId,
+                            snapshotId,
+                            logEndOffsets,
+                            logMaxTieredTimestamps);
+        }
+        return commitLakeTableSnapshotRequest;
+    }
+
+    @VisibleForTesting
+    protected CommitLakeTableSnapshotRequest addLogEndOffsets(
+            CommitLakeTableSnapshotRequest commitLakeTableSnapshotRequest,
+            long tableId,
+            long snapshotId,
+            Map<TableBucket, Long> logEndOffsets,
+            Map<TableBucket, Long> logMaxTieredTimestamps) {
+        PbLakeTableSnapshotInfo pbLakeTableSnapshotInfo =
+                commitLakeTableSnapshotRequest.addTablesReq();
+        pbLakeTableSnapshotInfo.setTableId(tableId);
+        pbLakeTableSnapshotInfo.setSnapshotId(snapshotId);
+        for (Map.Entry<TableBucket, Long> logEndOffsetEntry : logEndOffsets.entrySet()) {
+            TableBucket tableBucket = logEndOffsetEntry.getKey();
+            PbLakeTableOffsetForBucket pbLakeTableOffsetForBucket =
+                    pbLakeTableSnapshotInfo.addBucketsReq();
 
-                if (tableBucket.getPartitionId() != null) {
-                    pbLakeTableOffsetForBucket.setPartitionId(tableBucket.getPartitionId());
-                }
-                pbLakeTableOffsetForBucket.setBucketId(tableBucket.getBucket());
-                pbLakeTableOffsetForBucket.setLogEndOffset(logEndOffsetEntry.getValue());
+            if (tableBucket.getPartitionId() != null) {
+                pbLakeTableOffsetForBucket.setPartitionId(tableBucket.getPartitionId());
+            }
+            pbLakeTableOffsetForBucket.setBucketId(tableBucket.getBucket());
+            pbLakeTableOffsetForBucket.setLogEndOffset(logEndOffsetEntry.getValue());
 
-                Long maxTimestamp = logMaxTieredTimestamps.get(tableBucket);
-                if (maxTimestamp != null) {
-                    pbLakeTableOffsetForBucket.setMaxTimestamp(maxTimestamp);
-                }
+            Long maxTimestamp = logMaxTieredTimestamps.get(tableBucket);
+            if (maxTimestamp != null) {
+                pbLakeTableOffsetForBucket.setMaxTimestamp(maxTimestamp);
             }
         }
         return commitLakeTableSnapshotRequest;
     }
 
+    @VisibleForTesting
+    CoordinatorGateway getCoordinatorGateway() {
+        return coordinatorGateway;
+    }
+
     @Override
     public void close() throws Exception {
         if (rpcClient != null) {
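
Extracting addLogEndOffsets makes the request-building step callable on its own. Below is a hedged sketch of driving it directly, assuming an existing FlussTableLakeSnapshotCommitter instance named committer and made-up ids; since the method is protected and @VisibleForTesting, this only compiles from the same package or a subclass.

    import org.apache.fluss.metadata.TableBucket;
    import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotRequest;

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    long tableId = 1L;
    long snapshotId = 3L;
    Map<TableBucket, Long> logEndOffsets = new HashMap<>();
    logEndOffsets.put(new TableBucket(tableId, 0), 100L);      // non-partitioned bucket 0
    logEndOffsets.put(new TableBucket(tableId, 10L, 1), 200L); // partition 10, bucket 1

    CommitLakeTableSnapshotRequest request = new CommitLakeTableSnapshotRequest();
    // An empty timestamp map mimics committers that predate logMaxTieredTimestamps,
    // which is exactly how the new compatibility test uses this method.
    request = committer.addLogEndOffsets(
            request, tableId, snapshotId, logEndOffsets, Collections.emptyMap());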

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java

Lines changed: 85 additions & 31 deletions
@@ -21,16 +21,22 @@
 import org.apache.fluss.flink.utils.FlinkTestBase;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotRequest;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.ZkData;
+import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot;
+import org.apache.fluss.server.zk.data.lake.LakeTableSnapshotJsonSerde;
+import org.apache.fluss.utils.json.JsonSerdeUtils;
+import org.apache.fluss.utils.types.Tuple2;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
-import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 import static org.apache.fluss.record.TestData.DATA1_PARTITIONED_TABLE_DESCRIPTOR;
@@ -61,36 +67,11 @@ void afterEach() throws Exception {
     void testCommit(boolean isPartitioned) throws Exception {
         TablePath tablePath =
                 TablePath.of("fluss", "test_commit" + (isPartitioned ? "_partitioned" : ""));
-        long tableId =
-                createTable(
-                        tablePath,
-                        isPartitioned
-                                ? DATA1_PARTITIONED_TABLE_DESCRIPTOR
-                                : DATA1_TABLE_DESCRIPTOR);
-
-        List<String> partitions;
-        Map<String, Long> partitionNameAndIds = new HashMap<>();
-        if (!isPartitioned) {
-            FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId);
-            partitions = Collections.singletonList(null);
-        } else {
-            partitionNameAndIds = FLUSS_CLUSTER_EXTENSION.waitUntilPartitionAllReady(tablePath);
-            partitions = new ArrayList<>(partitionNameAndIds.keySet());
-        }
+        Tuple2<Long, Collection<Long>> tableIdAndPartitions = createTable(tablePath, isPartitioned);
+        long tableId = tableIdAndPartitions.f0;
+        Collection<Long> partitions = tableIdAndPartitions.f1;
 
-        Map<TableBucket, Long> expectedOffsets = new HashMap<>();
-        for (int bucket = 0; bucket < 3; bucket++) {
-            long bucketOffset = bucket * bucket;
-            for (String partitionName : partitions) {
-                if (partitionName == null) {
-                    expectedOffsets.put(new TableBucket(tableId, bucket), bucketOffset);
-                } else {
-                    long partitionId = partitionNameAndIds.get(partitionName);
-                    expectedOffsets.put(
-                            new TableBucket(tableId, partitionId, bucket), bucketOffset);
-                }
-            }
-        }
+        Map<TableBucket, Long> expectedOffsets = mockLogEndOffsets(tableId, partitions);
 
         long lakeSnapshotId = 3;
 
@@ -112,4 +93,77 @@ void testCommit(boolean isPartitioned) throws Exception {
         Map<TableBucket, Long> bucketLogOffsets = lakeSnapshot.getTableBucketsOffset();
         assertThat(bucketLogOffsets).isEqualTo(expectedOffsets);
     }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testCompatibilityWithOldCommitter(boolean isPartitioned) throws Exception {
+        // test commit lake snapshot with old behavior
+        TablePath tablePath =
+                TablePath.of(
+                        "fluss",
+                        "test_legacy_version_commit" + (isPartitioned ? "_partitioned" : ""));
+        Tuple2<Long, Collection<Long>> tableIdAndPartitions = createTable(tablePath, isPartitioned);
+        long tableId = tableIdAndPartitions.f0;
+        Collection<Long> partitions = tableIdAndPartitions.f1;
+
+        Map<TableBucket, Long> logEndOffsets = mockLogEndOffsets(tableId, partitions);
+        long snapshotId = 3;
+
+        // mock old behavior to commit
+        CommitLakeTableSnapshotRequest commitLakeTableSnapshotRequest =
+                new CommitLakeTableSnapshotRequest();
+        commitLakeTableSnapshotRequest =
+                flussTableLakeSnapshotCommitter.addLogEndOffsets(
+                        commitLakeTableSnapshotRequest,
+                        tableId,
+                        snapshotId,
+                        logEndOffsets,
+                        Collections.emptyMap());
+        flussTableLakeSnapshotCommitter
+                .getCoordinatorGateway()
+                .commitLakeTableSnapshot(commitLakeTableSnapshotRequest)
+                .get();
+
+        // make sure it can be deserialized with v1
+        ZooKeeperClient zkClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
+        byte[] jsonBytes = zkClient.getOrEmpty(ZkData.LakeTableZNode.path(tableId)).get();
+
+        LakeTableSnapshot lakeTableSnapshot =
+                JsonSerdeUtils.readValue(jsonBytes, LakeTableSnapshotJsonSerde.INSTANCE);
+        assertThat(lakeTableSnapshot.getSnapshotId()).isEqualTo(snapshotId);
+        assertThat(lakeTableSnapshot.getBucketLogEndOffset()).isEqualTo(logEndOffsets);
+    }
+
+    private Map<TableBucket, Long> mockLogEndOffsets(long tableId, Collection<Long> partitionsIds) {
+        Map<TableBucket, Long> logEndOffsets = new HashMap<>();
+        for (int bucket = 0; bucket < 3; bucket++) {
+            long bucketOffset = bucket * bucket;
+            for (Long partitionId : partitionsIds) {
+                if (partitionId == null) {
+                    logEndOffsets.put(new TableBucket(tableId, bucket), bucketOffset);
+                } else {
+                    logEndOffsets.put(new TableBucket(tableId, partitionId, bucket), bucketOffset);
+                }
+            }
+        }
+        return logEndOffsets;
+    }
+
+    private Tuple2<Long, Collection<Long>> createTable(TablePath tablePath, boolean isPartitioned)
+            throws Exception {
+        long tableId =
+                createTable(
+                        tablePath,
+                        isPartitioned
+                                ? DATA1_PARTITIONED_TABLE_DESCRIPTOR
+                                : DATA1_TABLE_DESCRIPTOR);
+        Collection<Long> partitions;
+        if (!isPartitioned) {
+            partitions = Collections.singletonList(null);
+            FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId);
+        } else {
+            partitions = FLUSS_CLUSTER_EXTENSION.waitUntilPartitionAllReady(tablePath).values();
+        }
+        return new Tuple2<>(tableId, partitions);
+    }
 }
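
For concreteness, mockLogEndOffsets assigns each bucket the offset bucket * bucket, so buckets 0 through 2 get offsets 0, 1, and 4. A standalone check of just that arithmetic for a non-partitioned table:

    import java.util.HashMap;
    import java.util.Map;

    // Mirrors only the offset arithmetic of mockLogEndOffsets; keys here are
    // plain bucket ids rather than TableBucket instances.
    Map<Integer, Long> offsetsByBucket = new HashMap<>();
    for (int bucket = 0; bucket < 3; bucket++) {
        offsetsByBucket.put(bucket, (long) (bucket * bucket));
    }
    System.out.println(offsetsByBucket); // {0=0, 1=1, 2=4}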

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 2 additions & 2 deletions
@@ -1305,8 +1305,8 @@ private void handleCommitLakeTableSnapshotV1(
                             }
 
                             // this involves IO operation (ZK), so we do it in ioExecutor
-                            lakeTableHelper.upsertLakeTable(
-                                    tableId, tablePath, lakeTableSnapshotEntry.getValue());
+                            lakeTableHelper.upsertLakeTableV1(
+                                    tableId, lakeTableSnapshotEntry.getValue());
                         } catch (Exception e) {
                             ApiError error = ApiError.fromThrowable(e);
                             tableResp.setError(error.error().code(), error.message());
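
For context, a hedged sketch of the pattern the inline comment describes: the coordinator keeps blocking ZooKeeper writes off the event-processing thread by handing them to an I/O executor. Only the upsertLakeTableV1 call and the error handling appear in this diff; the CompletableFuture wiring and the ioExecutor, tableResp, and lakeTableSnapshotEntry names are assumptions about the surrounding code.

    import java.util.concurrent.CompletableFuture;

    CompletableFuture.runAsync(
            () -> {
                try {
                    // The V1 path no longer passes tablePath; the helper needs
                    // only the table id and the snapshot value to upsert.
                    lakeTableHelper.upsertLakeTableV1(tableId, lakeTableSnapshotEntry.getValue());
                } catch (Exception e) {
                    ApiError error = ApiError.fromThrowable(e);
                    tableResp.setError(error.error().code(), error.message());
                }
            },
            ioExecutor);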
fluss-server/src/main/java/org/apache/fluss/server/zk/data/TableBucketOffsets.java

Lines changed: 100 additions & 0 deletions
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk.data;
+
+import org.apache.fluss.metadata.TableBucket;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Represents the offsets for all buckets of a table. This class stores the mapping from {@link
+ * TableBucket} to their corresponding offsets.
+ *
+ * <p>This class is used to track the log end offsets for each bucket in a table. It supports both
+ * non-partitioned tables (where buckets are identified only by bucket id) and partitioned tables
+ * (where buckets are identified by partition id and bucket id).
+ *
+ * <p>The offsets map contains entries for each bucket that has a valid offset. Missing buckets are
+ * not included in the map.
+ *
+ * @see TableBucketOffsetsJsonSerde for JSON serialization and deserialization.
+ */
+public class TableBucketOffsets {
+
+    /** The table ID that all buckets belong to. */
+    private final long tableId;
+
+    /**
+     * The mapping from {@link TableBucket} to their offsets. The map contains entries only for
+     * buckets that have valid offsets.
+     */
+    private final Map<TableBucket, Long> offsets;
+
+    /**
+     * Creates a new {@link TableBucketOffsets} instance.
+     *
+     * @param tableId the table ID that all buckets belong to
+     * @param offsets the mapping from {@link TableBucket} to their offsets
+     */
+    public TableBucketOffsets(long tableId, Map<TableBucket, Long> offsets) {
+        this.tableId = tableId;
+        this.offsets = offsets;
+    }
+
+    /**
+     * Returns the table ID that all buckets belong to.
+     *
+     * @return the table ID
+     */
+    public long getTableId() {
+        return tableId;
+    }
+
+    /**
+     * Returns the mapping from {@link TableBucket} to their offsets.
+     *
+     * @return the offsets map
+     */
+    public Map<TableBucket, Long> getOffsets() {
+        return offsets;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        TableBucketOffsets that = (TableBucketOffsets) o;
+        return tableId == that.tableId && Objects.equals(offsets, that.offsets);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(tableId, offsets);
+    }
+
+    @Override
+    public String toString() {
+        return "TableBucketOffsets{" + "tableId=" + tableId + ", offsets=" + offsets + '}';
+    }
+}
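
A short usage sketch for the new value class. The ids are invented for illustration; the two TableBucket constructors are the same ones exercised by the committer test above.

    import org.apache.fluss.metadata.TableBucket;
    import org.apache.fluss.server.zk.data.TableBucketOffsets;

    import java.util.HashMap;
    import java.util.Map;

    class TableBucketOffsetsExample {
        public static void main(String[] args) {
            long tableId = 42L; // made-up table id
            Map<TableBucket, Long> offsets = new HashMap<>();
            offsets.put(new TableBucket(tableId, 0), 100L);     // non-partitioned: bucket 0
            offsets.put(new TableBucket(tableId, 7L, 1), 200L); // partitioned: partition 7, bucket 1

            TableBucketOffsets bucketOffsets = new TableBucketOffsets(tableId, offsets);
            System.out.println(bucketOffsets.getTableId());        // 42
            System.out.println(bucketOffsets.getOffsets().size()); // 2
        }
    }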
