Skip to content

Commit 3249ea4

Browse files
committed
address baiye's fip comments
1 parent 8d7f948 commit 3249ea4

File tree

25 files changed

+384
-300
lines changed

25 files changed

+384
-300
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.fluss.client.metadata.KvSnapshotMetadata;
2222
import org.apache.fluss.client.metadata.KvSnapshots;
2323
import org.apache.fluss.client.metadata.LakeSnapshot;
24+
import org.apache.fluss.client.metadata.RegisterKvSnapshotResult;
2425
import org.apache.fluss.cluster.ServerNode;
2526
import org.apache.fluss.config.ConfigOptions;
2627
import org.apache.fluss.config.cluster.AlterConfig;
@@ -419,8 +420,9 @@ CompletableFuture<KvSnapshotMetadata> getKvSnapshotMetadata(
419420
*
420421
* @param consumerId the consumer id.
421422
* @param consumeBuckets the tableBuckets to consume, a map from TableBucket to kvSnapshotId.
423+
* @return the result of registering kv snapshot consumer.
422424
*/
423-
CompletableFuture<Void> registerKvSnapshotConsumer(
425+
CompletableFuture<RegisterKvSnapshotResult> registerKvSnapshotConsumer(
424426
String consumerId, Map<TableBucket, Long> consumeBuckets);
425427

426428
/**

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.fluss.client.metadata.KvSnapshots;
2222
import org.apache.fluss.client.metadata.LakeSnapshot;
2323
import org.apache.fluss.client.metadata.MetadataUpdater;
24+
import org.apache.fluss.client.metadata.RegisterKvSnapshotResult;
2425
import org.apache.fluss.client.utils.ClientRpcMessageUtils;
2526
import org.apache.fluss.cluster.Cluster;
2627
import org.apache.fluss.cluster.ServerNode;
@@ -387,7 +388,7 @@ public CompletableFuture<KvSnapshotMetadata> getKvSnapshotMetadata(
387388
}
388389

389390
@Override
390-
public CompletableFuture<Void> registerKvSnapshotConsumer(
391+
public CompletableFuture<RegisterKvSnapshotResult> registerKvSnapshotConsumer(
391392
String consumerId, Map<TableBucket, Long> consumeBuckets) {
392393
if (consumeBuckets.isEmpty()) {
393394
throw new IllegalArgumentException("consumeBuckets is empty");
@@ -400,7 +401,7 @@ public CompletableFuture<Void> registerKvSnapshotConsumer(
400401
return gateway.registerKvSnapshotConsumer(
401402
makeRegisterKvSnapshotConsumerRequest(
402403
consumerId, consumeBuckets, expirationTime))
403-
.thenApply(r -> null);
404+
.thenApply(ClientRpcMessageUtils::toRegisterKvSnapshotResult);
404405
}
405406

406407
@Override
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.metadata;
19+
20+
import org.apache.fluss.annotation.PublicEvolving;
21+
import org.apache.fluss.metadata.TableBucket;
22+
23+
import java.util.Set;
24+
25+
/**
26+
* A class to represent the result of registering kv snapshot. It contains:
27+
*
28+
* <ul>
29+
* <li>An set of failed tableBuckets. Such as the specify snapshotId is not exist for this table
30+
* bucket.
31+
* </ul>
32+
*
33+
* @since 0.9
34+
*/
35+
@PublicEvolving
36+
public class RegisterKvSnapshotResult {
37+
private final Set<TableBucket> failedTableBucketSet;
38+
39+
public RegisterKvSnapshotResult(Set<TableBucket> failedTableBucketSet) {
40+
this.failedTableBucketSet = failedTableBucketSet;
41+
}
42+
43+
public Set<TableBucket> getFailedTableBucketSet() {
44+
return failedTableBucketSet;
45+
}
46+
}

fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.fluss.client.metadata.KvSnapshotMetadata;
2424
import org.apache.fluss.client.metadata.KvSnapshots;
2525
import org.apache.fluss.client.metadata.LakeSnapshot;
26+
import org.apache.fluss.client.metadata.RegisterKvSnapshotResult;
2627
import org.apache.fluss.client.write.KvWriteBatch;
2728
import org.apache.fluss.client.write.ReadyWriteBatch;
2829
import org.apache.fluss.config.cluster.AlterConfigOpType;
@@ -65,10 +66,12 @@
6566
import org.apache.fluss.rpc.messages.PbPutKvReqForBucket;
6667
import org.apache.fluss.rpc.messages.PbRemotePathAndLocalFile;
6768
import org.apache.fluss.rpc.messages.PbRenameColumn;
69+
import org.apache.fluss.rpc.messages.PbTable;
6870
import org.apache.fluss.rpc.messages.PrefixLookupRequest;
6971
import org.apache.fluss.rpc.messages.ProduceLogRequest;
7072
import org.apache.fluss.rpc.messages.PutKvRequest;
7173
import org.apache.fluss.rpc.messages.RegisterKvSnapshotConsumerRequest;
74+
import org.apache.fluss.rpc.messages.RegisterKvSnapshotConsumerResponse;
7275
import org.apache.fluss.rpc.messages.UnregisterKvSnapshotConsumerRequest;
7376
import org.apache.fluss.utils.json.DataTypeJsonSerde;
7477
import org.apache.fluss.utils.json.JsonSerdeUtils;
@@ -79,6 +82,7 @@
7982
import java.util.Arrays;
8083
import java.util.Collection;
8184
import java.util.HashMap;
85+
import java.util.HashSet;
8286
import java.util.List;
8387
import java.util.Map;
8488
import java.util.Set;
@@ -408,6 +412,23 @@ public static RegisterKvSnapshotConsumerRequest makeRegisterKvSnapshotConsumerRe
408412
return request;
409413
}
410414

415+
public static RegisterKvSnapshotResult toRegisterKvSnapshotResult(
416+
RegisterKvSnapshotConsumerResponse response) {
417+
Set<TableBucket> failedTableBucketSet = new HashSet<>();
418+
for (PbTable failedTable : response.getFailedTablesList()) {
419+
long tableId = failedTable.getTableId();
420+
for (PbBucket pbBucket : failedTable.getBucketsList()) {
421+
TableBucket tableBucket =
422+
new TableBucket(
423+
tableId,
424+
pbBucket.hasPartitionId() ? pbBucket.getPartitionId() : null,
425+
pbBucket.getBucketId());
426+
failedTableBucketSet.add(tableBucket);
427+
}
428+
}
429+
return new RegisterKvSnapshotResult(failedTableBucketSet);
430+
}
431+
411432
public static UnregisterKvSnapshotConsumerRequest makeUnregisterKvSnapshotConsumerRequest(
412433
String consumerId, Set<TableBucket> bucketsToUnregister) {
413434
UnregisterKvSnapshotConsumerRequest request = new UnregisterKvSnapshotConsumerRequest();

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1462,6 +1462,12 @@ public class ConfigOptions {
14621462
.withDescription(
14631463
"The number of threads the server uses to transfer (download and upload) kv snapshot files.");
14641464

1465+
public static final ConfigOption<Integer> KV_MAX_RETAINED_SNAPSHOTS =
1466+
key("kv.snapshot.num-retained")
1467+
.intType()
1468+
.defaultValue(1)
1469+
.withDescription("The maximum number of completed snapshots to retain.");
1470+
14651471
public static final ConfigOption<Duration> KV_SNAPSHOT_CONSUMER_EXPIRATION_CHECK_INTERVAL =
14661472
key("kv.snapshot.consumer-expiration-check-interval")
14671473
.durationType()

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/metrics/FlinkMetricsITCase.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,11 @@ abstract class FlinkMetricsITCase {
6868
.setClusterConf(
6969
new org.apache.fluss.config.Configuration()
7070
// set snapshot interval to 1s for testing purposes
71-
.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1)))
71+
.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1))
72+
// not to clean snapshots for test purpose
73+
.set(
74+
ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS,
75+
Integer.MAX_VALUE))
7276
.setNumOfTabletServers(3)
7377
.build();
7478

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceFailOverITCase.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,11 @@ abstract class FlinkTableSourceFailOverITCase {
7575
.setClusterConf(
7676
new org.apache.fluss.config.Configuration()
7777
// set snapshot interval to 1s for testing purposes
78-
.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1)))
78+
.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1))
79+
// not to clean snapshots for test purpose
80+
.set(
81+
ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS,
82+
Integer.MAX_VALUE))
7983
.setNumOfTabletServers(3)
8084
.build();
8185

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,11 @@ abstract class FlinkTableSourceITCase extends AbstractTestBase {
9494
.setClusterConf(
9595
new Configuration()
9696
// set snapshot interval to 1s for testing purposes
97-
.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1)))
97+
.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1))
98+
// not to clean snapshots for test purpose
99+
.set(
100+
ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS,
101+
Integer.MAX_VALUE))
98102
.setNumOfTabletServers(3)
99103
.setClock(CLOCK)
100104
.build();

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringTestBase.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,8 @@ private static Configuration flussClusterConfig() {
181181
Configuration conf = new Configuration();
182182
// set snapshot interval to 1s for testing purposes
183183
conf.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1));
184+
// not to clean snapshots for test purpose
185+
conf.set(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, Integer.MAX_VALUE);
184186

185187
// enable lake tiering
186188
conf.set(ConfigOptions.DATALAKE_FORMAT, DataLakeFormat.PAIMON);

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,11 @@ public class FlinkTestBase extends AbstractTestBase {
7171
.setClusterConf(
7272
new Configuration()
7373
// set snapshot interval to 1s for testing purposes
74-
.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1)))
74+
.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1))
75+
// not to clean snapshots for test purpose
76+
.set(
77+
ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS,
78+
Integer.MAX_VALUE))
7579
.setNumOfTabletServers(3)
7680
.build();
7781

0 commit comments

Comments
 (0)