Skip to content

Commit 05bc3d6

Browse files
committed
[kv] Introduce the API and implement the KV snapshot consumer
1 parent e91106c commit 05bc3d6

File tree

48 files changed

+2761
-124
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+2761
-124
lines changed

fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ public Admin getOrCreateAdmin() {
142142
if (admin == null) {
143143
synchronized (this) {
144144
if (admin == null) {
145-
admin = new FlussAdmin(rpcClient, metadataUpdater);
145+
admin = new FlussAdmin(rpcClient, metadataUpdater, conf);
146146
}
147147
}
148148
}

fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.fluss.exception.InvalidPartitionException;
3434
import org.apache.fluss.exception.InvalidReplicationFactorException;
3535
import org.apache.fluss.exception.InvalidTableException;
36+
import org.apache.fluss.exception.KvSnapshotConsumerNotExistException;
3637
import org.apache.fluss.exception.KvSnapshotNotExistException;
3738
import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
3839
import org.apache.fluss.exception.NonPrimaryKeyTableException;
@@ -60,6 +61,8 @@
6061

6162
import java.util.Collection;
6263
import java.util.List;
64+
import java.util.Map;
65+
import java.util.Set;
6366
import java.util.concurrent.CompletableFuture;
6467

6568
/**
@@ -400,6 +403,61 @@ CompletableFuture<Void> dropPartition(
400403
CompletableFuture<KvSnapshotMetadata> getKvSnapshotMetadata(
401404
TableBucket bucket, long snapshotId);
402405

406+
/**
407+
* Register the kv snapshot consumer of the given tableBucket set asynchronously. After
408+
* registered, the kv snapshot will not be deleted until unregistered or the consumer expired.
409+
*
410+
* <p>For the details bucket to consume, you can call {@link #getLatestKvSnapshots(TablePath)}
411+
* or {@link #getLatestKvSnapshots(TablePath, String)} first.
412+
*
413+
* <p>The following exceptions can be anticipated when calling {@code get()} on returned future.
414+
*
415+
* <ul>
416+
* <li>{@link TableNotExistException} if the table does not exist.
417+
* <li>{@link PartitionNotExistException} if the partition does not exist
418+
* </ul>
419+
*
420+
* @param consumerId the consumer id.
421+
* @param consumeBuckets the tableBuckets to consume, a map from TableBucket to kvSnapshotId.
422+
*/
423+
CompletableFuture<Void> registerKvSnapshotConsumer(
424+
String consumerId, Map<TableBucket, Long> consumeBuckets);
425+
426+
/**
427+
* Unregister the kv snapshot consumer of the given tableBucket set asynchronously.
428+
*
429+
* <p>As the following situation happens, unregister operation will be partially ignored or
430+
* completely ignored
431+
*
432+
* <ul>
433+
* <li>If the table/partition not exists, the unregister operation will be ignored for this
434+
* table/partition, others will success.
435+
* <li>If the partition bucket/kvSnapshotId not exists, the unregister operation will be
436+
* ignored for this bucket/kvSnapshotId, others will success.
437+
* <li>If the consumerId not exists, the unregister operation will be ignored.
438+
* </ul>
439+
*
440+
* @param consumerId the consumer id.
441+
* @param bucketsToUnregister the tableBucket to unregister.
442+
*/
443+
CompletableFuture<Void> unregisterKvSnapshotConsumer(
444+
String consumerId, Set<TableBucket> bucketsToUnregister);
445+
446+
/**
447+
* Clear the kv snapshot consumer asynchronously. After cleared, all the registered kv snapshot
448+
* consume info for this consumer will be cleared.
449+
*
450+
* <p>The following exceptions can be anticipated when calling {@code get()} on returned future.
451+
*
452+
* <ul>
453+
* <li>{@link TableNotExistException} if the table does not exist.
454+
* <li>{@link KvSnapshotConsumerNotExistException} if the kv snapshot consumer does not exist.
455+
* </ul>
456+
*
457+
* @param consumerId the consumer id.
458+
*/
459+
CompletableFuture<Void> clearKvSnapshotConsumer(String consumerId);
460+
403461
/**
404462
* Get table lake snapshot info of the given table asynchronously.
405463
*

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import org.apache.fluss.client.utils.ClientRpcMessageUtils;
2525
import org.apache.fluss.cluster.Cluster;
2626
import org.apache.fluss.cluster.ServerNode;
27+
import org.apache.fluss.config.ConfigOptions;
28+
import org.apache.fluss.config.Configuration;
2729
import org.apache.fluss.config.cluster.AlterConfig;
2830
import org.apache.fluss.config.cluster.ConfigEntry;
2931
import org.apache.fluss.exception.LeaderNotAvailableException;
@@ -46,6 +48,7 @@
4648
import org.apache.fluss.rpc.gateway.TabletServerGateway;
4749
import org.apache.fluss.rpc.messages.AlterClusterConfigsRequest;
4850
import org.apache.fluss.rpc.messages.AlterTableRequest;
51+
import org.apache.fluss.rpc.messages.ClearKvSnapshotConsumerRequest;
4952
import org.apache.fluss.rpc.messages.CreateAclsRequest;
5053
import org.apache.fluss.rpc.messages.CreateDatabaseRequest;
5154
import org.apache.fluss.rpc.messages.CreateTableRequest;
@@ -87,13 +90,16 @@
8790
import java.util.HashMap;
8891
import java.util.List;
8992
import java.util.Map;
93+
import java.util.Set;
9094
import java.util.concurrent.CompletableFuture;
9195

9296
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeAlterTableRequest;
9397
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeCreatePartitionRequest;
9498
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeDropPartitionRequest;
9599
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeListOffsetsRequest;
96100
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makePbPartitionSpec;
101+
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeRegisterKvSnapshotConsumerRequest;
102+
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeUnregisterKvSnapshotConsumerRequest;
97103
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.toConfigEntries;
98104
import static org.apache.fluss.client.utils.MetadataUtils.sendMetadataRequestAndRebuildCluster;
99105
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBindings;
@@ -112,15 +118,18 @@ public class FlussAdmin implements Admin {
112118
private final AdminGateway gateway;
113119
private final AdminReadOnlyGateway readOnlyGateway;
114120
private final MetadataUpdater metadataUpdater;
121+
private final Configuration clientConfig;
115122

116-
public FlussAdmin(RpcClient client, MetadataUpdater metadataUpdater) {
123+
public FlussAdmin(
124+
RpcClient client, MetadataUpdater metadataUpdater, Configuration clientConfig) {
117125
this.gateway =
118126
GatewayClientProxy.createGatewayProxy(
119127
metadataUpdater::getCoordinatorServer, client, AdminGateway.class);
120128
this.readOnlyGateway =
121129
GatewayClientProxy.createGatewayProxy(
122130
metadataUpdater::getRandomTabletServer, client, AdminGateway.class);
123131
this.metadataUpdater = metadataUpdater;
132+
this.clientConfig = clientConfig;
124133
}
125134

126135
@Override
@@ -377,6 +386,38 @@ public CompletableFuture<KvSnapshotMetadata> getKvSnapshotMetadata(
377386
.thenApply(ClientRpcMessageUtils::toKvSnapshotMetadata);
378387
}
379388

389+
@Override
390+
public CompletableFuture<Void> registerKvSnapshotConsumer(
391+
String consumerId, Map<TableBucket, Long> consumeBuckets) {
392+
if (consumeBuckets.isEmpty()) {
393+
throw new IllegalArgumentException("consumeBuckets is empty");
394+
}
395+
396+
long expirationTime =
397+
clientConfig
398+
.get(ConfigOptions.CLIENT_SCANNER_KV_SNAPSHOT_CONSUMER_EXPIRATION_TIME)
399+
.toMillis();
400+
return gateway.registerKvSnapshotConsumer(
401+
makeRegisterKvSnapshotConsumerRequest(
402+
consumerId, consumeBuckets, expirationTime))
403+
.thenApply(r -> null);
404+
}
405+
406+
@Override
407+
public CompletableFuture<Void> unregisterKvSnapshotConsumer(
408+
String consumerId, Set<TableBucket> bucketsToUnregister) {
409+
return gateway.unregisterKvSnapshotConsumer(
410+
makeUnregisterKvSnapshotConsumerRequest(consumerId, bucketsToUnregister))
411+
.thenApply(r -> null);
412+
}
413+
414+
@Override
415+
public CompletableFuture<Void> clearKvSnapshotConsumer(String consumerId) {
416+
ClearKvSnapshotConsumerRequest request =
417+
new ClearKvSnapshotConsumerRequest().setConsumerId(consumerId);
418+
return gateway.clearKvSnapshotConsumer(request).thenApply(r -> null);
419+
}
420+
380421
@Override
381422
public CompletableFuture<LakeSnapshot> getLatestLakeSnapshot(TablePath tablePath) {
382423
GetLatestLakeSnapshotRequest request = new GetLatestLakeSnapshotRequest();

fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@
5050
import org.apache.fluss.rpc.messages.MetadataRequest;
5151
import org.apache.fluss.rpc.messages.PbAddColumn;
5252
import org.apache.fluss.rpc.messages.PbAlterConfig;
53+
import org.apache.fluss.rpc.messages.PbBucket;
54+
import org.apache.fluss.rpc.messages.PbConsumeKvSnapshotForBucket;
5355
import org.apache.fluss.rpc.messages.PbDescribeConfig;
5456
import org.apache.fluss.rpc.messages.PbDropColumn;
5557
import org.apache.fluss.rpc.messages.PbKeyValue;
@@ -66,6 +68,8 @@
6668
import org.apache.fluss.rpc.messages.PrefixLookupRequest;
6769
import org.apache.fluss.rpc.messages.ProduceLogRequest;
6870
import org.apache.fluss.rpc.messages.PutKvRequest;
71+
import org.apache.fluss.rpc.messages.RegisterKvSnapshotConsumerRequest;
72+
import org.apache.fluss.rpc.messages.UnregisterKvSnapshotConsumerRequest;
6973
import org.apache.fluss.utils.json.DataTypeJsonSerde;
7074
import org.apache.fluss.utils.json.JsonSerdeUtils;
7175

@@ -374,6 +378,56 @@ public static AlterTableRequest makeAlterTableRequest(
374378
return request;
375379
}
376380

381+
public static RegisterKvSnapshotConsumerRequest makeRegisterKvSnapshotConsumerRequest(
382+
String consumerId, Map<TableBucket, Long> consumeBuckets, long expirationTime) {
383+
RegisterKvSnapshotConsumerRequest request = new RegisterKvSnapshotConsumerRequest();
384+
request.setConsumerId(consumerId).setExpirationTime(expirationTime);
385+
386+
Map<Long, List<PbConsumeKvSnapshotForBucket>> pbConsumeTables = new HashMap<>();
387+
for (Map.Entry<TableBucket, Long> entry : consumeBuckets.entrySet()) {
388+
TableBucket tableBucket = entry.getKey();
389+
Long snapshotId = entry.getValue();
390+
PbConsumeKvSnapshotForBucket pbConsumeKvSnapshotForBucket =
391+
new PbConsumeKvSnapshotForBucket()
392+
.setBucketId(tableBucket.getBucket())
393+
.setSnapshotId(snapshotId);
394+
if (tableBucket.getPartitionId() != null) {
395+
pbConsumeKvSnapshotForBucket.setPartitionId(tableBucket.getPartitionId());
396+
}
397+
pbConsumeTables
398+
.computeIfAbsent(tableBucket.getTableId(), k -> new ArrayList<>())
399+
.add(pbConsumeKvSnapshotForBucket);
400+
}
401+
402+
for (Map.Entry<Long, List<PbConsumeKvSnapshotForBucket>> entry :
403+
pbConsumeTables.entrySet()) {
404+
request.addRegisterTable()
405+
.setTableId(entry.getKey())
406+
.addAllBucketsReqs(entry.getValue());
407+
}
408+
return request;
409+
}
410+
411+
public static UnregisterKvSnapshotConsumerRequest makeUnregisterKvSnapshotConsumerRequest(
412+
String consumerId, Set<TableBucket> bucketsToUnregister) {
413+
UnregisterKvSnapshotConsumerRequest request = new UnregisterKvSnapshotConsumerRequest();
414+
request.setConsumerId(consumerId);
415+
416+
Map<Long, List<PbBucket>> pbConsumeTables = new HashMap<>();
417+
for (TableBucket tb : bucketsToUnregister) {
418+
PbBucket pbBucket = new PbBucket().setBucketId(tb.getBucket());
419+
if (tb.getPartitionId() != null) {
420+
pbBucket.setPartitionId(tb.getPartitionId());
421+
}
422+
pbConsumeTables.computeIfAbsent(tb.getTableId(), k -> new ArrayList<>()).add(pbBucket);
423+
}
424+
425+
for (Map.Entry<Long, List<PbBucket>> entry : pbConsumeTables.entrySet()) {
426+
request.addUnregisterTable().setTableId(entry.getKey()).addAllBuckets(entry.getValue());
427+
}
428+
return request;
429+
}
430+
377431
public static List<PartitionInfo> toPartitionInfos(ListPartitionInfosResponse response) {
378432
return response.getPartitionsInfosList().stream()
379433
.map(

fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotBatchScannerITCase.java

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
import org.apache.fluss.row.ProjectedRow;
3535
import org.apache.fluss.row.encode.CompactedKeyEncoder;
3636
import org.apache.fluss.row.encode.KeyEncoder;
37+
import org.apache.fluss.server.zk.ZooKeeperClient;
38+
import org.apache.fluss.server.zk.data.KvSnapshotConsumer;
3739
import org.apache.fluss.types.DataTypes;
3840

3941
import org.junit.jupiter.api.AfterEach;
@@ -182,6 +184,80 @@ void testScanSnapshotDuringSchemaChange() throws Exception {
182184
testSnapshotRead(tablePath, expectedRowByBuckets);
183185
}
184186

187+
@Test
188+
public void testKvSnapshotConsumer() throws Exception {
189+
TablePath tablePath = TablePath.of(DEFAULT_DB, "test-kv-snapshot-consumer");
190+
long tableId = createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, true);
191+
192+
String kvSnapshotConsumer1 = "test-consumer";
193+
String kvSnapshotConsumer2 = "test-consumer2";
194+
195+
// scan the snapshot
196+
Map<TableBucket, List<InternalRow>> expectedRowByBuckets = putRows(tableId, tablePath, 10);
197+
198+
// wait snapshot finish
199+
waitUntilAllSnapshotFinished(expectedRowByBuckets.keySet(), 0);
200+
201+
ZooKeeperClient zkClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
202+
assertThat(zkClient.getKvSnapshotConsumerList()).isEmpty();
203+
204+
// test register kv snapshot consumer for snapshot 0.
205+
Map<TableBucket, Long> consumeBuckets = new HashMap<>();
206+
KvSnapshots kvSnapshots = admin.getLatestKvSnapshots(tablePath).get();
207+
for (int bucketId : kvSnapshots.getBucketIds()) {
208+
TableBucket tableBucket = new TableBucket(kvSnapshots.getTableId(), bucketId);
209+
consumeBuckets.put(tableBucket, kvSnapshots.getSnapshotId(bucketId).getAsLong());
210+
}
211+
admin.registerKvSnapshotConsumer(kvSnapshotConsumer1, consumeBuckets).get();
212+
checkKvSnapshotConsumerEquals(
213+
zkClient, kvSnapshotConsumer1, 3, tableId, new Long[] {0L, 0L, 0L});
214+
215+
expectedRowByBuckets = putRows(tableId, tablePath, 10);
216+
// wait snapshot2 finish
217+
waitUntilAllSnapshotFinished(expectedRowByBuckets.keySet(), 1);
218+
219+
// test register kv snapshot consumer for snapshot 1.
220+
consumeBuckets = new HashMap<>();
221+
kvSnapshots = admin.getLatestKvSnapshots(tablePath).get();
222+
for (int bucketId : kvSnapshots.getBucketIds()) {
223+
TableBucket tableBucket = new TableBucket(kvSnapshots.getTableId(), bucketId);
224+
consumeBuckets.put(tableBucket, kvSnapshots.getSnapshotId(bucketId).getAsLong());
225+
}
226+
admin.registerKvSnapshotConsumer(kvSnapshotConsumer2, consumeBuckets).get();
227+
checkKvSnapshotConsumerEquals(
228+
zkClient, kvSnapshotConsumer2, 3, tableId, new Long[] {1L, 1L, 1L});
229+
// check even snapshot1 is generated, snapshot0 also retained as consumer exists.
230+
for (TableBucket tb : expectedRowByBuckets.keySet()) {
231+
assertThat(zkClient.getTableBucketSnapshot(tb, 0L).isPresent()).isTrue();
232+
assertThat(zkClient.getTableBucketSnapshot(tb, 1L).isPresent()).isTrue();
233+
}
234+
235+
// unregister partial snapshot for consumer1.
236+
admin.unregisterKvSnapshotConsumer(
237+
kvSnapshotConsumer1, Collections.singleton(new TableBucket(tableId, 0)))
238+
.get();
239+
checkKvSnapshotConsumerEquals(
240+
zkClient, kvSnapshotConsumer1, 2, tableId, new Long[] {-1L, 0L, 0L});
241+
242+
// unregister all snapshot for consumer2.
243+
admin.unregisterKvSnapshotConsumer(kvSnapshotConsumer2, consumeBuckets.keySet()).get();
244+
assertThat(zkClient.getKvSnapshotConsumerList()).doesNotContain(kvSnapshotConsumer2);
245+
246+
// clear consumer1
247+
admin.clearKvSnapshotConsumer(kvSnapshotConsumer1).get();
248+
assertThat(zkClient.getKvSnapshotConsumerList()).isEmpty();
249+
250+
expectedRowByBuckets = putRows(tableId, tablePath, 10);
251+
// wait snapshot2 finish
252+
waitUntilAllSnapshotFinished(expectedRowByBuckets.keySet(), 2);
253+
// as all consumers are cleared, and new snapshot is generated, all old snapshot are
254+
// cleared.
255+
for (TableBucket tb : expectedRowByBuckets.keySet()) {
256+
assertThat(zkClient.getTableBucketSnapshot(tb, 0L).isPresent()).isFalse();
257+
assertThat(zkClient.getTableBucketSnapshot(tb, 1L).isPresent()).isFalse();
258+
}
259+
}
260+
185261
private Map<TableBucket, List<InternalRow>> putRows(
186262
long tableId, TablePath tablePath, int rowNumber) throws Exception {
187263
List<InternalRow> rows = new ArrayList<>();
@@ -246,4 +322,24 @@ private void waitUntilAllSnapshotFinished(Set<TableBucket> tableBuckets, long sn
246322
FLUSS_CLUSTER_EXTENSION.waitUntilSnapshotFinished(tableBucket, snapshotId);
247323
}
248324
}
325+
326+
private void checkKvSnapshotConsumerEquals(
327+
ZooKeeperClient zkClient,
328+
String consumerId,
329+
int expectedSize,
330+
long tableId,
331+
Long[] expectedBucketIndex)
332+
throws Exception {
333+
assertThat(zkClient.getKvSnapshotConsumerList()).contains(consumerId);
334+
assertThat(zkClient.getKvSnapshotConsumer(consumerId)).isPresent();
335+
KvSnapshotConsumer actual = zkClient.getKvSnapshotConsumer(consumerId).get();
336+
assertThat(actual.getTableIdToPartitions()).isEmpty();
337+
assertThat(actual.getPartitionIdToSnapshots()).isEmpty();
338+
assertThat(actual.getConsumedSnapshotCount()).isEqualTo(expectedSize);
339+
Long[] bucketIndex = actual.getTableIdToSnapshots().get(tableId);
340+
assertThat(bucketIndex).hasSize(DEFAULT_BUCKET_NUM);
341+
for (int i = 0; i < DEFAULT_BUCKET_NUM; i++) {
342+
assertThat(bucketIndex[i]).isEqualTo(expectedBucketIndex[i]);
343+
}
344+
}
249345
}

0 commit comments

Comments
 (0)