Skip to content

Commit 959d8a7

Browse files
committed
address baiye's fip comments
1 parent 8d7f948 commit 959d8a7

File tree

31 files changed

+565
-434
lines changed

31 files changed

+565
-434
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.fluss.client.metadata.KvSnapshotMetadata;
2222
import org.apache.fluss.client.metadata.KvSnapshots;
2323
import org.apache.fluss.client.metadata.LakeSnapshot;
24+
import org.apache.fluss.client.metadata.RegisterKvSnapshotResult;
2425
import org.apache.fluss.cluster.ServerNode;
2526
import org.apache.fluss.config.ConfigOptions;
2627
import org.apache.fluss.config.cluster.AlterConfig;
@@ -419,8 +420,9 @@ CompletableFuture<KvSnapshotMetadata> getKvSnapshotMetadata(
419420
*
420421
* @param consumerId the consumer id.
421422
* @param consumeBuckets the tableBuckets to consume, a map from TableBucket to kvSnapshotId.
423+
* @return the result of registering kv snapshot consumer.
422424
*/
423-
CompletableFuture<Void> registerKvSnapshotConsumer(
425+
CompletableFuture<RegisterKvSnapshotResult> registerKvSnapshotConsumer(
424426
String consumerId, Map<TableBucket, Long> consumeBuckets);
425427

426428
/**

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.fluss.client.metadata.KvSnapshots;
2222
import org.apache.fluss.client.metadata.LakeSnapshot;
2323
import org.apache.fluss.client.metadata.MetadataUpdater;
24+
import org.apache.fluss.client.metadata.RegisterKvSnapshotResult;
2425
import org.apache.fluss.client.utils.ClientRpcMessageUtils;
2526
import org.apache.fluss.cluster.Cluster;
2627
import org.apache.fluss.cluster.ServerNode;
@@ -387,7 +388,7 @@ public CompletableFuture<KvSnapshotMetadata> getKvSnapshotMetadata(
387388
}
388389

389390
@Override
390-
public CompletableFuture<Void> registerKvSnapshotConsumer(
391+
public CompletableFuture<RegisterKvSnapshotResult> registerKvSnapshotConsumer(
391392
String consumerId, Map<TableBucket, Long> consumeBuckets) {
392393
if (consumeBuckets.isEmpty()) {
393394
throw new IllegalArgumentException("consumeBuckets is empty");
@@ -400,7 +401,7 @@ public CompletableFuture<Void> registerKvSnapshotConsumer(
400401
return gateway.registerKvSnapshotConsumer(
401402
makeRegisterKvSnapshotConsumerRequest(
402403
consumerId, consumeBuckets, expirationTime))
403-
.thenApply(r -> null);
404+
.thenApply(ClientRpcMessageUtils::toRegisterKvSnapshotResult);
404405
}
405406

406407
@Override

fluss-client/src/main/java/org/apache/fluss/client/metadata/KvSnapshots.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@
1818
package org.apache.fluss.client.metadata;
1919

2020
import org.apache.fluss.annotation.PublicEvolving;
21+
import org.apache.fluss.metadata.TableBucket;
2122

2223
import javax.annotation.Nullable;
2324

2425
import java.util.Map;
2526
import java.util.OptionalLong;
2627
import java.util.Set;
28+
import java.util.stream.Collectors;
2729

2830
/**
2931
* A class representing the kv snapshots of a table or a partition. It contains multiple snapshots
@@ -71,6 +73,12 @@ public Set<Integer> getBucketIds() {
7173
return snapshotIds.keySet();
7274
}
7375

76+
public Set<TableBucket> getTableBuckets() {
77+
return snapshotIds.keySet().stream()
78+
.map(bucketId -> new TableBucket(tableId, partitionId, bucketId))
79+
.collect(Collectors.toSet());
80+
}
81+
7482
/**
7583
* Get the latest snapshot id for this kv tablet (bucket), or empty if there are no snapshots.
7684
*/
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.metadata;
19+
20+
import org.apache.fluss.annotation.PublicEvolving;
import org.apache.fluss.metadata.TableBucket;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
24+
25+
/**
26+
* A class to represent the result of registering kv snapshot. It contains:
27+
*
28+
* <ul>
29+
* <li>An set of failed tableBuckets. Such as the specify snapshotId is not exist for this table
30+
* bucket.
31+
* </ul>
32+
*
33+
* @since 0.9
34+
*/
35+
@PublicEvolving
36+
public class RegisterKvSnapshotResult {
37+
private final Set<TableBucket> failedTableBucketSet;
38+
39+
public RegisterKvSnapshotResult(Set<TableBucket> failedTableBucketSet) {
40+
this.failedTableBucketSet = failedTableBucketSet;
41+
}
42+
43+
public Set<TableBucket> getFailedTableBucketSet() {
44+
return failedTableBucketSet;
45+
}
46+
}

fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.fluss.client.metadata.KvSnapshotMetadata;
2424
import org.apache.fluss.client.metadata.KvSnapshots;
2525
import org.apache.fluss.client.metadata.LakeSnapshot;
26+
import org.apache.fluss.client.metadata.RegisterKvSnapshotResult;
2627
import org.apache.fluss.client.write.KvWriteBatch;
2728
import org.apache.fluss.client.write.ReadyWriteBatch;
2829
import org.apache.fluss.config.cluster.AlterConfigOpType;
@@ -65,10 +66,12 @@
6566
import org.apache.fluss.rpc.messages.PbPutKvReqForBucket;
6667
import org.apache.fluss.rpc.messages.PbRemotePathAndLocalFile;
6768
import org.apache.fluss.rpc.messages.PbRenameColumn;
69+
import org.apache.fluss.rpc.messages.PbTable;
6870
import org.apache.fluss.rpc.messages.PrefixLookupRequest;
6971
import org.apache.fluss.rpc.messages.ProduceLogRequest;
7072
import org.apache.fluss.rpc.messages.PutKvRequest;
7173
import org.apache.fluss.rpc.messages.RegisterKvSnapshotConsumerRequest;
74+
import org.apache.fluss.rpc.messages.RegisterKvSnapshotConsumerResponse;
7275
import org.apache.fluss.rpc.messages.UnregisterKvSnapshotConsumerRequest;
7376
import org.apache.fluss.utils.json.DataTypeJsonSerde;
7477
import org.apache.fluss.utils.json.JsonSerdeUtils;
@@ -79,6 +82,7 @@
7982
import java.util.Arrays;
8083
import java.util.Collection;
8184
import java.util.HashMap;
85+
import java.util.HashSet;
8286
import java.util.List;
8387
import java.util.Map;
8488
import java.util.Set;
@@ -408,6 +412,23 @@ public static RegisterKvSnapshotConsumerRequest makeRegisterKvSnapshotConsumerRe
408412
return request;
409413
}
410414

415+
public static RegisterKvSnapshotResult toRegisterKvSnapshotResult(
416+
RegisterKvSnapshotConsumerResponse response) {
417+
Set<TableBucket> failedTableBucketSet = new HashSet<>();
418+
for (PbTable failedTable : response.getFailedTablesList()) {
419+
long tableId = failedTable.getTableId();
420+
for (PbBucket pbBucket : failedTable.getBucketsList()) {
421+
TableBucket tableBucket =
422+
new TableBucket(
423+
tableId,
424+
pbBucket.hasPartitionId() ? pbBucket.getPartitionId() : null,
425+
pbBucket.getBucketId());
426+
failedTableBucketSet.add(tableBucket);
427+
}
428+
}
429+
return new RegisterKvSnapshotResult(failedTableBucketSet);
430+
}
431+
411432
public static UnregisterKvSnapshotConsumerRequest makeUnregisterKvSnapshotConsumerRequest(
412433
String consumerId, Set<TableBucket> bucketsToUnregister) {
413434
UnregisterKvSnapshotConsumerRequest request = new UnregisterKvSnapshotConsumerRequest();

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1462,6 +1462,12 @@ public class ConfigOptions {
14621462
.withDescription(
14631463
"The number of threads the server uses to transfer (download and upload) kv snapshot files.");
14641464

1465+
public static final ConfigOption<Integer> KV_MAX_RETAINED_SNAPSHOTS =
1466+
key("kv.snapshot.num-retained")
1467+
.intType()
1468+
.defaultValue(1)
1469+
.withDescription("The maximum number of completed snapshots to retain.");
1470+
14651471
public static final ConfigOption<Duration> KV_SNAPSHOT_CONSUMER_EXPIRATION_CHECK_INTERVAL =
14661472
key("kv.snapshot.consumer-expiration-check-interval")
14671473
.durationType()

fluss-common/src/main/java/org/apache/fluss/metadata/ConsumeKvSnapshotForBucket.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public boolean equals(Object o) {
5656
return false;
5757
}
5858
ConsumeKvSnapshotForBucket that = (ConsumeKvSnapshotForBucket) o;
59-
return kvSnapshotId == that.kvSnapshotId && tableBucket.equals(that.tableBucket);
59+
return kvSnapshotId == that.kvSnapshotId && Objects.equals(tableBucket, that.tableBucket);
6060
}
6161

6262
@Override

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java

Lines changed: 96 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -356,35 +356,7 @@ private void startInStreamModeForNonPartitionedTable() {
356356

357357
private List<SourceSplitBase> initNonPartitionedSplits() {
358358
if (hasPrimaryKey && startingOffsetsInitializer instanceof SnapshotOffsetsInitializer) {
359-
// get the table snapshot info
360-
final KvSnapshots kvSnapshots;
361-
try {
362-
kvSnapshots = flussAdmin.getLatestKvSnapshots(tablePath).get();
363-
364-
Map<TableBucket, Long> consumeBuckets = new HashMap<>();
365-
for (Integer bucketId : kvSnapshots.getBucketIds()) {
366-
TableBucket tb =
367-
new TableBucket(
368-
kvSnapshots.getTableId(),
369-
kvSnapshots.getPartitionId(),
370-
bucketId);
371-
OptionalLong snapshotIdOpt = kvSnapshots.getSnapshotId(bucketId);
372-
if (!ignoreTableBucket(tb) && snapshotIdOpt.isPresent()) {
373-
consumeBuckets.put(tb, snapshotIdOpt.getAsLong());
374-
}
375-
}
376-
377-
if (!consumeBuckets.isEmpty()) {
378-
flussAdmin
379-
.registerKvSnapshotConsumer(kvSnapshotConsumerId, consumeBuckets)
380-
.get();
381-
}
382-
} catch (Exception e) {
383-
throw new FlinkRuntimeException(
384-
String.format("Failed to get table snapshot for %s", tablePath),
385-
ExceptionUtils.stripCompletionException(e));
386-
}
387-
return getSnapshotAndLogSplits(kvSnapshots, null);
359+
return getSnapshotAndLogSplits(getLatestKvSnapshotsAndRegister(null), null);
388360
} else {
389361
return getLogSplit(null, null);
390362
}
@@ -557,39 +529,84 @@ private List<SourceSplitBase> initPrimaryKeyTablePartitionSplits(
557529
List<SourceSplitBase> splits = new ArrayList<>();
558530
for (Partition partition : newPartitions) {
559531
String partitionName = partition.getPartitionName();
560-
// get the table snapshot info
561-
final KvSnapshots kvSnapshots;
562-
try {
563-
kvSnapshots = flussAdmin.getLatestKvSnapshots(tablePath, partitionName).get();
532+
splits.addAll(
533+
getSnapshotAndLogSplits(
534+
getLatestKvSnapshotsAndRegister(partitionName), partitionName));
535+
}
536+
return splits;
537+
}
538+
539+
private KvSnapshots getLatestKvSnapshotsAndRegister(@Nullable String partitionName) {
540+
long tableId;
541+
Long partitionId;
542+
Map<Integer, Long> snapshotIds = new HashMap<>();
543+
Map<Integer, Long> logOffsets = new HashMap<>();
544+
545+
// retry to get the latest kv snapshots and register kvSnapshot consumer util all buckets
546+
// register success. The reason is that getLatestKvSnapshots and registerKvSnapshotConsumer
547+
// are not atomic operations, the latest kv snapshot obtained via get may become outdated by
548+
// the time it is passed to register. Therefore, this logic must implement a retry
549+
// mechanism: the failedTableBucketSet in the RegisterKvSnapshotResult returned by
550+
// registerKvSnapshotConsumer must be retried repeatedly until all buckets are successfully
551+
// registered.
552+
try {
553+
Set<TableBucket> remainingTableBuckets;
554+
do {
555+
KvSnapshots kvSnapshots = getLatestKvSnapshots(partitionName);
556+
remainingTableBuckets = new HashSet<>(kvSnapshots.getTableBuckets());
557+
558+
tableId = kvSnapshots.getTableId();
559+
partitionId = kvSnapshots.getPartitionId();
564560

565561
Map<TableBucket, Long> consumeBuckets = new HashMap<>();
566-
for (Integer bucketId : kvSnapshots.getBucketIds()) {
567-
TableBucket tb =
568-
new TableBucket(
569-
kvSnapshots.getTableId(),
570-
kvSnapshots.getPartitionId(),
571-
bucketId);
572-
OptionalLong snapshotIdOpt = kvSnapshots.getSnapshotId(bucketId);
562+
for (TableBucket tb : remainingTableBuckets) {
563+
int bucket = tb.getBucket();
564+
OptionalLong snapshotIdOpt = kvSnapshots.getSnapshotId(bucket);
565+
OptionalLong logOffsetOpt = kvSnapshots.getLogOffset(bucket);
566+
if (snapshotIdOpt.isPresent() && logOffsetOpt.isPresent()) {
567+
snapshotIds.put(bucket, snapshotIdOpt.getAsLong());
568+
logOffsets.put(bucket, logOffsetOpt.getAsLong());
569+
}
570+
573571
if (!ignoreTableBucket(tb) && snapshotIdOpt.isPresent()) {
574572
consumeBuckets.put(tb, snapshotIdOpt.getAsLong());
575573
}
576574
}
577575

578576
if (!consumeBuckets.isEmpty()) {
579-
flussAdmin
580-
.registerKvSnapshotConsumer(kvSnapshotConsumerId, consumeBuckets)
581-
.get();
577+
LOG.info(
578+
"Try to Register kv snapshot consumer {} for table {}",
579+
kvSnapshotConsumerId,
580+
tablePath);
581+
remainingTableBuckets =
582+
flussAdmin
583+
.registerKvSnapshotConsumer(
584+
kvSnapshotConsumerId, consumeBuckets)
585+
.get()
586+
.getFailedTableBucketSet();
587+
if (!remainingTableBuckets.isEmpty()) {
588+
LOG.info(
589+
"Failed to register kv snapshot consumer for table {}: {}. Retry to register",
590+
tablePath,
591+
remainingTableBuckets);
592+
}
582593
}
583-
} catch (Exception e) {
584-
throw new FlinkRuntimeException(
585-
String.format(
586-
"Failed to get and register table snapshot for table %s and partition %s",
587-
tablePath, partitionName),
588-
ExceptionUtils.stripCompletionException(e));
589-
}
590-
splits.addAll(getSnapshotAndLogSplits(kvSnapshots, partitionName));
594+
} while (!remainingTableBuckets.isEmpty());
595+
} catch (Exception e) {
596+
throw new FlinkRuntimeException(
597+
String.format("Failed to get table snapshot for %s", tablePath),
598+
ExceptionUtils.stripCompletionException(e));
599+
}
600+
601+
return new KvSnapshots(tableId, partitionId, snapshotIds, logOffsets);
602+
}
603+
604+
private KvSnapshots getLatestKvSnapshots(@Nullable String partitionName) throws Exception {
605+
if (partitionName == null) {
606+
return flussAdmin.getLatestKvSnapshots(tablePath).get();
607+
} else {
608+
return flussAdmin.getLatestKvSnapshots(tablePath, partitionName).get();
591609
}
592-
return splits;
593610
}
594611

595612
private List<SourceSplitBase> getSnapshotAndLogSplits(
@@ -907,8 +924,15 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
907924
} else if (sourceEvent instanceof FinishedKvSnapshotConsumeEvent) {
908925
FinishedKvSnapshotConsumeEvent event = (FinishedKvSnapshotConsumeEvent) sourceEvent;
909926
long checkpointId = event.getCheckpointId();
910-
event.getTableBuckets()
911-
.forEach(tableBucket -> addConsumedBucket(checkpointId, tableBucket));
927+
Set<TableBucket> tableBuckets = event.getTableBuckets();
928+
if (!tableBuckets.isEmpty()) {
929+
LOG.info(
930+
"Received finished kv snapshot consumer event for buckets: {}, checkpoint id: {}",
931+
tableBuckets,
932+
checkpointId);
933+
}
934+
935+
tableBuckets.forEach(tableBucket -> addConsumedBucket(checkpointId, tableBucket));
912936
}
913937
}
914938

@@ -948,8 +972,24 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
948972
// lower than this checkpoint id.
949973
Set<TableBucket> consumedKvSnapshots = getAndRemoveConsumedBucketsUpTo(checkpointId);
950974

951-
// send request to fluss admin to unregister the kv snapshot consumer.
952-
flussAdmin.unregisterKvSnapshotConsumer(kvSnapshotConsumerId, consumedKvSnapshots).get();
975+
LOG.info(
976+
"kv snapshot has already consumed and try to unregister kv snapshot consumer for: {}, checkpoint id: {}",
977+
consumedKvSnapshots,
978+
checkpointId);
979+
980+
// send request to fluss to unregister the kv snapshot consumer.
981+
try {
982+
flussAdmin
983+
.unregisterKvSnapshotConsumer(kvSnapshotConsumerId, consumedKvSnapshots)
984+
.get();
985+
} catch (Exception e) {
986+
LOG.error(
987+
"Failed to unregister kv snapshot consumer. These snapshot need to re-enqueue",
988+
e);
989+
// use the current checkpoint id to re-enqueue the buckets
990+
consumedKvSnapshots.forEach(
991+
tableBucket -> addConsumedBucket(checkpointId, tableBucket));
992+
}
953993
}
954994

955995
/** Add a bucket whose kv snapshot has been consumed to the consumedKvSnapshotMap. */

0 commit comments

Comments
 (0)