Skip to content

Commit 31f4087

Browse files
committed
change metadata on zk node to on remote fs
1 parent 1730d60 commit 31f4087

File tree

46 files changed

+1823
-1163
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+1823
-1163
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import org.apache.fluss.exception.InvalidPartitionException;
3535
import org.apache.fluss.exception.InvalidReplicationFactorException;
3636
import org.apache.fluss.exception.InvalidTableException;
37-
import org.apache.fluss.exception.KvSnapshotLeaseNotExistException;
3837
import org.apache.fluss.exception.KvSnapshotNotExistException;
3938
import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
4039
import org.apache.fluss.exception.NonPrimaryKeyTableException;
@@ -408,9 +407,8 @@ CompletableFuture<KvSnapshotMetadata> getKvSnapshotMetadata(
408407
* Acquires a lease for specific KV snapshots of the given tableBuckets asynchronously.
409408
*
410409
* <p>Once acquired, the specified KV snapshots will be protected from garbage collection for
411-
* the duration of the {@code leaseDuration}. The client must call {@link #renewKvSnapshotLease}
412-
* periodically to keep the lease alive, or call {@link #releaseKvSnapshotLease} to release the
413-
* lock early when reading is finished.
410+
* the duration of the {@code leaseDuration}. The client must call {@link
411+
* #releaseKvSnapshotLease} to release the lock early when reading is finished.
414412
*
415413
* <p>If the lease expires (no renew received within duration), the server is free to delete the
416414
* snapshot files.
@@ -430,24 +428,6 @@ CompletableFuture<KvSnapshotMetadata> getKvSnapshotMetadata(
430428
CompletableFuture<AcquireKvSnapshotLeaseResult> acquireKvSnapshotLease(
431429
String leaseId, Map<TableBucket, Long> snapshotIds, long leaseDuration);
432430

433-
/**
434-
* Renews the lease for the given leaseId asynchronously.
435-
*
436-
* <p>This acts as a heartbeat. It resets the expiration time for ALL tableBuckets currently
437-
* associated with this {@code leaseId}.
438-
*
439-
* <p>The following exceptions can be anticipated:
440-
*
441-
* <ul>
442-
* <li>{@link KvSnapshotLeaseNotExistException} if the leaseId does not exist or has already
443-
* expired.
444-
* </ul>
445-
*
446-
* @param leaseId The lease id to renew.
447-
* @param leaseDuration The new duration (in milliseconds) from now.
448-
*/
449-
CompletableFuture<Void> renewKvSnapshotLease(String leaseId, long leaseDuration);
450-
451431
/**
452432
* Releases the lease for specific tableBuckets asynchronously.
453433
*

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,6 @@
7474
import org.apache.fluss.rpc.messages.PbListOffsetsRespForBucket;
7575
import org.apache.fluss.rpc.messages.PbPartitionSpec;
7676
import org.apache.fluss.rpc.messages.PbTablePath;
77-
import org.apache.fluss.rpc.messages.RenewKvSnapshotLeaseRequest;
7877
import org.apache.fluss.rpc.messages.TableExistsRequest;
7978
import org.apache.fluss.rpc.messages.TableExistsResponse;
8079
import org.apache.fluss.rpc.protocol.ApiError;
@@ -396,15 +395,6 @@ public CompletableFuture<AcquireKvSnapshotLeaseResult> acquireKvSnapshotLease(
396395
.thenApply(ClientRpcMessageUtils::toAcquireKvSnapshotLeaseResult);
397396
}
398397

399-
@Override
400-
public CompletableFuture<Void> renewKvSnapshotLease(String leaseId, long leaseDuration) {
401-
RenewKvSnapshotLeaseRequest request =
402-
new RenewKvSnapshotLeaseRequest()
403-
.setLeaseId(leaseId)
404-
.setLeaseDuration(leaseDuration);
405-
return gateway.renewKvSnapshotLease(request).thenApply(r -> null);
406-
}
407-
408398
@Override
409399
public CompletableFuture<Void> releaseKvSnapshotLease(
410400
String leaseId, Set<TableBucket> bucketsToRelease) {

fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotBatchScannerITCase.java

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@
3636
import org.apache.fluss.row.encode.CompactedKeyEncoder;
3737
import org.apache.fluss.row.encode.KeyEncoder;
3838
import org.apache.fluss.server.zk.ZooKeeperClient;
39-
import org.apache.fluss.server.zk.data.KvSnapshotLease;
39+
import org.apache.fluss.server.zk.data.lease.KvSnapshotLease;
40+
import org.apache.fluss.server.zk.data.lease.KvSnapshotLeaseMetadataHelper;
41+
import org.apache.fluss.server.zk.data.lease.KvSnapshotTableLease;
4042
import org.apache.fluss.types.DataTypes;
4143

4244
import org.junit.jupiter.api.AfterEach;
@@ -49,6 +51,7 @@
4951
import java.util.HashMap;
5052
import java.util.List;
5153
import java.util.Map;
54+
import java.util.Optional;
5255
import java.util.Set;
5356

5457
import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
@@ -202,6 +205,10 @@ public void testKvSnapshotLease() throws Exception {
202205
waitUntilAllSnapshotFinished(expectedRowByBuckets.keySet(), 0);
203206

204207
ZooKeeperClient zkClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
208+
String remoteDataDir = FLUSS_CLUSTER_EXTENSION.getRemoteDataDir();
209+
KvSnapshotLeaseMetadataHelper metadataHelper =
210+
new KvSnapshotLeaseMetadataHelper(zkClient, remoteDataDir);
211+
205212
assertThat(zkClient.getKvSnapshotLeasesList()).isEmpty();
206213

207214
// test register kv snapshot lease for snapshot 0.
@@ -214,7 +221,8 @@ public void testKvSnapshotLease() throws Exception {
214221
admin.acquireKvSnapshotLease(
215222
kvSnapshotLease1, consumeBuckets, Duration.ofDays(1).toMillis())
216223
.get();
217-
checkKvSnapshotLeaseEquals(zkClient, kvSnapshotLease1, 3, tableId, new Long[] {0L, 0L, 0L});
224+
checkKvSnapshotLeaseEquals(
225+
metadataHelper, kvSnapshotLease1, tableId, new Long[] {0L, 0L, 0L});
218226

219227
expectedRowByBuckets = putRows(tableId, tablePath, 10);
220228
// wait snapshot2 finish
@@ -230,7 +238,8 @@ public void testKvSnapshotLease() throws Exception {
230238
admin.acquireKvSnapshotLease(
231239
kvSnapshotLease2, consumeBuckets, Duration.ofDays(1).toMillis())
232240
.get();
233-
checkKvSnapshotLeaseEquals(zkClient, kvSnapshotLease2, 3, tableId, new Long[] {1L, 1L, 1L});
241+
checkKvSnapshotLeaseEquals(
242+
metadataHelper, kvSnapshotLease2, tableId, new Long[] {1L, 1L, 1L});
234243
// check even snapshot1 is generated, snapshot0 also retained as lease exists.
235244
for (TableBucket tb : expectedRowByBuckets.keySet()) {
236245
assertThat(zkClient.getTableBucketSnapshot(tb, 0L).isPresent()).isTrue();
@@ -246,7 +255,7 @@ public void testKvSnapshotLease() throws Exception {
246255
kvSnapshotLease1, Collections.singleton(new TableBucket(tableId, 0)))
247256
.get();
248257
checkKvSnapshotLeaseEquals(
249-
zkClient, kvSnapshotLease1, 2, tableId, new Long[] {-1L, 0L, 0L});
258+
metadataHelper, kvSnapshotLease1, tableId, new Long[] {-1L, 0L, 0L});
250259

251260
// release lease2.
252261
admin.releaseKvSnapshotLease(kvSnapshotLease2, consumeBuckets.keySet()).get();
@@ -338,22 +347,18 @@ private void waitUntilAllSnapshotFinished(Set<TableBucket> tableBuckets, long sn
338347
}
339348

340349
private void checkKvSnapshotLeaseEquals(
341-
ZooKeeperClient zkClient,
350+
KvSnapshotLeaseMetadataHelper metadataHelper,
342351
String leaseId,
343-
int expectedSize,
344352
long tableId,
345353
Long[] expectedBucketIndex)
346354
throws Exception {
347-
assertThat(zkClient.getKvSnapshotLeasesList()).contains(leaseId);
348-
assertThat(zkClient.getKvSnapshotLease(leaseId)).isPresent();
349-
KvSnapshotLease actual = zkClient.getKvSnapshotLease(leaseId).get();
350-
assertThat(actual.getTableIdToPartitions()).isEmpty();
351-
assertThat(actual.getPartitionIdToSnapshots()).isEmpty();
352-
assertThat(actual.getLeasedSnapshotCount()).isEqualTo(expectedSize);
353-
Long[] bucketIndex = actual.getTableIdToSnapshots().get(tableId);
354-
assertThat(bucketIndex).hasSize(DEFAULT_BUCKET_NUM);
355-
for (int i = 0; i < DEFAULT_BUCKET_NUM; i++) {
356-
assertThat(bucketIndex[i]).isEqualTo(expectedBucketIndex[i]);
357-
}
355+
assertThat(metadataHelper.getLeasesList()).contains(leaseId);
356+
Optional<KvSnapshotLease> leaseOpt = metadataHelper.getLease(leaseId);
357+
assertThat(leaseOpt).isPresent();
358+
KvSnapshotLease actualLease = leaseOpt.get();
359+
Map<Long, KvSnapshotTableLease> tableIdToTableLease = actualLease.getTableIdToTableLease();
360+
KvSnapshotTableLease tableLease = tableIdToTableLease.get(tableId);
361+
assertThat(tableLease).isNotNull();
362+
assertThat(tableLease.getBucketSnapshots()).isEqualTo(expectedBucketIndex);
358363
}
359364
}

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1509,7 +1509,7 @@ public class ConfigOptions {
15091509
.withDescription("The maximum number of completed snapshots to retain.");
15101510

15111511
public static final ConfigOption<Duration> KV_SNAPSHOT_LEASE_EXPIRATION_CHECK_INTERVAL =
1512-
key("kv.snapshot.lease-expiration-check-interval")
1512+
key("kv.snapshot.lease.expiration-check-interval")
15131513
.durationType()
15141514
.defaultValue(Duration.ofMinutes(10))
15151515
.withDescription(

fluss-common/src/main/java/org/apache/fluss/utils/FlussPaths.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,8 @@ public class FlussPaths {
9393

9494
private static final String REMOTE_LAKE_DIR_NAME = "lake";
9595

96+
private static final String REMOTE_LEASE_DIR_NAME = "lease";
97+
9698
// ----------------------------------------------------------------------------------------
9799
// LOG/KV Tablet Paths
98100
// ----------------------------------------------------------------------------------------
@@ -722,6 +724,41 @@ public static FsPath remoteLakeTableSnapshotManifestPath(
722724
UUID.randomUUID()));
723725
}
724726

727+
/**
728+
* Returns the remote directory path for storing kv snapshot lease files.
729+
*
730+
* <p>The path contract:
731+
*
732+
* <pre>
733+
* {$remote.data.dir}/lease/kv-snapshot/{leaseId}/{tableId}/
734+
* </pre>
735+
*/
736+
private static FsPath remoteKvSnapshotLeaseDir(
737+
String remoteDataDir, String leaseId, long tableId) {
738+
return new FsPath(
739+
String.format(
740+
"%s/%s/kv-snapshot/%s/%d",
741+
remoteDataDir, REMOTE_LEASE_DIR_NAME, leaseId, tableId));
742+
}
743+
744+
/**
745+
* Returns the remote file path for storing kv snapshot lease files.
746+
*
747+
* <p>The path contract:
748+
*
749+
* <pre>
750+
* {$remoteKvSnapshotLeaseDir}/{uuid}.metadata
751+
* </pre>
752+
*/
753+
public static FsPath remoteKvSnapshotLeaseFile(
754+
String remoteDataDir, String leaseId, long tableId) {
755+
return new FsPath(
756+
String.format(
757+
"%s/%s.metadata",
758+
remoteKvSnapshotLeaseDir(remoteDataDir, leaseId, tableId),
759+
UUID.randomUUID()));
760+
}
761+
725762
/**
726763
* Returns the remote directory path for storing kv snapshot shared files (SST files with UUID
727764
* prefix).

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public class FlinkConnectorOptions {
6464
+ "The list should be in the form host1:port1,host2:port2,....");
6565

6666
public static final ConfigOption<String> SCAN_KV_SNAPSHOT_LEASE_ID =
67-
ConfigOptions.key("scan.kv.snapshot.lease-id")
67+
ConfigOptions.key("scan.kv.snapshot.lease.id")
6868
.stringType()
6969
.defaultValue(String.valueOf(UUID.randomUUID()))
7070
.withDescription(
@@ -73,7 +73,7 @@ public class FlinkConnectorOptions {
7373
+ "is reached. If not set, an UUID will be set.");
7474

7575
public static final ConfigOption<Duration> SCAN_KV_SNAPSHOT_LEASE_DURATION =
76-
ConfigOptions.key("scan.kv.snapshot.lease-duration")
76+
ConfigOptions.key("scan.kv.snapshot.lease.duration")
7777
.durationType()
7878
.defaultValue(Duration.ofDays(1))
7979
.withDescription(

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.fluss.flink.lake.LakeTableFactory;
2525
import org.apache.fluss.flink.sink.FlinkTableSink;
2626
import org.apache.fluss.flink.source.FlinkTableSource;
27+
import org.apache.fluss.flink.source.reader.LeaseContext;
2728
import org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils;
2829
import org.apache.fluss.metadata.DataLakeFormat;
2930
import org.apache.fluss.metadata.TablePath;
@@ -138,6 +139,15 @@ public DynamicTableSource createDynamicTableSource(Context context) {
138139
.get(FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL)
139140
.toMillis();
140141

142+
LeaseContext leaseContext =
143+
primaryKeyIndexes.length > 0
144+
? new LeaseContext(
145+
tableOptions.get(FlinkConnectorOptions.SCAN_KV_SNAPSHOT_LEASE_ID),
146+
tableOptions
147+
.get(FlinkConnectorOptions.SCAN_KV_SNAPSHOT_LEASE_DURATION)
148+
.toMillis())
149+
: new LeaseContext(null, null);
150+
141151
return new FlinkTableSource(
142152
toFlussTablePath(context.getObjectIdentifier()),
143153
toFlussClientConfig(
@@ -154,8 +164,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
154164
tableOptions.get(toFlinkOption(ConfigOptions.TABLE_DATALAKE_ENABLED)),
155165
tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)),
156166
context.getCatalogTable().getOptions(),
157-
tableOptions.get(FlinkConnectorOptions.SCAN_KV_SNAPSHOT_LEASE_ID),
158-
tableOptions.get(FlinkConnectorOptions.SCAN_KV_SNAPSHOT_LEASE_DURATION).toMillis());
167+
leaseContext);
159168
}
160169

161170
@Override

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.fluss.flink.source.enumerator.initializer.OffsetsInitializer;
2626
import org.apache.fluss.flink.source.metrics.FlinkSourceReaderMetrics;
2727
import org.apache.fluss.flink.source.reader.FlinkSourceReader;
28+
import org.apache.fluss.flink.source.reader.LeaseContext;
2829
import org.apache.fluss.flink.source.reader.RecordAndPos;
2930
import org.apache.fluss.flink.source.split.SourceSplitBase;
3031
import org.apache.fluss.flink.source.split.SourceSplitSerializer;
@@ -67,8 +68,7 @@ public class FlinkSource<OUT>
6768
private final FlussDeserializationSchema<OUT> deserializationSchema;
6869
@Nullable private final Predicate partitionFilters;
6970
@Nullable private final LakeSource<LakeSplit> lakeSource;
70-
private final String kvSnapshotLeaseId;
71-
private final long kvSnapshotLeaseDurationMs;
71+
private final LeaseContext leaseContext;
7272

7373
public FlinkSource(
7474
Configuration flussConf,
@@ -82,8 +82,7 @@ public FlinkSource(
8282
FlussDeserializationSchema<OUT> deserializationSchema,
8383
boolean streaming,
8484
@Nullable Predicate partitionFilters,
85-
String kvSnapshotLeaseId,
86-
long kvSnapshotLeaseDurationMs) {
85+
LeaseContext leaseContext) {
8786
this(
8887
flussConf,
8988
tablePath,
@@ -97,8 +96,7 @@ public FlinkSource(
9796
streaming,
9897
partitionFilters,
9998
null,
100-
kvSnapshotLeaseId,
101-
kvSnapshotLeaseDurationMs);
99+
leaseContext);
102100
}
103101

104102
public FlinkSource(
@@ -114,8 +112,7 @@ public FlinkSource(
114112
boolean streaming,
115113
@Nullable Predicate partitionFilters,
116114
@Nullable LakeSource<LakeSplit> lakeSource,
117-
String kvSnapshotLeaseId,
118-
long kvSnapshotLeaseDurationMs) {
115+
LeaseContext leaseContext) {
119116
this.flussConf = flussConf;
120117
this.tablePath = tablePath;
121118
this.hasPrimaryKey = hasPrimaryKey;
@@ -128,8 +125,7 @@ public FlinkSource(
128125
this.streaming = streaming;
129126
this.partitionFilters = partitionFilters;
130127
this.lakeSource = lakeSource;
131-
this.kvSnapshotLeaseId = kvSnapshotLeaseId;
132-
this.kvSnapshotLeaseDurationMs = kvSnapshotLeaseDurationMs;
128+
this.leaseContext = leaseContext;
133129
}
134130

135131
@Override
@@ -151,8 +147,7 @@ public SplitEnumerator<SourceSplitBase, SourceEnumeratorState> createEnumerator(
151147
streaming,
152148
partitionFilters,
153149
lakeSource,
154-
kvSnapshotLeaseId,
155-
kvSnapshotLeaseDurationMs);
150+
leaseContext);
156151
}
157152

158153
@Override
@@ -173,8 +168,7 @@ public SplitEnumerator<SourceSplitBase, SourceEnumeratorState> restoreEnumerator
173168
streaming,
174169
partitionFilters,
175170
lakeSource,
176-
kvSnapshotLeaseId,
177-
kvSnapshotLeaseDurationMs);
171+
sourceEnumeratorState.getLeaseContext());
178172
}
179173

180174
@Override

0 commit comments

Comments
 (0)