Skip to content

Commit 8d7f948

Browse files
committed
[kv] Support kv snapshot consumer
1 parent e91106c commit 8d7f948

File tree

65 files changed

+3305
-195
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

65 files changed

+3305
-195
lines changed

fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ public Admin getOrCreateAdmin() {
142142
if (admin == null) {
143143
synchronized (this) {
144144
if (admin == null) {
145-
admin = new FlussAdmin(rpcClient, metadataUpdater);
145+
admin = new FlussAdmin(rpcClient, metadataUpdater, conf);
146146
}
147147
}
148148
}

fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.fluss.exception.InvalidPartitionException;
3434
import org.apache.fluss.exception.InvalidReplicationFactorException;
3535
import org.apache.fluss.exception.InvalidTableException;
36+
import org.apache.fluss.exception.KvSnapshotConsumerNotExistException;
3637
import org.apache.fluss.exception.KvSnapshotNotExistException;
3738
import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
3839
import org.apache.fluss.exception.NonPrimaryKeyTableException;
@@ -60,6 +61,8 @@
6061

6162
import java.util.Collection;
6263
import java.util.List;
64+
import java.util.Map;
65+
import java.util.Set;
6366
import java.util.concurrent.CompletableFuture;
6467

6568
/**
@@ -400,6 +403,61 @@ CompletableFuture<Void> dropPartition(
400403
CompletableFuture<KvSnapshotMetadata> getKvSnapshotMetadata(
401404
TableBucket bucket, long snapshotId);
402405

406+
/**
407+
* Register the kv snapshot consumer of the given tableBucket set asynchronously. After
408+
* registered, the kv snapshot will not be deleted until unregistered or the consumer expired.
409+
*
410+
* <p>For the details bucket to consume, you can call {@link #getLatestKvSnapshots(TablePath)}
411+
* or {@link #getLatestKvSnapshots(TablePath, String)} first.
412+
*
413+
* <p>The following exceptions can be anticipated when calling {@code get()} on returned future.
414+
*
415+
* <ul>
416+
* <li>{@link TableNotExistException} if the table does not exist.
417+
* <li>{@link PartitionNotExistException} if the partition does not exist
418+
* </ul>
419+
*
420+
* @param consumerId the consumer id.
421+
* @param consumeBuckets the tableBuckets to consume, a map from TableBucket to kvSnapshotId.
422+
*/
423+
CompletableFuture<Void> registerKvSnapshotConsumer(
424+
String consumerId, Map<TableBucket, Long> consumeBuckets);
425+
426+
/**
427+
* Unregister the kv snapshot consumer of the given tableBucket set asynchronously.
428+
*
429+
* <p>As the following situation happens, unregister operation will be partially ignored or
430+
* completely ignored
431+
*
432+
* <ul>
433+
* <li>If the table/partition not exists, the unregister operation will be ignored for this
434+
* table/partition, others will success.
435+
* <li>If the partition bucket/kvSnapshotId not exists, the unregister operation will be
436+
* ignored for this bucket/kvSnapshotId, others will success.
437+
* <li>If the consumerId not exists, the unregister operation will be ignored.
438+
* </ul>
439+
*
440+
* @param consumerId the consumer id.
441+
* @param bucketsToUnregister the tableBucket to unregister.
442+
*/
443+
CompletableFuture<Void> unregisterKvSnapshotConsumer(
444+
String consumerId, Set<TableBucket> bucketsToUnregister);
445+
446+
/**
447+
* Clear the kv snapshot consumer asynchronously. After cleared, all the registered kv snapshot
448+
* consume info for this consumer will be cleared.
449+
*
450+
* <p>The following exceptions can be anticipated when calling {@code get()} on returned future.
451+
*
452+
* <ul>
453+
* <li>{@link TableNotExistException} if the table does not exist.
454+
* <li>{@link KvSnapshotConsumerNotExistException} if the kv snapshot consumer does not exist.
455+
* </ul>
456+
*
457+
* @param consumerId the consumer id.
458+
*/
459+
CompletableFuture<Void> clearKvSnapshotConsumer(String consumerId);
460+
403461
/**
404462
* Get table lake snapshot info of the given table asynchronously.
405463
*

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import org.apache.fluss.client.utils.ClientRpcMessageUtils;
2525
import org.apache.fluss.cluster.Cluster;
2626
import org.apache.fluss.cluster.ServerNode;
27+
import org.apache.fluss.config.ConfigOptions;
28+
import org.apache.fluss.config.Configuration;
2729
import org.apache.fluss.config.cluster.AlterConfig;
2830
import org.apache.fluss.config.cluster.ConfigEntry;
2931
import org.apache.fluss.exception.LeaderNotAvailableException;
@@ -46,6 +48,7 @@
4648
import org.apache.fluss.rpc.gateway.TabletServerGateway;
4749
import org.apache.fluss.rpc.messages.AlterClusterConfigsRequest;
4850
import org.apache.fluss.rpc.messages.AlterTableRequest;
51+
import org.apache.fluss.rpc.messages.ClearKvSnapshotConsumerRequest;
4952
import org.apache.fluss.rpc.messages.CreateAclsRequest;
5053
import org.apache.fluss.rpc.messages.CreateDatabaseRequest;
5154
import org.apache.fluss.rpc.messages.CreateTableRequest;
@@ -87,13 +90,16 @@
8790
import java.util.HashMap;
8891
import java.util.List;
8992
import java.util.Map;
93+
import java.util.Set;
9094
import java.util.concurrent.CompletableFuture;
9195

9296
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeAlterTableRequest;
9397
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeCreatePartitionRequest;
9498
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeDropPartitionRequest;
9599
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeListOffsetsRequest;
96100
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makePbPartitionSpec;
101+
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeRegisterKvSnapshotConsumerRequest;
102+
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeUnregisterKvSnapshotConsumerRequest;
97103
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.toConfigEntries;
98104
import static org.apache.fluss.client.utils.MetadataUtils.sendMetadataRequestAndRebuildCluster;
99105
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBindings;
@@ -112,15 +118,18 @@ public class FlussAdmin implements Admin {
112118
private final AdminGateway gateway;
113119
private final AdminReadOnlyGateway readOnlyGateway;
114120
private final MetadataUpdater metadataUpdater;
121+
private final Configuration clientConfig;
115122

116-
public FlussAdmin(RpcClient client, MetadataUpdater metadataUpdater) {
123+
public FlussAdmin(
124+
RpcClient client, MetadataUpdater metadataUpdater, Configuration clientConfig) {
117125
this.gateway =
118126
GatewayClientProxy.createGatewayProxy(
119127
metadataUpdater::getCoordinatorServer, client, AdminGateway.class);
120128
this.readOnlyGateway =
121129
GatewayClientProxy.createGatewayProxy(
122130
metadataUpdater::getRandomTabletServer, client, AdminGateway.class);
123131
this.metadataUpdater = metadataUpdater;
132+
this.clientConfig = clientConfig;
124133
}
125134

126135
@Override
@@ -377,6 +386,38 @@ public CompletableFuture<KvSnapshotMetadata> getKvSnapshotMetadata(
377386
.thenApply(ClientRpcMessageUtils::toKvSnapshotMetadata);
378387
}
379388

389+
@Override
390+
public CompletableFuture<Void> registerKvSnapshotConsumer(
391+
String consumerId, Map<TableBucket, Long> consumeBuckets) {
392+
if (consumeBuckets.isEmpty()) {
393+
throw new IllegalArgumentException("consumeBuckets is empty");
394+
}
395+
396+
long expirationTime =
397+
clientConfig
398+
.get(ConfigOptions.CLIENT_SCANNER_KV_SNAPSHOT_CONSUMER_EXPIRATION_TIME)
399+
.toMillis();
400+
return gateway.registerKvSnapshotConsumer(
401+
makeRegisterKvSnapshotConsumerRequest(
402+
consumerId, consumeBuckets, expirationTime))
403+
.thenApply(r -> null);
404+
}
405+
406+
@Override
407+
public CompletableFuture<Void> unregisterKvSnapshotConsumer(
408+
String consumerId, Set<TableBucket> bucketsToUnregister) {
409+
return gateway.unregisterKvSnapshotConsumer(
410+
makeUnregisterKvSnapshotConsumerRequest(consumerId, bucketsToUnregister))
411+
.thenApply(r -> null);
412+
}
413+
414+
@Override
415+
public CompletableFuture<Void> clearKvSnapshotConsumer(String consumerId) {
416+
ClearKvSnapshotConsumerRequest request =
417+
new ClearKvSnapshotConsumerRequest().setConsumerId(consumerId);
418+
return gateway.clearKvSnapshotConsumer(request).thenApply(r -> null);
419+
}
420+
380421
@Override
381422
public CompletableFuture<LakeSnapshot> getLatestLakeSnapshot(TablePath tablePath) {
382423
GetLatestLakeSnapshotRequest request = new GetLatestLakeSnapshotRequest();

fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@
5050
import org.apache.fluss.rpc.messages.MetadataRequest;
5151
import org.apache.fluss.rpc.messages.PbAddColumn;
5252
import org.apache.fluss.rpc.messages.PbAlterConfig;
53+
import org.apache.fluss.rpc.messages.PbBucket;
54+
import org.apache.fluss.rpc.messages.PbConsumeKvSnapshotForBucket;
5355
import org.apache.fluss.rpc.messages.PbDescribeConfig;
5456
import org.apache.fluss.rpc.messages.PbDropColumn;
5557
import org.apache.fluss.rpc.messages.PbKeyValue;
@@ -66,6 +68,8 @@
6668
import org.apache.fluss.rpc.messages.PrefixLookupRequest;
6769
import org.apache.fluss.rpc.messages.ProduceLogRequest;
6870
import org.apache.fluss.rpc.messages.PutKvRequest;
71+
import org.apache.fluss.rpc.messages.RegisterKvSnapshotConsumerRequest;
72+
import org.apache.fluss.rpc.messages.UnregisterKvSnapshotConsumerRequest;
6973
import org.apache.fluss.utils.json.DataTypeJsonSerde;
7074
import org.apache.fluss.utils.json.JsonSerdeUtils;
7175

@@ -374,6 +378,56 @@ public static AlterTableRequest makeAlterTableRequest(
374378
return request;
375379
}
376380

381+
public static RegisterKvSnapshotConsumerRequest makeRegisterKvSnapshotConsumerRequest(
382+
String consumerId, Map<TableBucket, Long> consumeBuckets, long expirationTime) {
383+
RegisterKvSnapshotConsumerRequest request = new RegisterKvSnapshotConsumerRequest();
384+
request.setConsumerId(consumerId).setExpirationTime(expirationTime);
385+
386+
Map<Long, List<PbConsumeKvSnapshotForBucket>> pbConsumeTables = new HashMap<>();
387+
for (Map.Entry<TableBucket, Long> entry : consumeBuckets.entrySet()) {
388+
TableBucket tableBucket = entry.getKey();
389+
Long snapshotId = entry.getValue();
390+
PbConsumeKvSnapshotForBucket pbConsumeKvSnapshotForBucket =
391+
new PbConsumeKvSnapshotForBucket()
392+
.setBucketId(tableBucket.getBucket())
393+
.setSnapshotId(snapshotId);
394+
if (tableBucket.getPartitionId() != null) {
395+
pbConsumeKvSnapshotForBucket.setPartitionId(tableBucket.getPartitionId());
396+
}
397+
pbConsumeTables
398+
.computeIfAbsent(tableBucket.getTableId(), k -> new ArrayList<>())
399+
.add(pbConsumeKvSnapshotForBucket);
400+
}
401+
402+
for (Map.Entry<Long, List<PbConsumeKvSnapshotForBucket>> entry :
403+
pbConsumeTables.entrySet()) {
404+
request.addRegisterTable()
405+
.setTableId(entry.getKey())
406+
.addAllBucketsReqs(entry.getValue());
407+
}
408+
return request;
409+
}
410+
411+
public static UnregisterKvSnapshotConsumerRequest makeUnregisterKvSnapshotConsumerRequest(
412+
String consumerId, Set<TableBucket> bucketsToUnregister) {
413+
UnregisterKvSnapshotConsumerRequest request = new UnregisterKvSnapshotConsumerRequest();
414+
request.setConsumerId(consumerId);
415+
416+
Map<Long, List<PbBucket>> pbConsumeTables = new HashMap<>();
417+
for (TableBucket tb : bucketsToUnregister) {
418+
PbBucket pbBucket = new PbBucket().setBucketId(tb.getBucket());
419+
if (tb.getPartitionId() != null) {
420+
pbBucket.setPartitionId(tb.getPartitionId());
421+
}
422+
pbConsumeTables.computeIfAbsent(tb.getTableId(), k -> new ArrayList<>()).add(pbBucket);
423+
}
424+
425+
for (Map.Entry<Long, List<PbBucket>> entry : pbConsumeTables.entrySet()) {
426+
request.addUnregisterTable().setTableId(entry.getKey()).addAllBuckets(entry.getValue());
427+
}
428+
return request;
429+
}
430+
377431
public static List<PartitionInfo> toPartitionInfos(ListPartitionInfosResponse response) {
378432
return response.getPartitionsInfosList().stream()
379433
.map(

fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotBatchScannerITCase.java

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.fluss.client.table.scanner.RemoteFileDownloader;
2525
import org.apache.fluss.client.table.writer.UpsertWriter;
2626
import org.apache.fluss.client.write.HashBucketAssigner;
27+
import org.apache.fluss.exception.KvSnapshotConsumerNotExistException;
2728
import org.apache.fluss.metadata.DataLakeFormat;
2829
import org.apache.fluss.metadata.Schema;
2930
import org.apache.fluss.metadata.TableBucket;
@@ -34,6 +35,8 @@
3435
import org.apache.fluss.row.ProjectedRow;
3536
import org.apache.fluss.row.encode.CompactedKeyEncoder;
3637
import org.apache.fluss.row.encode.KeyEncoder;
38+
import org.apache.fluss.server.zk.ZooKeeperClient;
39+
import org.apache.fluss.server.zk.data.KvSnapshotConsumer;
3740
import org.apache.fluss.types.DataTypes;
3841

3942
import org.junit.jupiter.api.AfterEach;
@@ -50,6 +53,7 @@
5053
import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
5154
import static org.apache.fluss.testutils.DataTestUtils.compactedRow;
5255
import static org.assertj.core.api.Assertions.assertThat;
56+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
5357

5458
/** IT Case for {@link KvSnapshotBatchScanner}. */
5559
class KvSnapshotBatchScannerITCase extends ClientToServerITCaseBase {
@@ -182,6 +186,85 @@ void testScanSnapshotDuringSchemaChange() throws Exception {
182186
testSnapshotRead(tablePath, expectedRowByBuckets);
183187
}
184188

189+
@Test
190+
public void testKvSnapshotConsumer() throws Exception {
191+
TablePath tablePath = TablePath.of(DEFAULT_DB, "test-kv-snapshot-consumer");
192+
long tableId = createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, true);
193+
194+
String kvSnapshotConsumer1 = "test-consumer";
195+
String kvSnapshotConsumer2 = "test-consumer2";
196+
197+
// scan the snapshot
198+
Map<TableBucket, List<InternalRow>> expectedRowByBuckets = putRows(tableId, tablePath, 10);
199+
200+
// wait snapshot finish
201+
waitUntilAllSnapshotFinished(expectedRowByBuckets.keySet(), 0);
202+
203+
ZooKeeperClient zkClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
204+
assertThat(zkClient.getKvSnapshotConsumerList()).isEmpty();
205+
206+
// test register kv snapshot consumer for snapshot 0.
207+
Map<TableBucket, Long> consumeBuckets = new HashMap<>();
208+
KvSnapshots kvSnapshots = admin.getLatestKvSnapshots(tablePath).get();
209+
for (int bucketId : kvSnapshots.getBucketIds()) {
210+
TableBucket tableBucket = new TableBucket(kvSnapshots.getTableId(), bucketId);
211+
consumeBuckets.put(tableBucket, kvSnapshots.getSnapshotId(bucketId).getAsLong());
212+
}
213+
admin.registerKvSnapshotConsumer(kvSnapshotConsumer1, consumeBuckets).get();
214+
checkKvSnapshotConsumerEquals(
215+
zkClient, kvSnapshotConsumer1, 3, tableId, new Long[] {0L, 0L, 0L});
216+
217+
expectedRowByBuckets = putRows(tableId, tablePath, 10);
218+
// wait snapshot2 finish
219+
waitUntilAllSnapshotFinished(expectedRowByBuckets.keySet(), 1);
220+
221+
// test register kv snapshot consumer for snapshot 1.
222+
consumeBuckets = new HashMap<>();
223+
kvSnapshots = admin.getLatestKvSnapshots(tablePath).get();
224+
for (int bucketId : kvSnapshots.getBucketIds()) {
225+
TableBucket tableBucket = new TableBucket(kvSnapshots.getTableId(), bucketId);
226+
consumeBuckets.put(tableBucket, kvSnapshots.getSnapshotId(bucketId).getAsLong());
227+
}
228+
admin.registerKvSnapshotConsumer(kvSnapshotConsumer2, consumeBuckets).get();
229+
checkKvSnapshotConsumerEquals(
230+
zkClient, kvSnapshotConsumer2, 3, tableId, new Long[] {1L, 1L, 1L});
231+
// check even snapshot1 is generated, snapshot0 also retained as consumer exists.
232+
for (TableBucket tb : expectedRowByBuckets.keySet()) {
233+
assertThat(zkClient.getTableBucketSnapshot(tb, 0L).isPresent()).isTrue();
234+
assertThat(zkClient.getTableBucketSnapshot(tb, 1L).isPresent()).isTrue();
235+
}
236+
237+
// unregister partial snapshot for consumer1.
238+
admin.unregisterKvSnapshotConsumer(
239+
kvSnapshotConsumer1, Collections.singleton(new TableBucket(tableId, 0)))
240+
.get();
241+
checkKvSnapshotConsumerEquals(
242+
zkClient, kvSnapshotConsumer1, 2, tableId, new Long[] {-1L, 0L, 0L});
243+
244+
// unregister all snapshot for consumer2.
245+
admin.unregisterKvSnapshotConsumer(kvSnapshotConsumer2, consumeBuckets.keySet()).get();
246+
assertThat(zkClient.getKvSnapshotConsumerList()).doesNotContain(kvSnapshotConsumer2);
247+
248+
// clear consumer1
249+
admin.clearKvSnapshotConsumer(kvSnapshotConsumer1).get();
250+
assertThat(zkClient.getKvSnapshotConsumerList()).isEmpty();
251+
252+
expectedRowByBuckets = putRows(tableId, tablePath, 10);
253+
// wait snapshot2 finish
254+
waitUntilAllSnapshotFinished(expectedRowByBuckets.keySet(), 2);
255+
// as all consumers are cleared, and new snapshot is generated, all old snapshot are
256+
// cleared.
257+
for (TableBucket tb : expectedRowByBuckets.keySet()) {
258+
assertThat(zkClient.getTableBucketSnapshot(tb, 0L).isPresent()).isFalse();
259+
assertThat(zkClient.getTableBucketSnapshot(tb, 1L).isPresent()).isFalse();
260+
}
261+
262+
assertThatThrownBy(() -> admin.clearKvSnapshotConsumer("no-exist-consumer").get())
263+
.rootCause()
264+
.isInstanceOf(KvSnapshotConsumerNotExistException.class)
265+
.hasMessageContaining("kv snapshot consumer 'no-exist-consumer' not exits");
266+
}
267+
185268
private Map<TableBucket, List<InternalRow>> putRows(
186269
long tableId, TablePath tablePath, int rowNumber) throws Exception {
187270
List<InternalRow> rows = new ArrayList<>();
@@ -246,4 +329,24 @@ private void waitUntilAllSnapshotFinished(Set<TableBucket> tableBuckets, long sn
246329
FLUSS_CLUSTER_EXTENSION.waitUntilSnapshotFinished(tableBucket, snapshotId);
247330
}
248331
}
332+
333+
private void checkKvSnapshotConsumerEquals(
334+
ZooKeeperClient zkClient,
335+
String consumerId,
336+
int expectedSize,
337+
long tableId,
338+
Long[] expectedBucketIndex)
339+
throws Exception {
340+
assertThat(zkClient.getKvSnapshotConsumerList()).contains(consumerId);
341+
assertThat(zkClient.getKvSnapshotConsumer(consumerId)).isPresent();
342+
KvSnapshotConsumer actual = zkClient.getKvSnapshotConsumer(consumerId).get();
343+
assertThat(actual.getTableIdToPartitions()).isEmpty();
344+
assertThat(actual.getPartitionIdToSnapshots()).isEmpty();
345+
assertThat(actual.getConsumedSnapshotCount()).isEqualTo(expectedSize);
346+
Long[] bucketIndex = actual.getTableIdToSnapshots().get(tableId);
347+
assertThat(bucketIndex).hasSize(DEFAULT_BUCKET_NUM);
348+
for (int i = 0; i < DEFAULT_BUCKET_NUM; i++) {
349+
assertThat(bucketIndex[i]).isEqualTo(expectedBucketIndex[i]);
350+
}
351+
}
249352
}

0 commit comments

Comments
 (0)