Skip to content

Commit d511edd

Browse files
committed
address comments
1 parent 50fca1d commit d511edd

File tree

23 files changed

+235
-171
lines changed

23 files changed

+235
-171
lines changed

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,17 @@ public class ConfigOptions {
267267
public static final ConfigOption<List<String>> SERVER_SASL_ENABLED_MECHANISMS_CONFIG =
268268
key("security.sasl.enabled.mechanisms").stringType().asList().noDefaultValue();
269269

270+
public static final ConfigOption<Integer> SERVER_IO_POOL_SIZE =
271+
key("server.io-pool.size")
272+
.intType()
273+
.defaultValue(10)
274+
.withDescription(
275+
"The size of the IO thread pool to run blocking operations for both coordinator and tablet servers. "
276+
+ "This includes discard unnecessary snapshot files, transfer kv snapshot files, "
277+
+ "and transfer remote log files. Increase this value if you experience slow IO operations. "
278+
+ "The default value is 10.")
279+
.withDeprecatedKeys("coordinator.io-pool.size");
280+
270281
// ------------------------------------------------------------------------
271282
// ConfigOptions for Coordinator Server
272283
// ------------------------------------------------------------------------
@@ -324,6 +335,11 @@ public class ConfigOptions {
324335
+ " (“50100,50101”), ranges (“50100-50200”) or a combination of both."
325336
+ "This option is deprecated. Please use bind.listeners instead, which provides a more flexible configuration for multiple ports");
326337

338+
/**
339+
* @deprecated This option is deprecated. Please use {@link ConfigOptions#SERVER_IO_POOL_SIZE}
340+
* instead.
341+
*/
342+
@Deprecated
327343
public static final ConfigOption<Integer> COORDINATOR_IO_POOL_SIZE =
328344
key("coordinator.io-pool.size")
329345
.intType()
@@ -332,7 +348,8 @@ public class ConfigOptions {
332348
"The size of the IO thread pool to run blocking operations for coordinator server. "
333349
+ "This includes discard unnecessary snapshot files. "
334350
+ "Increase this value if you experience slow unnecessary snapshot files clean. "
335-
+ "The default value is 10.");
351+
+ "The default value is 10. "
352+
+ "This option is deprecated. Please use server.io-pool.size instead.");
336353

337354
// ------------------------------------------------------------------------
338355
// ConfigOptions for Tablet Server
@@ -394,14 +411,6 @@ public class ConfigOptions {
394411
"The rack for the tabletServer. This will be used in rack aware bucket assignment "
395412
+ "for fault tolerance. Examples: `RACK1`, `cn-hangzhou-server10`");
396413

397-
public static final ConfigOption<Integer> TABLET_SERVER_IO_POOL_SIZE =
398-
key("tablet-server.io-pool.size")
399-
.intType()
400-
.defaultValue(3)
401-
.withDescription(
402-
"The size of the IO thread pool to run blocking operations for tablet server. "
403-
+ "The default value is 3.");
404-
405414
public static final ConfigOption<String> DATA_DIR =
406415
key("data.dir")
407416
.stringType()
@@ -777,13 +786,19 @@ public class ConfigOptions {
777786
"Size of the thread pool used in scheduling tasks to copy segments, "
778787
+ "fetch remote log indexes and clean up remote log segments.");
779788

789+
/**
790+
* @deprecated This option is deprecated. Please use {@link ConfigOptions#SERVER_IO_POOL_SIZE}
791+
* instead.
792+
*/
793+
@Deprecated
780794
public static final ConfigOption<Integer> REMOTE_LOG_DATA_TRANSFER_THREAD_NUM =
781795
key("remote.log.data-transfer-thread-num")
782796
.intType()
783797
.defaultValue(4)
784798
.withDescription(
785799
"The number of threads the server uses to transfer (download and upload) "
786-
+ "remote log file can be data file, index file and remote log metadata file.");
800+
+ "remote log file can be data file, index file and remote log metadata file. "
801+
+ "This option is deprecated. Please use server.io-pool.size instead.");
787802

788803
// ------------------------------------------------------------------------
789804
// Netty Settings
@@ -1438,12 +1453,18 @@ public class ConfigOptions {
14381453
.withDescription(
14391454
"The number of threads that the server uses to schedule snapshot kv data for all the replicas in the server.");
14401455

1456+
/**
1457+
* @deprecated This option is deprecated. Please use {@link ConfigOptions#SERVER_IO_POOL_SIZE}
1458+
* instead.
1459+
*/
1460+
@Deprecated
14411461
public static final ConfigOption<Integer> KV_SNAPSHOT_TRANSFER_THREAD_NUM =
14421462
key("kv.snapshot.transfer-thread-num")
14431463
.intType()
14441464
.defaultValue(4)
14451465
.withDescription(
1446-
"The number of threads the server uses to transfer (download and upload) kv snapshot files.");
1466+
"The number of threads the server uses to transfer (download and upload) kv snapshot files. "
1467+
+ "This option is deprecated. Please use server.io-pool.size instead.");
14471468

14481469
public static final ConfigOption<Integer> KV_MAX_RETAINED_SNAPSHOTS =
14491470
key("kv.snapshot.num-retained")

fluss-common/src/main/java/org/apache/fluss/utils/FlussPaths.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -684,15 +684,15 @@ public static FsPath remoteKvSnapshotDir(FsPath remoteKvTabletDir, long snapshot
684684
}
685685

686686
/**
687-
* Returns the remote path for storing lake snapshot metadata required by Fluss for a table.
687+
* Returns the remote path for storing lake snapshot required by Fluss for a table.
688688
*
689689
* <p>The path contract:
690690
*
691691
* <pre>
692692
* {$remote.data.dir}/lake/{databaseName}/{tableName}-{tableId}
693693
* </pre>
694694
*/
695-
public static FsPath remoteLakeTableSnapshotMetadataDir(
695+
public static FsPath remoteLakeTableSnapshotDir(
696696
String remoteDataDir, TablePath tablePath, long tableId) {
697697
return new FsPath(
698698
String.format(
@@ -710,15 +710,15 @@ public static FsPath remoteLakeTableSnapshotMetadataDir(
710710
* <p>The path contract:
711711
*
712712
* <pre>
713-
* {$remoteLakeTableSnapshotMetadataDir}/manifest/{uuid}.manifest
713+
* {$remoteLakeTableSnapshotDir}/metadata/{uuid}.manifest
714714
* </pre>
715715
*/
716716
public static FsPath remoteLakeTableSnapshotManifestPath(
717717
String remoteDataDir, TablePath tablePath, long tableId) {
718718
return new FsPath(
719719
String.format(
720-
"%s/manifest/%s.manifest",
721-
remoteLakeTableSnapshotMetadataDir(remoteDataDir, tablePath, tableId),
720+
"%s/metadata/%s.manifest",
721+
remoteLakeTableSnapshotDir(remoteDataDir, tablePath, tableId),
722722
UUID.randomUUID()));
723723
}
724724

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshot.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,6 @@ public void addBucketOffset(TableBucket bucket, long offset) {
5555
logEndOffsets.put(bucket, offset);
5656
}
5757

58-
public void addPartitionBucketOffset(TableBucket bucket, long offset) {
59-
logEndOffsets.put(bucket, offset);
60-
}
61-
6258
public long getLogEndOffset(TableBucket bucket) {
6359
return logEndOffsets.get(bucket);
6460
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -83,15 +83,9 @@ public void commit(long tableId, CommittedLakeSnapshot committedLakeSnapshot)
8383
for (Map.Entry<Tuple2<Long, Integer>, Long> entry :
8484
committedLakeSnapshot.getLogEndOffsets().entrySet()) {
8585
Tuple2<Long, Integer> partitionBucket = entry.getKey();
86-
TableBucket tableBucket;
8786
Long partitionId = partitionBucket.f0;
88-
if (partitionId == null) {
89-
tableBucket = new TableBucket(tableId, partitionBucket.f1);
90-
flussTableLakeSnapshot.addBucketOffset(tableBucket, entry.getValue());
91-
} else {
92-
tableBucket = new TableBucket(tableId, partitionId, partitionBucket.f1);
93-
flussTableLakeSnapshot.addPartitionBucketOffset(tableBucket, entry.getValue());
94-
}
87+
TableBucket tableBucket = new TableBucket(tableId, partitionId, partitionBucket.f1);
88+
flussTableLakeSnapshot.addBucketOffset(tableBucket, entry.getValue());
9589
}
9690
commit(flussTableLakeSnapshot);
9791
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -232,8 +232,7 @@ private Committable commitWriteResults(
232232
if (writeResult.tableBucket().getPartitionId() == null) {
233233
flussTableLakeSnapshot.addBucketOffset(tableBucket, writeResult.logEndOffset());
234234
} else {
235-
flussTableLakeSnapshot.addPartitionBucketOffset(
236-
tableBucket, writeResult.logEndOffset());
235+
flussTableLakeSnapshot.addBucketOffset(tableBucket, writeResult.logEndOffset());
237236
}
238237
}
239238
flussTableLakeSnapshotCommitter.commit(flussTableLakeSnapshot);

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -691,7 +691,7 @@ private static CommitLakeTableSnapshotRequest genCommitLakeTableSnapshotRequest(
691691
lakeTableOffsetForBucket.setPartitionId(tb.getPartitionId());
692692
}
693693
lakeTableOffsetForBucket.setBucketId(tb.getBucket());
694-
lakeTableOffsetForBucket.setLogEndOffset(bucketLogEndOffsets.get(bucketId));
694+
lakeTableOffsetForBucket.setLogEndOffset(bucketLogEndOffset.getValue());
695695
}
696696
return commitLakeTableSnapshotRequest;
697697
}

fluss-rpc/src/main/proto/FlussApi.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -462,7 +462,11 @@ message PbLakeTableSnapshotInfo {
462462
message PbLakeTableOffsetForBucket {
463463
optional int64 partition_id = 1;
464464
required int32 bucket_id = 2;
465+
// Deprecated: log_start_offset is no longer used. Field number 3 is reserved for protocol compatibility.
466+
// optional int64 log_start_offset = 3;
465467
optional int64 log_end_offset = 4;
468+
// Deprecated: partition_name is no longer used. Field number 5 is reserved for protocol compatibility.
469+
// optional string partition_name = 5;
466470
optional int64 max_timestamp = 6;
467471
}
468472

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1155,11 +1155,9 @@ private void processNotifyLakeTableOffsetEvent(NotifyLakeTableOffsetEvent event)
11551155
for (Map.Entry<Long, LakeTableSnapshot> lakeTableSnapshotEntry :
11561156
lakeTableSnapshots.entrySet()) {
11571157
LakeTableSnapshot lakeTableSnapshot = lakeTableSnapshotEntry.getValue();
1158-
for (Map.Entry<TableBucket, Long> bucketLogEndOffsetEntry :
1159-
lakeTableSnapshot.getBucketLogEndOffset().entrySet()) {
1160-
TableBucket tb = bucketLogEndOffsetEntry.getKey();
1158+
for (TableBucket tb : lakeTableSnapshot.getBucketLogEndOffset().keySet()) {
11611159
coordinatorContext
1162-
.getBucketLeaderAndIsr(bucketLogEndOffsetEntry.getKey())
1160+
.getBucketLeaderAndIsr(tb)
11631161
.ifPresent(
11641162
leaderAndIsr ->
11651163
coordinatorRequestBatch

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -188,11 +188,10 @@ protected void startServices() throws Exception {
188188

189189
MetadataManager metadataManager =
190190
new MetadataManager(zkClient, conf, lakeCatalogDynamicLoader);
191-
192-
int ioExecutorPoolSize = conf.get(ConfigOptions.COORDINATOR_IO_POOL_SIZE);
193191
this.ioExecutor =
194192
Executors.newFixedThreadPool(
195-
ioExecutorPoolSize, new ExecutorThreadFactory("coordinator-io"));
193+
conf.get(ConfigOptions.SERVER_IO_POOL_SIZE),
194+
new ExecutorThreadFactory("coordinator-io"));
196195
this.coordinatorService =
197196
new CoordinatorService(
198197
conf,
@@ -399,9 +398,13 @@ CompletableFuture<Void> stopServices() {
399398
exception = ExceptionUtils.firstOrSuppressed(t, exception);
400399
}
401400

402-
if (ioExecutor != null) {
403-
// shutdown io executor
404-
ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, ioExecutor);
401+
try {
402+
if (ioExecutor != null) {
403+
// shutdown io executor
404+
ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, ioExecutor);
405+
}
406+
} catch (Throwable t) {
407+
exception = ExceptionUtils.firstOrSuppressed(t, exception);
405408
}
406409

407410
try {
@@ -516,11 +519,11 @@ private static void validateConfigs(Configuration conf) {
516519
ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS.key()));
517520
}
518521

519-
if (conf.get(ConfigOptions.COORDINATOR_IO_POOL_SIZE) < 1) {
522+
if (conf.get(ConfigOptions.SERVER_IO_POOL_SIZE) < 1) {
520523
throw new IllegalConfigurationException(
521524
String.format(
522525
"Invalid configuration for %s, it must be greater than or equal 1.",
523-
ConfigOptions.COORDINATOR_IO_POOL_SIZE.key()));
526+
ConfigOptions.SERVER_IO_POOL_SIZE.key()));
524527
}
525528

526529
if (conf.get(ConfigOptions.REMOTE_DATA_DIR) == null) {

fluss-server/src/main/java/org/apache/fluss/server/coordinator/RemoteStorageCleaner.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -61,17 +61,18 @@ public RemoteStorageCleaner(Configuration configuration, ExecutorService ioExecu
6161
}
6262
}
6363

64-
public void asyncDeleteTableRemoteDir(
65-
TablePath tablePath, boolean isKvTable, boolean isLakeEnabled, long tableId) {
64+
public void asyncDeleteTableRemoteDir(TablePath tablePath, boolean isKvTable, long tableId) {
6665
if (isKvTable) {
6766
asyncDeleteDir(FlussPaths.remoteTableDir(remoteKvDir, tablePath, tableId));
6867
}
69-
if (isLakeEnabled) {
70-
asyncDeleteDir(
71-
FlussPaths.remoteLakeTableSnapshotMetadataDir(
72-
remoteDataDir, tablePath, tableId));
73-
}
7468
asyncDeleteDir(FlussPaths.remoteTableDir(remoteLogDir, tablePath, tableId));
69+
70+
// Always delete the lake snapshot directory, whether or not datalake is currently enabled
71+
// for the table. A table may have had datalake enabled and later turned off before being
72+
// deleted; deleting the directory only for lake-enabled tables would then leave its lake
73+
// snapshot files behind. Deleting unconditionally ensures that any existing files are
74+
// cleaned up.
75+
asyncDeleteDir(FlussPaths.remoteLakeTableSnapshotDir(remoteDataDir, tablePath, tableId));
7576
}
7677

7778
public void asyncDeletePartitionRemoteDir(

0 commit comments

Comments
 (0)