Skip to content

Commit 2a9a846

Browse files
committed
[kv] Support kv snapshot consumer
1 parent 5049663 commit 2a9a846

File tree

62 files changed

+3343
-109
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

62 files changed

+3343
-109
lines changed

fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ public Admin getOrCreateAdmin() {
142142
if (admin == null) {
143143
synchronized (this) {
144144
if (admin == null) {
145-
admin = new FlussAdmin(rpcClient, metadataUpdater);
145+
admin = new FlussAdmin(rpcClient, metadataUpdater, conf);
146146
}
147147
}
148148
}

fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.fluss.client.metadata.KvSnapshotMetadata;
2222
import org.apache.fluss.client.metadata.KvSnapshots;
2323
import org.apache.fluss.client.metadata.LakeSnapshot;
24+
import org.apache.fluss.client.metadata.RegisterKvSnapshotResult;
2425
import org.apache.fluss.cluster.ServerNode;
2526
import org.apache.fluss.config.ConfigOptions;
2627
import org.apache.fluss.config.cluster.AlterConfig;
@@ -33,6 +34,7 @@
3334
import org.apache.fluss.exception.InvalidPartitionException;
3435
import org.apache.fluss.exception.InvalidReplicationFactorException;
3536
import org.apache.fluss.exception.InvalidTableException;
37+
import org.apache.fluss.exception.KvSnapshotConsumerNotExistException;
3638
import org.apache.fluss.exception.KvSnapshotNotExistException;
3739
import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
3840
import org.apache.fluss.exception.NonPrimaryKeyTableException;
@@ -60,6 +62,8 @@
6062

6163
import java.util.Collection;
6264
import java.util.List;
65+
import java.util.Map;
66+
import java.util.Set;
6367
import java.util.concurrent.CompletableFuture;
6468

6569
/**
@@ -400,6 +404,62 @@ CompletableFuture<Void> dropPartition(
400404
CompletableFuture<KvSnapshotMetadata> getKvSnapshotMetadata(
401405
TableBucket bucket, long snapshotId);
402406

407+
/**
408+
* Register the kv snapshot consumer of the given tableBucket set asynchronously. After
409+
* registered, the kv snapshot will not be deleted until unregistered or the consumer expired.
410+
*
411+
* <p>For the details bucket to consume, you can call {@link #getLatestKvSnapshots(TablePath)}
412+
* or {@link #getLatestKvSnapshots(TablePath, String)} first.
413+
*
414+
* <p>The following exceptions can be anticipated when calling {@code get()} on returned future.
415+
*
416+
* <ul>
417+
* <li>{@link TableNotExistException} if the table does not exist.
418+
* <li>{@link PartitionNotExistException} if the partition does not exist
419+
* </ul>
420+
*
421+
* @param consumerId the consumer id.
422+
* @param consumeBuckets the tableBuckets to consume, a map from TableBucket to kvSnapshotId.
423+
* @return the result of registering kv snapshot consumer.
424+
*/
425+
CompletableFuture<RegisterKvSnapshotResult> registerKvSnapshotConsumer(
426+
String consumerId, Map<TableBucket, Long> consumeBuckets);
427+
428+
/**
429+
* Unregister the kv snapshot consumer of the given tableBucket set asynchronously.
430+
*
431+
* <p>As the following situation happens, unregister operation will be partially ignored or
432+
* completely ignored
433+
*
434+
* <ul>
435+
* <li>If the table/partition not exists, the unregister operation will be ignored for this
436+
* table/partition, others will success.
437+
* <li>If the partition bucket/kvSnapshotId not exists, the unregister operation will be
438+
* ignored for this bucket/kvSnapshotId, others will success.
439+
* <li>If the consumerId not exists, the unregister operation will be ignored.
440+
* </ul>
441+
*
442+
* @param consumerId the consumer id.
443+
* @param bucketsToUnregister the tableBucket to unregister.
444+
*/
445+
CompletableFuture<Void> unregisterKvSnapshotConsumer(
446+
String consumerId, Set<TableBucket> bucketsToUnregister);
447+
448+
/**
449+
* Clear the kv snapshot consumer asynchronously. After cleared, all the registered kv snapshot
450+
* consume info for this consumer will be cleared.
451+
*
452+
* <p>The following exceptions can be anticipated when calling {@code get()} on returned future.
453+
*
454+
* <ul>
455+
* <li>{@link TableNotExistException} if the table does not exist.
456+
* <li>{@link KvSnapshotConsumerNotExistException} if the kv snapshot consumer does not exist.
457+
* </ul>
458+
*
459+
* @param consumerId the consumer id.
460+
*/
461+
CompletableFuture<Void> clearKvSnapshotConsumer(String consumerId);
462+
403463
/**
404464
* Get table lake snapshot info of the given table asynchronously.
405465
*

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,12 @@
2121
import org.apache.fluss.client.metadata.KvSnapshots;
2222
import org.apache.fluss.client.metadata.LakeSnapshot;
2323
import org.apache.fluss.client.metadata.MetadataUpdater;
24+
import org.apache.fluss.client.metadata.RegisterKvSnapshotResult;
2425
import org.apache.fluss.client.utils.ClientRpcMessageUtils;
2526
import org.apache.fluss.cluster.Cluster;
2627
import org.apache.fluss.cluster.ServerNode;
28+
import org.apache.fluss.config.ConfigOptions;
29+
import org.apache.fluss.config.Configuration;
2730
import org.apache.fluss.config.cluster.AlterConfig;
2831
import org.apache.fluss.config.cluster.ConfigEntry;
2932
import org.apache.fluss.exception.LeaderNotAvailableException;
@@ -46,6 +49,7 @@
4649
import org.apache.fluss.rpc.gateway.TabletServerGateway;
4750
import org.apache.fluss.rpc.messages.AlterClusterConfigsRequest;
4851
import org.apache.fluss.rpc.messages.AlterTableRequest;
52+
import org.apache.fluss.rpc.messages.ClearKvSnapshotConsumerRequest;
4953
import org.apache.fluss.rpc.messages.CreateAclsRequest;
5054
import org.apache.fluss.rpc.messages.CreateDatabaseRequest;
5155
import org.apache.fluss.rpc.messages.CreateTableRequest;
@@ -87,13 +91,16 @@
8791
import java.util.HashMap;
8892
import java.util.List;
8993
import java.util.Map;
94+
import java.util.Set;
9095
import java.util.concurrent.CompletableFuture;
9196

9297
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeAlterTableRequest;
9398
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeCreatePartitionRequest;
9499
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeDropPartitionRequest;
95100
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeListOffsetsRequest;
96101
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makePbPartitionSpec;
102+
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeRegisterKvSnapshotConsumerRequest;
103+
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeUnregisterKvSnapshotConsumerRequest;
97104
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.toConfigEntries;
98105
import static org.apache.fluss.client.utils.MetadataUtils.sendMetadataRequestAndRebuildCluster;
99106
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBindings;
@@ -112,15 +119,18 @@ public class FlussAdmin implements Admin {
112119
private final AdminGateway gateway;
113120
private final AdminReadOnlyGateway readOnlyGateway;
114121
private final MetadataUpdater metadataUpdater;
122+
private final Configuration clientConfig;
115123

116-
public FlussAdmin(RpcClient client, MetadataUpdater metadataUpdater) {
124+
public FlussAdmin(
125+
RpcClient client, MetadataUpdater metadataUpdater, Configuration clientConfig) {
117126
this.gateway =
118127
GatewayClientProxy.createGatewayProxy(
119128
metadataUpdater::getCoordinatorServer, client, AdminGateway.class);
120129
this.readOnlyGateway =
121130
GatewayClientProxy.createGatewayProxy(
122131
metadataUpdater::getRandomTabletServer, client, AdminGateway.class);
123132
this.metadataUpdater = metadataUpdater;
133+
this.clientConfig = clientConfig;
124134
}
125135

126136
@Override
@@ -377,6 +387,38 @@ public CompletableFuture<KvSnapshotMetadata> getKvSnapshotMetadata(
377387
.thenApply(ClientRpcMessageUtils::toKvSnapshotMetadata);
378388
}
379389

390+
@Override
391+
public CompletableFuture<RegisterKvSnapshotResult> registerKvSnapshotConsumer(
392+
String consumerId, Map<TableBucket, Long> consumeBuckets) {
393+
if (consumeBuckets.isEmpty()) {
394+
throw new IllegalArgumentException("consumeBuckets is empty");
395+
}
396+
397+
long expirationTime =
398+
clientConfig
399+
.get(ConfigOptions.CLIENT_SCANNER_KV_SNAPSHOT_CONSUMER_EXPIRATION_TIME)
400+
.toMillis();
401+
return gateway.registerKvSnapshotConsumer(
402+
makeRegisterKvSnapshotConsumerRequest(
403+
consumerId, consumeBuckets, expirationTime))
404+
.thenApply(ClientRpcMessageUtils::toRegisterKvSnapshotResult);
405+
}
406+
407+
@Override
408+
public CompletableFuture<Void> unregisterKvSnapshotConsumer(
409+
String consumerId, Set<TableBucket> bucketsToUnregister) {
410+
return gateway.unregisterKvSnapshotConsumer(
411+
makeUnregisterKvSnapshotConsumerRequest(consumerId, bucketsToUnregister))
412+
.thenApply(r -> null);
413+
}
414+
415+
@Override
416+
public CompletableFuture<Void> clearKvSnapshotConsumer(String consumerId) {
417+
ClearKvSnapshotConsumerRequest request =
418+
new ClearKvSnapshotConsumerRequest().setConsumerId(consumerId);
419+
return gateway.clearKvSnapshotConsumer(request).thenApply(r -> null);
420+
}
421+
380422
@Override
381423
public CompletableFuture<LakeSnapshot> getLatestLakeSnapshot(TablePath tablePath) {
382424
GetLatestLakeSnapshotRequest request = new GetLatestLakeSnapshotRequest();

fluss-client/src/main/java/org/apache/fluss/client/metadata/KvSnapshots.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@
1818
package org.apache.fluss.client.metadata;
1919

2020
import org.apache.fluss.annotation.PublicEvolving;
21+
import org.apache.fluss.metadata.TableBucket;
2122

2223
import javax.annotation.Nullable;
2324

2425
import java.util.Map;
2526
import java.util.OptionalLong;
2627
import java.util.Set;
28+
import java.util.stream.Collectors;
2729

2830
/**
2931
* A class representing the kv snapshots of a table or a partition. It contains multiple snapshots
@@ -71,6 +73,12 @@ public Set<Integer> getBucketIds() {
7173
return snapshotIds.keySet();
7274
}
7375

76+
public Set<TableBucket> getTableBuckets() {
77+
return snapshotIds.keySet().stream()
78+
.map(bucketId -> new TableBucket(tableId, partitionId, bucketId))
79+
.collect(Collectors.toSet());
80+
}
81+
7482
/**
7583
* Get the latest snapshot id for this kv tablet (bucket), or empty if there are no snapshots.
7684
*/
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.metadata;
19+
20+
import org.apache.fluss.annotation.PublicEvolving;
21+
import org.apache.fluss.metadata.TableBucket;
22+
23+
import java.util.Set;
24+
25+
/**
26+
* A class to represent the result of registering kv snapshot. It contains:
27+
*
28+
* <ul>
29+
* <li>An set of failed tableBuckets. Such as the specify snapshotId is not exist for this table
30+
* bucket.
31+
* </ul>
32+
*
33+
* @since 0.9
34+
*/
35+
@PublicEvolving
36+
public class RegisterKvSnapshotResult {
37+
private final Set<TableBucket> failedTableBucketSet;
38+
39+
public RegisterKvSnapshotResult(Set<TableBucket> failedTableBucketSet) {
40+
this.failedTableBucketSet = failedTableBucketSet;
41+
}
42+
43+
public Set<TableBucket> getFailedTableBucketSet() {
44+
return failedTableBucketSet;
45+
}
46+
}

fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.fluss.client.metadata.KvSnapshotMetadata;
2424
import org.apache.fluss.client.metadata.KvSnapshots;
2525
import org.apache.fluss.client.metadata.LakeSnapshot;
26+
import org.apache.fluss.client.metadata.RegisterKvSnapshotResult;
2627
import org.apache.fluss.client.write.KvWriteBatch;
2728
import org.apache.fluss.client.write.ReadyWriteBatch;
2829
import org.apache.fluss.config.cluster.AlterConfigOpType;
@@ -50,6 +51,8 @@
5051
import org.apache.fluss.rpc.messages.MetadataRequest;
5152
import org.apache.fluss.rpc.messages.PbAddColumn;
5253
import org.apache.fluss.rpc.messages.PbAlterConfig;
54+
import org.apache.fluss.rpc.messages.PbBucket;
55+
import org.apache.fluss.rpc.messages.PbConsumeKvSnapshotForBucket;
5356
import org.apache.fluss.rpc.messages.PbDescribeConfig;
5457
import org.apache.fluss.rpc.messages.PbDropColumn;
5558
import org.apache.fluss.rpc.messages.PbKeyValue;
@@ -63,9 +66,13 @@
6366
import org.apache.fluss.rpc.messages.PbPutKvReqForBucket;
6467
import org.apache.fluss.rpc.messages.PbRemotePathAndLocalFile;
6568
import org.apache.fluss.rpc.messages.PbRenameColumn;
69+
import org.apache.fluss.rpc.messages.PbTable;
6670
import org.apache.fluss.rpc.messages.PrefixLookupRequest;
6771
import org.apache.fluss.rpc.messages.ProduceLogRequest;
6872
import org.apache.fluss.rpc.messages.PutKvRequest;
73+
import org.apache.fluss.rpc.messages.RegisterKvSnapshotConsumerRequest;
74+
import org.apache.fluss.rpc.messages.RegisterKvSnapshotConsumerResponse;
75+
import org.apache.fluss.rpc.messages.UnregisterKvSnapshotConsumerRequest;
6976
import org.apache.fluss.utils.json.DataTypeJsonSerde;
7077
import org.apache.fluss.utils.json.JsonSerdeUtils;
7178

@@ -75,6 +82,7 @@
7582
import java.util.Arrays;
7683
import java.util.Collection;
7784
import java.util.HashMap;
85+
import java.util.HashSet;
7886
import java.util.List;
7987
import java.util.Map;
8088
import java.util.Set;
@@ -370,6 +378,73 @@ public static AlterTableRequest makeAlterTableRequest(
370378
return request;
371379
}
372380

381+
public static RegisterKvSnapshotConsumerRequest makeRegisterKvSnapshotConsumerRequest(
382+
String consumerId, Map<TableBucket, Long> consumeBuckets, long expirationTime) {
383+
RegisterKvSnapshotConsumerRequest request = new RegisterKvSnapshotConsumerRequest();
384+
request.setConsumerId(consumerId).setExpirationTime(expirationTime);
385+
386+
Map<Long, List<PbConsumeKvSnapshotForBucket>> pbConsumeTables = new HashMap<>();
387+
for (Map.Entry<TableBucket, Long> entry : consumeBuckets.entrySet()) {
388+
TableBucket tableBucket = entry.getKey();
389+
Long snapshotId = entry.getValue();
390+
PbConsumeKvSnapshotForBucket pbConsumeKvSnapshotForBucket =
391+
new PbConsumeKvSnapshotForBucket()
392+
.setBucketId(tableBucket.getBucket())
393+
.setSnapshotId(snapshotId);
394+
if (tableBucket.getPartitionId() != null) {
395+
pbConsumeKvSnapshotForBucket.setPartitionId(tableBucket.getPartitionId());
396+
}
397+
pbConsumeTables
398+
.computeIfAbsent(tableBucket.getTableId(), k -> new ArrayList<>())
399+
.add(pbConsumeKvSnapshotForBucket);
400+
}
401+
402+
for (Map.Entry<Long, List<PbConsumeKvSnapshotForBucket>> entry :
403+
pbConsumeTables.entrySet()) {
404+
request.addRegisterTable()
405+
.setTableId(entry.getKey())
406+
.addAllBucketsReqs(entry.getValue());
407+
}
408+
return request;
409+
}
410+
411+
public static RegisterKvSnapshotResult toRegisterKvSnapshotResult(
412+
RegisterKvSnapshotConsumerResponse response) {
413+
Set<TableBucket> failedTableBucketSet = new HashSet<>();
414+
for (PbTable failedTable : response.getFailedTablesList()) {
415+
long tableId = failedTable.getTableId();
416+
for (PbBucket pbBucket : failedTable.getBucketsList()) {
417+
TableBucket tableBucket =
418+
new TableBucket(
419+
tableId,
420+
pbBucket.hasPartitionId() ? pbBucket.getPartitionId() : null,
421+
pbBucket.getBucketId());
422+
failedTableBucketSet.add(tableBucket);
423+
}
424+
}
425+
return new RegisterKvSnapshotResult(failedTableBucketSet);
426+
}
427+
428+
public static UnregisterKvSnapshotConsumerRequest makeUnregisterKvSnapshotConsumerRequest(
429+
String consumerId, Set<TableBucket> bucketsToUnregister) {
430+
UnregisterKvSnapshotConsumerRequest request = new UnregisterKvSnapshotConsumerRequest();
431+
request.setConsumerId(consumerId);
432+
433+
Map<Long, List<PbBucket>> pbConsumeTables = new HashMap<>();
434+
for (TableBucket tb : bucketsToUnregister) {
435+
PbBucket pbBucket = new PbBucket().setBucketId(tb.getBucket());
436+
if (tb.getPartitionId() != null) {
437+
pbBucket.setPartitionId(tb.getPartitionId());
438+
}
439+
pbConsumeTables.computeIfAbsent(tb.getTableId(), k -> new ArrayList<>()).add(pbBucket);
440+
}
441+
442+
for (Map.Entry<Long, List<PbBucket>> entry : pbConsumeTables.entrySet()) {
443+
request.addUnregisterTable().setTableId(entry.getKey()).addAllBuckets(entry.getValue());
444+
}
445+
return request;
446+
}
447+
373448
public static List<PartitionInfo> toPartitionInfos(ListPartitionInfosResponse response) {
374449
return response.getPartitionsInfosList().stream()
375450
.map(

0 commit comments

Comments
 (0)