Skip to content

Commit f26312f

Browse files
authored
[lake] Use remote file to store lake snapshot offsets to reduce the size of zookeeper node (apache#2037)
1 parent e47748b commit f26312f

File tree

57 files changed

+1850
-982
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

57 files changed

+1850
-982
lines changed

fluss-client/src/main/java/org/apache/fluss/client/metadata/LakeSnapshot.java

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -37,17 +37,9 @@ public class LakeSnapshot {
3737
// the specific log offset of the snapshot
3838
private final Map<TableBucket, Long> tableBucketsOffset;
3939

40-
// the partition name by partition id of this lake snapshot if
41-
// is a partitioned table, empty if not a partitioned table
42-
private final Map<Long, String> partitionNameById;
43-
44-
public LakeSnapshot(
45-
long snapshotId,
46-
Map<TableBucket, Long> tableBucketsOffset,
47-
Map<Long, String> partitionNameById) {
40+
public LakeSnapshot(long snapshotId, Map<TableBucket, Long> tableBucketsOffset) {
4841
this.snapshotId = snapshotId;
4942
this.tableBucketsOffset = tableBucketsOffset;
50-
this.partitionNameById = partitionNameById;
5143
}
5244

5345
public long getSnapshotId() {
@@ -58,19 +50,13 @@ public Map<TableBucket, Long> getTableBucketsOffset() {
5850
return Collections.unmodifiableMap(tableBucketsOffset);
5951
}
6052

61-
public Map<Long, String> getPartitionNameById() {
62-
return Collections.unmodifiableMap(partitionNameById);
63-
}
64-
6553
@Override
6654
public String toString() {
6755
return "LakeSnapshot{"
6856
+ "snapshotId="
6957
+ snapshotId
7058
+ ", tableBucketsOffset="
7159
+ tableBucketsOffset
72-
+ ", partitionNameById="
73-
+ partitionNameById
7460
+ '}';
7561
}
7662
}

fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -215,20 +215,16 @@ public static LakeSnapshot toLakeTableSnapshotInfo(GetLatestLakeSnapshotResponse
215215
long snapshotId = response.getSnapshotId();
216216
Map<TableBucket, Long> tableBucketsOffset =
217217
new HashMap<>(response.getBucketSnapshotsCount());
218-
Map<Long, String> partitionNameById = new HashMap<>();
219218
for (PbLakeSnapshotForBucket pbLakeSnapshotForBucket : response.getBucketSnapshotsList()) {
220219
Long partitionId =
221220
pbLakeSnapshotForBucket.hasPartitionId()
222221
? pbLakeSnapshotForBucket.getPartitionId()
223222
: null;
224223
TableBucket tableBucket =
225224
new TableBucket(tableId, partitionId, pbLakeSnapshotForBucket.getBucketId());
226-
if (partitionId != null && pbLakeSnapshotForBucket.hasPartitionName()) {
227-
partitionNameById.put(partitionId, pbLakeSnapshotForBucket.getPartitionName());
228-
}
229225
tableBucketsOffset.put(tableBucket, pbLakeSnapshotForBucket.getLogOffset());
230226
}
231-
return new LakeSnapshot(snapshotId, tableBucketsOffset, partitionNameById);
227+
return new LakeSnapshot(snapshotId, tableBucketsOffset);
232228
}
233229

234230
public static List<FsPathAndFileName> toFsPathAndFileName(

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,17 @@ public class ConfigOptions {
267267
public static final ConfigOption<List<String>> SERVER_SASL_ENABLED_MECHANISMS_CONFIG =
268268
key("security.sasl.enabled.mechanisms").stringType().asList().noDefaultValue();
269269

270+
public static final ConfigOption<Integer> SERVER_IO_POOL_SIZE =
271+
key("server.io-pool.size")
272+
.intType()
273+
.defaultValue(10)
274+
.withDescription(
275+
"The size of the IO thread pool to run blocking operations for both coordinator and tablet servers. "
276+
+ "This includes discard unnecessary snapshot files, transfer kv snapshot files, "
277+
+ "and transfer remote log files. Increase this value if you experience slow IO operations. "
278+
+ "The default value is 10.")
279+
.withDeprecatedKeys("coordinator.io-pool.size");
280+
270281
// ------------------------------------------------------------------------
271282
// ConfigOptions for Coordinator Server
272283
// ------------------------------------------------------------------------
@@ -324,6 +335,11 @@ public class ConfigOptions {
324335
+ " (“50100,50101”), ranges (“50100-50200”) or a combination of both."
325336
+ "This option is deprecated. Please use bind.listeners instead, which provides a more flexible configuration for multiple ports");
326337

338+
/**
339+
* @deprecated This option is deprecated. Please use {@link ConfigOptions#SERVER_IO_POOL_SIZE}
340+
* instead.
341+
*/
342+
@Deprecated
327343
public static final ConfigOption<Integer> COORDINATOR_IO_POOL_SIZE =
328344
key("coordinator.io-pool.size")
329345
.intType()
@@ -332,7 +348,8 @@ public class ConfigOptions {
332348
"The size of the IO thread pool to run blocking operations for coordinator server. "
333349
+ "This includes discard unnecessary snapshot files. "
334350
+ "Increase this value if you experience slow unnecessary snapshot files clean. "
335-
+ "The default value is 10.");
351+
+ "The default value is 10. "
352+
+ "This option is deprecated. Please use server.io-pool.size instead.");
336353

337354
// ------------------------------------------------------------------------
338355
// ConfigOptions for Tablet Server
@@ -769,13 +786,19 @@ public class ConfigOptions {
769786
"Size of the thread pool used in scheduling tasks to copy segments, "
770787
+ "fetch remote log indexes and clean up remote log segments.");
771788

789+
/**
790+
* @deprecated This option is deprecated. Please use {@link ConfigOptions#SERVER_IO_POOL_SIZE}
791+
* instead.
792+
*/
793+
@Deprecated
772794
public static final ConfigOption<Integer> REMOTE_LOG_DATA_TRANSFER_THREAD_NUM =
773795
key("remote.log.data-transfer-thread-num")
774796
.intType()
775797
.defaultValue(4)
776798
.withDescription(
777799
"The number of threads the server uses to transfer (download and upload) "
778-
+ "remote log file can be data file, index file and remote log metadata file.");
800+
+ "remote log file can be data file, index file and remote log metadata file. "
801+
+ "This option is deprecated. Please use server.io-pool.size instead.");
779802

780803
// ------------------------------------------------------------------------
781804
// Netty Settings
@@ -1441,12 +1464,18 @@ public class ConfigOptions {
14411464
.withDescription(
14421465
"The number of threads that the server uses to schedule snapshot kv data for all the replicas in the server.");
14431466

1467+
/**
1468+
* @deprecated This option is deprecated. Please use {@link ConfigOptions#SERVER_IO_POOL_SIZE}
1469+
* instead.
1470+
*/
1471+
@Deprecated
14441472
public static final ConfigOption<Integer> KV_SNAPSHOT_TRANSFER_THREAD_NUM =
14451473
key("kv.snapshot.transfer-thread-num")
14461474
.intType()
14471475
.defaultValue(4)
14481476
.withDescription(
1449-
"The number of threads the server uses to transfer (download and upload) kv snapshot files.");
1477+
"The number of threads the server uses to transfer (download and upload) kv snapshot files. "
1478+
+ "This option is deprecated. Please use server.io-pool.size instead.");
14501479

14511480
public static final ConfigOption<Integer> KV_MAX_RETAINED_SNAPSHOTS =
14521481
key("kv.snapshot.num-retained")

fluss-common/src/main/java/org/apache/fluss/lake/committer/BucketOffset.java

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -31,17 +31,11 @@ public class BucketOffset implements Serializable {
3131
private final long logOffset;
3232
private final int bucket;
3333
private final @Nullable Long partitionId;
34-
private final @Nullable String partitionQualifiedName;
3534

36-
public BucketOffset(
37-
long logOffset,
38-
int bucket,
39-
@Nullable Long partitionId,
40-
@Nullable String partitionQualifiedName) {
35+
public BucketOffset(long logOffset, int bucket, @Nullable Long partitionId) {
4136
this.logOffset = logOffset;
4237
this.bucket = bucket;
4338
this.partitionId = partitionId;
44-
this.partitionQualifiedName = partitionQualifiedName;
4539
}
4640

4741
public long getLogOffset() {
@@ -57,11 +51,6 @@ public Long getPartitionId() {
5751
return partitionId;
5852
}
5953

60-
@Nullable
61-
public String getPartitionQualifiedName() {
62-
return partitionQualifiedName;
63-
}
64-
6554
@Override
6655
public boolean equals(Object o) {
6756
if (this == o) {
@@ -71,9 +60,13 @@ public boolean equals(Object o) {
7160
return false;
7261
}
7362
BucketOffset that = (BucketOffset) o;
74-
return bucket == that.bucket
75-
&& logOffset == that.logOffset
76-
&& Objects.equals(partitionId, that.partitionId)
77-
&& Objects.equals(partitionQualifiedName, that.partitionQualifiedName);
63+
return logOffset == that.logOffset
64+
&& bucket == that.bucket
65+
&& Objects.equals(partitionId, that.partitionId);
66+
}
67+
68+
@Override
69+
public int hashCode() {
70+
return Objects.hash(logOffset, bucket, partitionId);
7871
}
7972
}

fluss-common/src/main/java/org/apache/fluss/lake/committer/CommittedLakeSnapshot.java

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,6 @@ public class CommittedLakeSnapshot {
3434
// partition bucket
3535
private final Map<Tuple2<Long, Integer>, Long> logEndOffsets = new HashMap<>();
3636

37-
// partition id -> partition name, will be empty if is not a partitioned table
38-
// the partition name is a qualified name in the format: key1=value1/key2=value2
39-
private final Map<Long, String> qualifiedPartitionNameById = new HashMap<>();
40-
4137
public CommittedLakeSnapshot(long lakeSnapshotId) {
4238
this.lakeSnapshotId = lakeSnapshotId;
4339
}
@@ -50,34 +46,30 @@ public void addBucket(int bucketId, long offset) {
5046
logEndOffsets.put(Tuple2.of(null, bucketId), offset);
5147
}
5248

53-
public void addPartitionBucket(
54-
Long partitionId, String partitionQualifiedName, int bucketId, long offset) {
49+
public void addPartitionBucket(Long partitionId, int bucketId, long offset) {
5550
logEndOffsets.put(Tuple2.of(partitionId, bucketId), offset);
56-
qualifiedPartitionNameById.put(partitionId, partitionQualifiedName);
5751
}
5852

5953
public Map<Tuple2<Long, Integer>, Long> getLogEndOffsets() {
6054
return logEndOffsets;
6155
}
6256

63-
public Map<Long, String> getQualifiedPartitionNameById() {
64-
return qualifiedPartitionNameById;
65-
}
66-
6757
@Override
6858
public boolean equals(Object o) {
59+
if (this == o) {
60+
return true;
61+
}
6962
if (o == null || getClass() != o.getClass()) {
7063
return false;
7164
}
7265
CommittedLakeSnapshot that = (CommittedLakeSnapshot) o;
7366
return lakeSnapshotId == that.lakeSnapshotId
74-
&& Objects.equals(logEndOffsets, that.logEndOffsets)
75-
&& Objects.equals(qualifiedPartitionNameById, that.qualifiedPartitionNameById);
67+
&& Objects.equals(logEndOffsets, that.logEndOffsets);
7668
}
7769

7870
@Override
7971
public int hashCode() {
80-
return Objects.hash(lakeSnapshotId, logEndOffsets, qualifiedPartitionNameById);
72+
return Objects.hash(lakeSnapshotId, logEndOffsets);
8173
}
8274

8375
@Override
@@ -87,8 +79,6 @@ public String toString() {
8779
+ lakeSnapshotId
8880
+ ", logEndOffsets="
8981
+ logEndOffsets
90-
+ ", partitionNameById="
91-
+ qualifiedPartitionNameById
9282
+ '}';
9383
}
9484
}

fluss-common/src/main/java/org/apache/fluss/utils/FlussPaths.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ public class FlussPaths {
9191
/** The name of the directory for shared remote snapshot kv files. */
9292
public static final String REMOTE_KV_SNAPSHOT_SHARED_DIR = "shared";
9393

94+
private static final String REMOTE_LAKE_DIR_NAME = "lake";
95+
9496
// ----------------------------------------------------------------------------------------
9597
// LOG/KV Tablet Paths
9698
// ----------------------------------------------------------------------------------------
@@ -681,6 +683,45 @@ public static FsPath remoteKvSnapshotDir(FsPath remoteKvTabletDir, long snapshot
681683
return new FsPath(remoteKvTabletDir, REMOTE_KV_SNAPSHOT_DIR_PREFIX + snapshotId);
682684
}
683685

686+
/**
687+
* Returns the remote directory for storing the lake snapshot data required by Fluss for a table.
688+
*
689+
* <p>The path contract:
690+
*
691+
* <pre>
692+
* {$remote.data.dir}/lake/{databaseName}/{tableName}-{tableId}
693+
* </pre>
694+
*/
695+
public static FsPath remoteLakeTableSnapshotDir(
696+
String remoteDataDir, TablePath tablePath, long tableId) {
697+
return new FsPath(
698+
String.format(
699+
"%s/%s/%s/%s-%d",
700+
remoteDataDir,
701+
REMOTE_LAKE_DIR_NAME,
702+
tablePath.getDatabaseName(),
703+
tablePath.getTableName(),
704+
tableId));
705+
}
706+
707+
/**
708+
* Returns a remote path for storing lake snapshot metadata required by Fluss for a table.
709+
*
710+
* <p>The path contract:
711+
*
712+
* <pre>
713+
* {$remoteLakeTableSnapshotDir}/metadata/{uuid}.manifest
714+
* </pre>
715+
*/
716+
public static FsPath remoteLakeTableSnapshotManifestPath(
717+
String remoteDataDir, TablePath tablePath, long tableId) {
718+
return new FsPath(
719+
String.format(
720+
"%s/metadata/%s.manifest",
721+
remoteLakeTableSnapshotDir(remoteDataDir, tablePath, tableId),
722+
UUID.randomUUID()));
723+
}
724+
684725
/**
685726
* Returns the remote directory path for storing kv snapshot shared files (SST files with UUID
686727
* prefix).

fluss-common/src/main/java/org/apache/fluss/utils/json/BucketOffsetJsonSerde.java

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ public class BucketOffsetJsonSerde
3030
public static final BucketOffsetJsonSerde INSTANCE = new BucketOffsetJsonSerde();
3131
private static final String PARTITION_ID = "partition_id";
3232
private static final String BUCKET_ID = "bucket";
33-
private static final String PARTITION_NAME = "partition_name";
3433
private static final String LOG_OFFSET = "offset";
3534

3635
@Override
@@ -39,14 +38,10 @@ public BucketOffset deserialize(JsonNode node) {
3938
Long partitionId = partitionIdNode == null ? null : partitionIdNode.asLong();
4039
int bucketId = node.get(BUCKET_ID).asInt();
4140

42-
// deserialize partition name
43-
JsonNode partitionNameNode = node.get(PARTITION_NAME);
44-
String partitionName = partitionNameNode == null ? null : partitionNameNode.asText();
45-
4641
// deserialize log offset
4742
long logOffset = node.get(LOG_OFFSET).asLong();
4843

49-
return new BucketOffset(logOffset, bucketId, partitionId, partitionName);
44+
return new BucketOffset(logOffset, bucketId, partitionId);
5045
}
5146

5247
@Override
@@ -59,11 +54,6 @@ public void serialize(BucketOffset bucketOffset, JsonGenerator generator) throws
5954
}
6055
generator.writeNumberField(BUCKET_ID, bucketOffset.getBucket());
6156

62-
// serialize partition name
63-
if (bucketOffset.getPartitionQualifiedName() != null) {
64-
generator.writeStringField(PARTITION_NAME, bucketOffset.getPartitionQualifiedName());
65-
}
66-
6757
// serialize bucket offset
6858
generator.writeNumberField(LOG_OFFSET, bucketOffset.getLogOffset());
6959

fluss-common/src/test/java/org/apache/fluss/utils/json/BucketOffsetJsonSerdeTest.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,17 +28,13 @@ public class BucketOffsetJsonSerdeTest extends JsonSerdeTestBase<BucketOffset> {
2828

2929
@Override
3030
protected BucketOffset[] createObjects() {
31-
return new BucketOffset[] {
32-
new BucketOffset(10, 1, 1L, "country=eu-central/year=2023/month=12"),
33-
new BucketOffset(20, 2, null, null)
34-
};
31+
return new BucketOffset[] {new BucketOffset(10, 1, 1L), new BucketOffset(20, 2, null)};
3532
}
3633

3734
@Override
3835
protected String[] expectedJsons() {
3936
return new String[] {
40-
"{\"partition_id\":1,\"bucket\":1,\"partition_name\":\"country=eu-central/year=2023/month=12\",\"offset\":10}",
41-
"{\"bucket\":2,\"offset\":20}"
37+
"{\"partition_id\":1,\"bucket\":1,\"offset\":10}", "{\"bucket\":2,\"offset\":20}"
4238
};
4339
}
4440
}

0 commit comments

Comments
 (0)