Skip to content

Commit 1730d60

Browse files
committed
[kv] Support kv snapshot lease
1 parent 097d221 commit 1730d60

File tree

61 files changed

+3492
-106
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

61 files changed

+3492
-106
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.fluss.client.admin;
1919

2020
import org.apache.fluss.annotation.PublicEvolving;
21+
import org.apache.fluss.client.metadata.AcquireKvSnapshotLeaseResult;
2122
import org.apache.fluss.client.metadata.KvSnapshotMetadata;
2223
import org.apache.fluss.client.metadata.KvSnapshots;
2324
import org.apache.fluss.client.metadata.LakeSnapshot;
@@ -33,6 +34,7 @@
3334
import org.apache.fluss.exception.InvalidPartitionException;
3435
import org.apache.fluss.exception.InvalidReplicationFactorException;
3536
import org.apache.fluss.exception.InvalidTableException;
37+
import org.apache.fluss.exception.KvSnapshotLeaseNotExistException;
3638
import org.apache.fluss.exception.KvSnapshotNotExistException;
3739
import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
3840
import org.apache.fluss.exception.NonPrimaryKeyTableException;
@@ -60,6 +62,8 @@
6062

6163
import java.util.Collection;
6264
import java.util.List;
65+
import java.util.Map;
66+
import java.util.Set;
6367
import java.util.concurrent.CompletableFuture;
6468

6569
/**
@@ -400,6 +404,75 @@ CompletableFuture<Void> dropPartition(
400404
CompletableFuture<KvSnapshotMetadata> getKvSnapshotMetadata(
401405
TableBucket bucket, long snapshotId);
402406

407+
/**
408+
* Acquires a lease for specific KV snapshots of the given tableBuckets asynchronously.
409+
*
410+
* <p>Once acquired, the specified KV snapshots will be protected from garbage collection for
411+
* the duration of the {@code leaseDuration}. The client must call {@link #renewKvSnapshotLease}
412+
* periodically to keep the lease alive, or call {@link #releaseKvSnapshotLease} to release the
413+
* lock early when reading is finished.
414+
*
415+
* <p>If the lease expires (no renew received within duration), the server is free to delete the
416+
* snapshot files.
417+
*
418+
* <p>The following exceptions can be anticipated when calling {@code get()} on returned future:
419+
*
420+
* <ul>
421+
* <li>{@link TableNotExistException} if the table does not exist.
422+
* <li>{@link PartitionNotExistException} if the partition does not exist.
423+
* </ul>
424+
*
425+
* @param leaseId The unique ID for this lease session (usually a UUID generated by client).
426+
* @param snapshotIds The snapshots to lease, a map from TableBucket to kvSnapshotId.
427+
* @param leaseDuration The duration (in milliseconds) for which the snapshots should be kept.
428+
* @return The result of the acquire operation, containing any buckets that failed to be locked.
429+
*/
430+
CompletableFuture<AcquireKvSnapshotLeaseResult> acquireKvSnapshotLease(
431+
String leaseId, Map<TableBucket, Long> snapshotIds, long leaseDuration);
432+
433+
/**
434+
* Renews the lease for the given leaseId asynchronously.
435+
*
436+
* <p>This acts as a heartbeat. It resets the expiration time for ALL tableBuckets currently
437+
* associated with this {@code leaseId}.
438+
*
439+
* <p>The following exceptions can be anticipated:
440+
*
441+
* <ul>
442+
* <li>{@link KvSnapshotLeaseNotExistException} if the leaseId does not exist or has already
443+
* expired.
444+
* </ul>
445+
*
446+
* @param leaseId The lease id to renew.
447+
* @param leaseDuration The new duration (in milliseconds) from now.
448+
*/
449+
CompletableFuture<Void> renewKvSnapshotLease(String leaseId, long leaseDuration);
450+
451+
/**
452+
* Releases the lease for specific tableBuckets asynchronously.
453+
*
454+
* <p>This is typically called when a client finishes reading a specific bucket (or a batch of
455+
* buckets) but is still reading others under the same leaseId.
456+
*
457+
* <p>If {@code bucketsToRelease} contains all buckets under this leaseId, the lease itself will
458+
* be removed.
459+
*
460+
* @param leaseId The lease id.
461+
* @param bucketsToRelease The specific tableBuckets to release.
462+
*/
463+
CompletableFuture<Void> releaseKvSnapshotLease(
464+
String leaseId, Set<TableBucket> bucketsToRelease);
465+
466+
/**
467+
* Drops the entire lease asynchronously.
468+
*
469+
* <p>All snapshots locked under this {@code leaseId} will be released immediately. This is
470+
* equivalent to calling {@link #releaseKvSnapshotLease} with all held buckets.
471+
*
472+
* @param leaseId The lease id to drop.
473+
*/
474+
CompletableFuture<Void> dropKvSnapshotLease(String leaseId);
475+
403476
/**
404477
* Get table lake snapshot info of the given table asynchronously.
405478
*

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.fluss.client.admin;
1919

20+
import org.apache.fluss.client.metadata.AcquireKvSnapshotLeaseResult;
2021
import org.apache.fluss.client.metadata.KvSnapshotMetadata;
2122
import org.apache.fluss.client.metadata.KvSnapshots;
2223
import org.apache.fluss.client.metadata.LakeSnapshot;
@@ -54,6 +55,7 @@
5455
import org.apache.fluss.rpc.messages.DescribeClusterConfigsRequest;
5556
import org.apache.fluss.rpc.messages.DropAclsRequest;
5657
import org.apache.fluss.rpc.messages.DropDatabaseRequest;
58+
import org.apache.fluss.rpc.messages.DropKvSnapshotLeaseRequest;
5759
import org.apache.fluss.rpc.messages.DropTableRequest;
5860
import org.apache.fluss.rpc.messages.GetDatabaseInfoRequest;
5961
import org.apache.fluss.rpc.messages.GetKvSnapshotMetadataRequest;
@@ -72,6 +74,7 @@
7274
import org.apache.fluss.rpc.messages.PbListOffsetsRespForBucket;
7375
import org.apache.fluss.rpc.messages.PbPartitionSpec;
7476
import org.apache.fluss.rpc.messages.PbTablePath;
77+
import org.apache.fluss.rpc.messages.RenewKvSnapshotLeaseRequest;
7578
import org.apache.fluss.rpc.messages.TableExistsRequest;
7679
import org.apache.fluss.rpc.messages.TableExistsResponse;
7780
import org.apache.fluss.rpc.protocol.ApiError;
@@ -87,13 +90,16 @@
8790
import java.util.HashMap;
8891
import java.util.List;
8992
import java.util.Map;
93+
import java.util.Set;
9094
import java.util.concurrent.CompletableFuture;
9195

96+
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeAcquireKvSnapshotLeaseRequest;
9297
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeAlterTableRequest;
9398
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeCreatePartitionRequest;
9499
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeDropPartitionRequest;
95100
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeListOffsetsRequest;
96101
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makePbPartitionSpec;
102+
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeReleaseKvSnapshotLeaseRequest;
97103
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.toConfigEntries;
98104
import static org.apache.fluss.client.utils.MetadataUtils.sendMetadataRequestAndRebuildCluster;
99105
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBindings;
@@ -377,6 +383,42 @@ public CompletableFuture<KvSnapshotMetadata> getKvSnapshotMetadata(
377383
.thenApply(ClientRpcMessageUtils::toKvSnapshotMetadata);
378384
}
379385

386+
@Override
387+
public CompletableFuture<AcquireKvSnapshotLeaseResult> acquireKvSnapshotLease(
388+
String leaseId, Map<TableBucket, Long> snapshotIds, long leaseDuration) {
389+
if (snapshotIds.isEmpty()) {
390+
throw new IllegalArgumentException(
391+
"The snapshotIds to acquire kv snapshot lease is empty");
392+
}
393+
394+
return gateway.acquireKvSnapshotLease(
395+
makeAcquireKvSnapshotLeaseRequest(leaseId, snapshotIds, leaseDuration))
396+
.thenApply(ClientRpcMessageUtils::toAcquireKvSnapshotLeaseResult);
397+
}
398+
399+
@Override
400+
public CompletableFuture<Void> renewKvSnapshotLease(String leaseId, long leaseDuration) {
401+
RenewKvSnapshotLeaseRequest request =
402+
new RenewKvSnapshotLeaseRequest()
403+
.setLeaseId(leaseId)
404+
.setLeaseDuration(leaseDuration);
405+
return gateway.renewKvSnapshotLease(request).thenApply(r -> null);
406+
}
407+
408+
@Override
409+
public CompletableFuture<Void> releaseKvSnapshotLease(
410+
String leaseId, Set<TableBucket> bucketsToRelease) {
411+
return gateway.releaseKvSnapshotLease(
412+
makeReleaseKvSnapshotLeaseRequest(leaseId, bucketsToRelease))
413+
.thenApply(r -> null);
414+
}
415+
416+
@Override
417+
public CompletableFuture<Void> dropKvSnapshotLease(String leaseId) {
418+
DropKvSnapshotLeaseRequest request = new DropKvSnapshotLeaseRequest().setLeaseId(leaseId);
419+
return gateway.dropKvSnapshotLease(request).thenApply(r -> null);
420+
}
421+
380422
@Override
381423
public CompletableFuture<LakeSnapshot> getLatestLakeSnapshot(TablePath tablePath) {
382424
GetLatestLakeSnapshotRequest request = new GetLatestLakeSnapshotRequest();
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.metadata;
19+
20+
import org.apache.fluss.annotation.PublicEvolving;
21+
import org.apache.fluss.metadata.TableBucket;
22+
23+
import java.util.Map;
24+
import java.util.Set;
25+
26+
/**
27+
* A class to represent the result of acquire kv snapshot lease. It contains:
28+
*
29+
* <ul>
30+
* <li>A map of unavailable snapshots. Such as the specify snapshotId is not exist for this table
31+
* bucket.
32+
* </ul>
33+
*
34+
* @since 0.9
35+
*/
36+
@PublicEvolving
37+
public class AcquireKvSnapshotLeaseResult {
38+
private final Map<TableBucket, Long> unavailableSnapshots;
39+
40+
public AcquireKvSnapshotLeaseResult(Map<TableBucket, Long> unavailableSnapshots) {
41+
this.unavailableSnapshots = unavailableSnapshots;
42+
}
43+
44+
/**
45+
* Returns the set of buckets that could not be locked (e.g., snapshot ID doesn't exist or has
46+
* already been GC'ed).
47+
*/
48+
public Map<TableBucket, Long> getUnavailableSnapshots() {
49+
return unavailableSnapshots;
50+
}
51+
52+
public Set<TableBucket> getUnavailableTableBucketSet() {
53+
return unavailableSnapshots.keySet();
54+
}
55+
}

fluss-client/src/main/java/org/apache/fluss/client/metadata/KvSnapshots.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@
1818
package org.apache.fluss.client.metadata;
1919

2020
import org.apache.fluss.annotation.PublicEvolving;
21+
import org.apache.fluss.metadata.TableBucket;
2122

2223
import javax.annotation.Nullable;
2324

2425
import java.util.Map;
2526
import java.util.OptionalLong;
2627
import java.util.Set;
28+
import java.util.stream.Collectors;
2729

2830
/**
2931
* A class representing the kv snapshots of a table or a partition. It contains multiple snapshots
@@ -71,6 +73,12 @@ public Set<Integer> getBucketIds() {
7173
return snapshotIds.keySet();
7274
}
7375

76+
public Set<TableBucket> getTableBuckets() {
77+
return snapshotIds.keySet().stream()
78+
.map(bucketId -> new TableBucket(tableId, partitionId, bucketId))
79+
.collect(Collectors.toSet());
80+
}
81+
7482
/**
7583
* Get the latest snapshot id for this kv tablet (bucket), or empty if there are no snapshots.
7684
*/

fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.fluss.client.admin.OffsetSpec;
2121
import org.apache.fluss.client.lookup.LookupBatch;
2222
import org.apache.fluss.client.lookup.PrefixLookupBatch;
23+
import org.apache.fluss.client.metadata.AcquireKvSnapshotLeaseResult;
2324
import org.apache.fluss.client.metadata.KvSnapshotMetadata;
2425
import org.apache.fluss.client.metadata.KvSnapshots;
2526
import org.apache.fluss.client.metadata.LakeSnapshot;
@@ -37,6 +38,8 @@
3738
import org.apache.fluss.metadata.TableBucket;
3839
import org.apache.fluss.metadata.TableChange;
3940
import org.apache.fluss.metadata.TablePath;
41+
import org.apache.fluss.rpc.messages.AcquireKvSnapshotLeaseRequest;
42+
import org.apache.fluss.rpc.messages.AcquireKvSnapshotLeaseResponse;
4043
import org.apache.fluss.rpc.messages.AlterTableRequest;
4144
import org.apache.fluss.rpc.messages.CreatePartitionRequest;
4245
import org.apache.fluss.rpc.messages.DropPartitionRequest;
@@ -50,10 +53,13 @@
5053
import org.apache.fluss.rpc.messages.MetadataRequest;
5154
import org.apache.fluss.rpc.messages.PbAddColumn;
5255
import org.apache.fluss.rpc.messages.PbAlterConfig;
56+
import org.apache.fluss.rpc.messages.PbBucket;
5357
import org.apache.fluss.rpc.messages.PbDescribeConfig;
5458
import org.apache.fluss.rpc.messages.PbDropColumn;
5559
import org.apache.fluss.rpc.messages.PbKeyValue;
5660
import org.apache.fluss.rpc.messages.PbKvSnapshot;
61+
import org.apache.fluss.rpc.messages.PbKvSnapshotLeaseForBucket;
62+
import org.apache.fluss.rpc.messages.PbKvSnapshotLeaseForTable;
5763
import org.apache.fluss.rpc.messages.PbLakeSnapshotForBucket;
5864
import org.apache.fluss.rpc.messages.PbLookupReqForBucket;
5965
import org.apache.fluss.rpc.messages.PbModifyColumn;
@@ -66,6 +72,7 @@
6672
import org.apache.fluss.rpc.messages.PrefixLookupRequest;
6773
import org.apache.fluss.rpc.messages.ProduceLogRequest;
6874
import org.apache.fluss.rpc.messages.PutKvRequest;
75+
import org.apache.fluss.rpc.messages.ReleaseKvSnapshotLeaseRequest;
6976
import org.apache.fluss.utils.json.DataTypeJsonSerde;
7077
import org.apache.fluss.utils.json.JsonSerdeUtils;
7178

@@ -370,6 +377,75 @@ public static AlterTableRequest makeAlterTableRequest(
370377
return request;
371378
}
372379

380+
public static AcquireKvSnapshotLeaseRequest makeAcquireKvSnapshotLeaseRequest(
381+
String leaseId, Map<TableBucket, Long> snapshotIds, long leaseDuration) {
382+
AcquireKvSnapshotLeaseRequest request = new AcquireKvSnapshotLeaseRequest();
383+
request.setLeaseId(leaseId).setLeaseDuration(leaseDuration);
384+
385+
Map<Long, List<PbKvSnapshotLeaseForBucket>> pbLeaseForTables = new HashMap<>();
386+
for (Map.Entry<TableBucket, Long> entry : snapshotIds.entrySet()) {
387+
TableBucket tableBucket = entry.getKey();
388+
Long snapshotId = entry.getValue();
389+
PbKvSnapshotLeaseForBucket pbLeaseForBucket =
390+
new PbKvSnapshotLeaseForBucket()
391+
.setBucketId(tableBucket.getBucket())
392+
.setSnapshotId(snapshotId);
393+
if (tableBucket.getPartitionId() != null) {
394+
pbLeaseForBucket.setPartitionId(tableBucket.getPartitionId());
395+
}
396+
pbLeaseForTables
397+
.computeIfAbsent(tableBucket.getTableId(), k -> new ArrayList<>())
398+
.add(pbLeaseForBucket);
399+
}
400+
401+
for (Map.Entry<Long, List<PbKvSnapshotLeaseForBucket>> entry :
402+
pbLeaseForTables.entrySet()) {
403+
request.addTableLeaseReq()
404+
.setTableId(entry.getKey())
405+
.addAllBucketsReqs(entry.getValue());
406+
}
407+
return request;
408+
}
409+
410+
public static AcquireKvSnapshotLeaseResult toAcquireKvSnapshotLeaseResult(
411+
AcquireKvSnapshotLeaseResponse response) {
412+
Map<TableBucket, Long> unavailableSnapshots = new HashMap<>();
413+
for (PbKvSnapshotLeaseForTable leaseForTable : response.getTablesLeaseResList()) {
414+
long tableId = leaseForTable.getTableId();
415+
for (PbKvSnapshotLeaseForBucket leaseForBucket : leaseForTable.getBucketsReqsList()) {
416+
TableBucket tableBucket =
417+
new TableBucket(
418+
tableId,
419+
leaseForBucket.hasPartitionId()
420+
? leaseForBucket.getPartitionId()
421+
: null,
422+
leaseForBucket.getBucketId());
423+
unavailableSnapshots.put(tableBucket, leaseForBucket.getSnapshotId());
424+
}
425+
}
426+
return new AcquireKvSnapshotLeaseResult(unavailableSnapshots);
427+
}
428+
429+
public static ReleaseKvSnapshotLeaseRequest makeReleaseKvSnapshotLeaseRequest(
430+
String leaseId, Set<TableBucket> bucketsToRelease) {
431+
ReleaseKvSnapshotLeaseRequest request = new ReleaseKvSnapshotLeaseRequest();
432+
request.setLeaseId(leaseId);
433+
434+
Map<Long, List<PbBucket>> pbLeasedTable = new HashMap<>();
435+
for (TableBucket tb : bucketsToRelease) {
436+
PbBucket pbBucket = new PbBucket().setBucketId(tb.getBucket());
437+
if (tb.getPartitionId() != null) {
438+
pbBucket.setPartitionId(tb.getPartitionId());
439+
}
440+
pbLeasedTable.computeIfAbsent(tb.getTableId(), k -> new ArrayList<>()).add(pbBucket);
441+
}
442+
443+
for (Map.Entry<Long, List<PbBucket>> entry : pbLeasedTable.entrySet()) {
444+
request.addReleaseTable().setTableId(entry.getKey()).addAllBuckets(entry.getValue());
445+
}
446+
return request;
447+
}
448+
373449
public static List<PartitionInfo> toPartitionInfos(ListPartitionInfosResponse response) {
374450
return response.getPartitionsInfosList().stream()
375451
.map(

0 commit comments

Comments
 (0)