Skip to content

Commit b53fa8a

Browse files
committed
change to kv snapshot lease
1 parent 6c05d41 commit b53fa8a

File tree

47 files changed

+1143
-985
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+1143
-985
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java

Lines changed: 47 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@
1818
package org.apache.fluss.client.admin;
1919

2020
import org.apache.fluss.annotation.PublicEvolving;
21+
import org.apache.fluss.client.metadata.AcquireKvSnapshotLeaseResult;
2122
import org.apache.fluss.client.metadata.KvSnapshotMetadata;
2223
import org.apache.fluss.client.metadata.KvSnapshots;
2324
import org.apache.fluss.client.metadata.LakeSnapshot;
24-
import org.apache.fluss.client.metadata.RegisterKvSnapshotResult;
2525
import org.apache.fluss.cluster.ServerNode;
2626
import org.apache.fluss.config.ConfigOptions;
2727
import org.apache.fluss.config.cluster.AlterConfig;
@@ -34,7 +34,7 @@
3434
import org.apache.fluss.exception.InvalidPartitionException;
3535
import org.apache.fluss.exception.InvalidReplicationFactorException;
3636
import org.apache.fluss.exception.InvalidTableException;
37-
import org.apache.fluss.exception.KvSnapshotConsumerNotExistException;
37+
import org.apache.fluss.exception.KvSnapshotLeaseNotExistException;
3838
import org.apache.fluss.exception.KvSnapshotNotExistException;
3939
import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
4040
import org.apache.fluss.exception.NonPrimaryKeyTableException;
@@ -405,60 +405,73 @@ CompletableFuture<KvSnapshotMetadata> getKvSnapshotMetadata(
405405
TableBucket bucket, long snapshotId);
406406

407407
/**
408-
* Register the kv snapshot consumer of the given tableBucket set asynchronously. After
409-
* registered, the kv snapshot will not be deleted until unregistered or the consumer expired.
408+
* Acquires a lease for specific KV snapshots of the given tableBuckets asynchronously.
410409
*
411-
* <p>For the details bucket to consume, you can call {@link #getLatestKvSnapshots(TablePath)}
412-
* or {@link #getLatestKvSnapshots(TablePath, String)} first.
410+
* <p>Once acquired, the specified KV snapshots will be protected from garbage collection for
411+
* the duration of the {@code leaseDuration}. The client must call {@link #renewKvSnapshotLease}
412+
* periodically to keep the lease alive, or call {@link #releaseKvSnapshotLease} to release the
413+
* lease early when reading is finished.
413414
*
414-
* <p>The following exceptions can be anticipated when calling {@code get()} on returned future.
415+
* <p>If the lease expires (no renewal received within the lease duration), the server is free to delete the
416+
* snapshot files.
417+
*
418+
* <p>The following exceptions can be anticipated when calling {@code get()} on returned future:
415419
*
416420
* <ul>
417421
* <li>{@link TableNotExistException} if the table does not exist.
418-
* <li>{@link PartitionNotExistException} if the partition does not exist
422+
* <li>{@link PartitionNotExistException} if the partition does not exist.
419423
* </ul>
420424
*
421-
* @param consumerId the consumer id.
422-
* @param consumeBuckets the tableBuckets to consume, a map from TableBucket to kvSnapshotId.
423-
* @return the result of registering kv snapshot consumer.
425+
* @param leaseId The unique ID for this lease session (usually a UUID generated by client).
426+
* @param snapshotIds The snapshots to consume, a map from TableBucket to kvSnapshotId.
427+
* @param leaseDuration The duration (in milliseconds) for which the snapshots should be kept.
428+
* @return The result of the acquire operation, containing any buckets that failed to be locked.
424429
*/
425-
CompletableFuture<RegisterKvSnapshotResult> registerKvSnapshotConsumer(
426-
String consumerId, Map<TableBucket, Long> consumeBuckets);
430+
CompletableFuture<AcquireKvSnapshotLeaseResult> acquireKvSnapshotLease(
431+
String leaseId, Map<TableBucket, Long> snapshotIds, long leaseDuration);
427432

428433
/**
429-
* Unregister the kv snapshot consumer of the given tableBucket set asynchronously.
434+
* Renews the lease for the given leaseId asynchronously.
435+
*
436+
* <p>This acts as a heartbeat. It resets the expiration time for ALL tableBuckets currently
437+
* associated with this {@code leaseId}.
430438
*
431-
* <p>As the following situation happens, unregister operation will be partially ignored or
432-
* completely ignored
439+
* <p>The following exceptions can be anticipated:
433440
*
434441
* <ul>
435-
* <li>If the table/partition not exists, the unregister operation will be ignored for this
436-
* table/partition, others will success.
437-
* <li>If the partition bucket/kvSnapshotId not exists, the unregister operation will be
438-
* ignored for this bucket/kvSnapshotId, others will success.
439-
* <li>If the consumerId not exists, the unregister operation will be ignored.
442+
* <li>{@link KvSnapshotLeaseNotExistException} if the leaseId does not exist or has already
443+
* expired.
440444
* </ul>
441445
*
442-
* @param consumerId the consumer id.
443-
* @param bucketsToUnregister the tableBucket to unregister.
446+
* @param leaseId The lease id to renew.
447+
* @param leaseDuration The new duration (in milliseconds) from now.
444448
*/
445-
CompletableFuture<Void> unregisterKvSnapshotConsumer(
446-
String consumerId, Set<TableBucket> bucketsToUnregister);
449+
CompletableFuture<Void> renewKvSnapshotLease(String leaseId, long leaseDuration);
447450

448451
/**
449-
* Clear the kv snapshot consumer asynchronously. After cleared, all the registered kv snapshot
450-
* consume info for this consumer will be cleared.
452+
* Releases the lease for specific tableBuckets asynchronously.
451453
*
452-
* <p>The following exceptions can be anticipated when calling {@code get()} on returned future.
454+
* <p>This is typically called when a client finishes reading a specific bucket (or a batch of
455+
* buckets) but is still reading others under the same leaseId.
453456
*
454-
* <ul>
455-
* <li>{@link TableNotExistException} if the table does not exist.
456-
* <li>{@link KvSnapshotConsumerNotExistException} if the kv snapshot consumer does not exist.
457-
* </ul>
457+
* <p>If {@code bucketsToRelease} contains all buckets under this leaseId, the lease itself will
458+
* be removed.
459+
*
460+
* @param leaseId The lease id.
461+
* @param bucketsToRelease The specific tableBuckets to release.
462+
*/
463+
CompletableFuture<Void> releaseKvSnapshotLease(
464+
String leaseId, Set<TableBucket> bucketsToRelease);
465+
466+
/**
467+
* Drops the entire lease asynchronously.
468+
*
469+
* <p>All snapshots locked under this {@code leaseId} will be released immediately. This is
470+
* equivalent to calling {@link #releaseKvSnapshotLease} with all held buckets.
458471
*
459-
* @param consumerId the consumer id.
472+
* @param leaseId The lease id to drop.
460473
*/
461-
CompletableFuture<Void> clearKvSnapshotConsumer(String consumerId);
474+
CompletableFuture<Void> dropKvSnapshotLease(String leaseId);
462475

463476
/**
464477
* Get table lake snapshot info of the given table asynchronously.

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 29 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,14 @@
1717

1818
package org.apache.fluss.client.admin;
1919

20+
import org.apache.fluss.client.metadata.AcquireKvSnapshotLeaseResult;
2021
import org.apache.fluss.client.metadata.KvSnapshotMetadata;
2122
import org.apache.fluss.client.metadata.KvSnapshots;
2223
import org.apache.fluss.client.metadata.LakeSnapshot;
2324
import org.apache.fluss.client.metadata.MetadataUpdater;
24-
import org.apache.fluss.client.metadata.RegisterKvSnapshotResult;
2525
import org.apache.fluss.client.utils.ClientRpcMessageUtils;
2626
import org.apache.fluss.cluster.Cluster;
2727
import org.apache.fluss.cluster.ServerNode;
28-
import org.apache.fluss.config.ConfigOptions;
2928
import org.apache.fluss.config.Configuration;
3029
import org.apache.fluss.config.cluster.AlterConfig;
3130
import org.apache.fluss.config.cluster.ConfigEntry;
@@ -49,7 +48,6 @@
4948
import org.apache.fluss.rpc.gateway.TabletServerGateway;
5049
import org.apache.fluss.rpc.messages.AlterClusterConfigsRequest;
5150
import org.apache.fluss.rpc.messages.AlterTableRequest;
52-
import org.apache.fluss.rpc.messages.ClearKvSnapshotConsumerRequest;
5351
import org.apache.fluss.rpc.messages.CreateAclsRequest;
5452
import org.apache.fluss.rpc.messages.CreateDatabaseRequest;
5553
import org.apache.fluss.rpc.messages.CreateTableRequest;
@@ -58,6 +56,7 @@
5856
import org.apache.fluss.rpc.messages.DescribeClusterConfigsRequest;
5957
import org.apache.fluss.rpc.messages.DropAclsRequest;
6058
import org.apache.fluss.rpc.messages.DropDatabaseRequest;
59+
import org.apache.fluss.rpc.messages.DropKvSnapshotLeaseRequest;
6160
import org.apache.fluss.rpc.messages.DropTableRequest;
6261
import org.apache.fluss.rpc.messages.GetDatabaseInfoRequest;
6362
import org.apache.fluss.rpc.messages.GetKvSnapshotMetadataRequest;
@@ -76,6 +75,7 @@
7675
import org.apache.fluss.rpc.messages.PbListOffsetsRespForBucket;
7776
import org.apache.fluss.rpc.messages.PbPartitionSpec;
7877
import org.apache.fluss.rpc.messages.PbTablePath;
78+
import org.apache.fluss.rpc.messages.RenewKvSnapshotLeaseRequest;
7979
import org.apache.fluss.rpc.messages.TableExistsRequest;
8080
import org.apache.fluss.rpc.messages.TableExistsResponse;
8181
import org.apache.fluss.rpc.protocol.ApiError;
@@ -94,13 +94,13 @@
9494
import java.util.Set;
9595
import java.util.concurrent.CompletableFuture;
9696

97+
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeAcquireKvSnapshotLeaseRequest;
9798
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeAlterTableRequest;
9899
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeCreatePartitionRequest;
99100
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeDropPartitionRequest;
100101
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeListOffsetsRequest;
101102
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makePbPartitionSpec;
102-
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeRegisterKvSnapshotConsumerRequest;
103-
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeUnregisterKvSnapshotConsumerRequest;
103+
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeReleaseKvSnapshotLeaseRequest;
104104
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.toConfigEntries;
105105
import static org.apache.fluss.client.utils.MetadataUtils.sendMetadataRequestAndRebuildCluster;
106106
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBindings;
@@ -388,35 +388,39 @@ public CompletableFuture<KvSnapshotMetadata> getKvSnapshotMetadata(
388388
}
389389

390390
@Override
391-
public CompletableFuture<RegisterKvSnapshotResult> registerKvSnapshotConsumer(
392-
String consumerId, Map<TableBucket, Long> consumeBuckets) {
393-
if (consumeBuckets.isEmpty()) {
394-
throw new IllegalArgumentException("consumeBuckets is empty");
391+
public CompletableFuture<AcquireKvSnapshotLeaseResult> acquireKvSnapshotLease(
392+
String leaseId, Map<TableBucket, Long> snapshotIds, long leaseDuration) {
393+
if (snapshotIds.isEmpty()) {
394+
throw new IllegalArgumentException(
395+
"The snapshotIds to acquire kv snapshot lease is empty");
395396
}
396397

397-
long expirationTime =
398-
clientConfig
399-
.get(ConfigOptions.CLIENT_SCANNER_KV_SNAPSHOT_CONSUMER_EXPIRATION_TIME)
400-
.toMillis();
401-
return gateway.registerKvSnapshotConsumer(
402-
makeRegisterKvSnapshotConsumerRequest(
403-
consumerId, consumeBuckets, expirationTime))
404-
.thenApply(ClientRpcMessageUtils::toRegisterKvSnapshotResult);
398+
return gateway.acquireKvSnapshotLease(
399+
makeAcquireKvSnapshotLeaseRequest(leaseId, snapshotIds, leaseDuration))
400+
.thenApply(ClientRpcMessageUtils::toAcquireKvSnapshotLeaseResult);
405401
}
406402

407403
@Override
408-
public CompletableFuture<Void> unregisterKvSnapshotConsumer(
409-
String consumerId, Set<TableBucket> bucketsToUnregister) {
410-
return gateway.unregisterKvSnapshotConsumer(
411-
makeUnregisterKvSnapshotConsumerRequest(consumerId, bucketsToUnregister))
404+
public CompletableFuture<Void> renewKvSnapshotLease(String leaseId, long leaseDuration) {
405+
RenewKvSnapshotLeaseRequest request =
406+
new RenewKvSnapshotLeaseRequest()
407+
.setLeaseId(leaseId)
408+
.setLeaseDuration(leaseDuration);
409+
return gateway.renewKvSnapshotLease(request).thenApply(r -> null);
410+
}
411+
412+
@Override
413+
public CompletableFuture<Void> releaseKvSnapshotLease(
414+
String leaseId, Set<TableBucket> bucketsToRelease) {
415+
return gateway.releaseKvSnapshotLease(
416+
makeReleaseKvSnapshotLeaseRequest(leaseId, bucketsToRelease))
412417
.thenApply(r -> null);
413418
}
414419

415420
@Override
416-
public CompletableFuture<Void> clearKvSnapshotConsumer(String consumerId) {
417-
ClearKvSnapshotConsumerRequest request =
418-
new ClearKvSnapshotConsumerRequest().setConsumerId(consumerId);
419-
return gateway.clearKvSnapshotConsumer(request).thenApply(r -> null);
421+
public CompletableFuture<Void> dropKvSnapshotLease(String leaseId) {
422+
DropKvSnapshotLeaseRequest request = new DropKvSnapshotLeaseRequest().setLeaseId(leaseId);
423+
return gateway.dropKvSnapshotLease(request).thenApply(r -> null);
420424
}
421425

422426
@Override

fluss-client/src/main/java/org/apache/fluss/client/metadata/RegisterKvSnapshotResult.java renamed to fluss-client/src/main/java/org/apache/fluss/client/metadata/AcquireKvSnapshotLeaseResult.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,27 +20,36 @@
2020
import org.apache.fluss.annotation.PublicEvolving;
2121
import org.apache.fluss.metadata.TableBucket;
2222

23+
import java.util.Map;
2324
import java.util.Set;
2425

2526
/**
26-
* A class to represent the result of registering kv snapshot. It contains:
27+
* A class to represent the result of acquiring a kv snapshot lease. It contains:
2728
*
2829
* <ul>
29-
* <li>An set of failed tableBuckets. Such as the specify snapshotId is not exist for this table
30+
* <li>A map of unavailable snapshots, e.g. when the specified snapshotId does not exist for the table
3031
* bucket.
3132
* </ul>
3233
*
3334
* @since 0.9
3435
*/
3536
@PublicEvolving
36-
public class RegisterKvSnapshotResult {
37-
private final Set<TableBucket> failedTableBucketSet;
37+
public class AcquireKvSnapshotLeaseResult {
38+
private final Map<TableBucket, Long> unavailableSnapshots;
3839

39-
public RegisterKvSnapshotResult(Set<TableBucket> failedTableBucketSet) {
40-
this.failedTableBucketSet = failedTableBucketSet;
40+
public AcquireKvSnapshotLeaseResult(Map<TableBucket, Long> unavailableSnapshots) {
41+
this.unavailableSnapshots = unavailableSnapshots;
4142
}
4243

43-
public Set<TableBucket> getFailedTableBucketSet() {
44-
return failedTableBucketSet;
44+
/**
45+
* Returns a map from table bucket to snapshot ID for the snapshots that could not be locked (e.g., snapshot ID doesn't exist or has
46+
* already been GC'ed).
47+
*/
48+
public Map<TableBucket, Long> getUnavailableSnapshots() {
49+
return unavailableSnapshots;
50+
}
51+
52+
public Set<TableBucket> getUnavailableTableBucketSet() {
53+
return unavailableSnapshots.keySet();
4554
}
4655
}

0 commit comments

Comments
 (0)