From c8a1179c3fc1bc76c7e7db5e5eaead94613b9a8a Mon Sep 17 00:00:00 2001 From: ZetoHkr <147681181+shawngao-org@users.noreply.github.com> Date: Mon, 7 Apr 2025 20:26:48 +0800 Subject: [PATCH 1/4] docs: Update IDE launch configuration in contribution guide - Adjust the memory settings in VM Options and change `-Xmx1` to `-Xmx1G` Signed-off-by: ZetoHkr <147681181+shawngao-org@users.noreply.github.com> --- CONTRIBUTING_GUIDE.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CONTRIBUTING_GUIDE.md b/CONTRIBUTING_GUIDE.md index 0988e5a57f..6dad589eb8 100644 --- a/CONTRIBUTING_GUIDE.md +++ b/CONTRIBUTING_GUIDE.md @@ -99,7 +99,7 @@ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properti |------------------------|------------| | Main | core/src/main/scala/kafka/Kafka.scala | | ClassPath | -cp kafka.core.main | -| VM Options | -Xmx1 -Xms1G -server -XX:+UseZGC -XX:MaxDirectMemorySize=2G -Dkafka.logs.dir=logs/ -Dlog4j.configuration=file:config/log4j.properties -Dio.netty.leakDetection.level=paranoid | +| VM Options | -Xmx1G -Xms1G -server -XX:+UseZGC -XX:MaxDirectMemorySize=2G -Dkafka.logs.dir=logs/ -Dlog4j.configuration=file:config/log4j.properties -Dio.netty.leakDetection.level=paranoid | | CLI Arguments | config/kraft/server.properties| | Environment | KAFKA_S3_ACCESS_KEY=test;KAFKA_S3_SECRET_KEY=test | From e99086b230d9fbb306d6b14e9ca1563d0bdfb4b0 Mon Sep 17 00:00:00 2001 From: ZetoHkr Date: Fri, 18 Apr 2025 13:33:15 +0800 Subject: [PATCH 2/4] refactor(thread): optimize thread pool closing logic - Introduce the ThreadUtils class to uniformly handle thread pool shutdown #2358 --- .../src/main/java/com/automq/stream/s3/S3Storage.java | 10 +--------- .../automq/stream/s3/compact/CompactionManager.java | 11 +---------- .../stream/s3/operator/AbstractObjectStorage.java | 8 ++++---- .../automq/stream/s3/wal/benchmark/WriteBench.java | 9 +-------- .../s3/wal/impl/block/SlidingWindowService.java | 4 ++-- .../stream/s3/operator/TrafficRateLimiterTest.java | 10 +++------- .../stream/s3/wal/util/WALBlockDeviceChannelTest.java | 1 - 7 files changed, 12 insertions(+), 41 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java index c3f7dea83d..036340083e 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java @@ -414,15 +414,7 @@ public void shutdown() { request.cf.completeExceptionally(new IOException("S3Storage is shutdown")); } deltaWAL.shutdownGracefully(); - backgroundExecutor.shutdown(); - try { - if (!backgroundExecutor.awaitTermination(10, TimeUnit.SECONDS)) { - LOGGER.warn("await backgroundExecutor timeout 10s"); - } - } catch (InterruptedException e) { - backgroundExecutor.shutdownNow(); - LOGGER.warn("await backgroundExecutor close fail", e); - } + ThreadUtils.shutdownExecutor(backgroundExecutor, 10, TimeUnit.SECONDS, LOGGER); } @Override diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java index 0425ce2729..71c012f110 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java @@ -207,16 +207,7 @@ public void shutdown() { } private void shutdownAndAwaitTermination(ExecutorService executor, int timeout, TimeUnit timeUnit) { - executor.shutdown(); - try { - if (!executor.awaitTermination(timeout, timeUnit)) { - executor.shutdownNow(); - } - } catch (InterruptedException ex) { - executor.shutdownNow(); - // Preserve interrupt status - Thread.currentThread().interrupt(); - } + ThreadUtils.shutdownExecutor(executor, timeout, timeUnit, logger); } public CompletableFuture compact() { diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/AbstractObjectStorage.java b/s3stream/src/main/java/com/automq/stream/s3/operator/AbstractObjectStorage.java index 1aa1e97c60..d33286d268 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/operator/AbstractObjectStorage.java +++ b/s3stream/src/main/java/com/automq/stream/s3/operator/AbstractObjectStorage.java @@ -662,10 +662,10 @@ public short bucketId() { @Override public void close() { - writeLimiterCallbackExecutor.shutdown(); - readCallbackExecutor.shutdown(); - writeCallbackExecutor.shutdown(); - scheduler.shutdown(); + ThreadUtils.shutdownExecutor(writeLimiterCallbackExecutor, 1, TimeUnit.SECONDS); + ThreadUtils.shutdownExecutor(readCallbackExecutor, 1, TimeUnit.SECONDS); + ThreadUtils.shutdownExecutor(writeCallbackExecutor, 1, TimeUnit.SECONDS); + ThreadUtils.shutdownExecutor(scheduler, 1, TimeUnit.SECONDS); fastRetryTimer.stop(); doClose(); } diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/benchmark/WriteBench.java b/s3stream/src/main/java/com/automq/stream/s3/wal/benchmark/WriteBench.java index 931a074c4e..af41e6e828 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/benchmark/WriteBench.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/benchmark/WriteBench.java @@ -116,14 +116,7 @@ private void run(Config config) { } Runnable stopLog = logIt(config, stat); - executor.shutdown(); - try { - if (!executor.awaitTermination(config.durationSeconds + 10, TimeUnit.SECONDS)) { - executor.shutdownNow(); - } - } catch (InterruptedException e) { - executor.shutdownNow(); - } + ThreadUtils.shutdownExecutor(executor, config.durationSeconds + 10, TimeUnit.SECONDS); stopLog.run(); stopTrim.run(); diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/SlidingWindowService.java b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/SlidingWindowService.java index b6c9ca483d..fef176832a 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/SlidingWindowService.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/SlidingWindowService.java @@ -201,8 +201,8 @@ public boolean shutdown(long timeout, TimeUnit unit) { } boolean gracefulShutdown; - this.ioExecutor.shutdown(); - this.pollBlockScheduler.shutdownNow(); + ThreadUtils.shutdownExecutor(this.ioExecutor, timeout, unit); + ThreadUtils.shutdownExecutor(this.pollBlockScheduler, 1, TimeUnit.SECONDS); List tasks = new LinkedList<>(); try { gracefulShutdown = this.ioExecutor.awaitTermination(timeout, unit); diff --git a/s3stream/src/test/java/com/automq/stream/s3/operator/TrafficRateLimiterTest.java b/s3stream/src/test/java/com/automq/stream/s3/operator/TrafficRateLimiterTest.java index 1788efc945..9ed70f646d 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/operator/TrafficRateLimiterTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/operator/TrafficRateLimiterTest.java @@ -10,17 +10,13 @@ */ package com.automq.stream.s3.operator; +import com.automq.stream.utils.ThreadUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.*; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -76,7 +72,7 @@ public void testConsumeBeforeUpdate() { } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } finally { - executor.shutdown(); + ThreadUtils.shutdownExecutor(executor, 1, TimeUnit.SECONDS); } } } diff --git a/s3stream/src/test/java/com/automq/stream/s3/wal/util/WALBlockDeviceChannelTest.java b/s3stream/src/test/java/com/automq/stream/s3/wal/util/WALBlockDeviceChannelTest.java index 4c2bdfc284..36b365e02f 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/wal/util/WALBlockDeviceChannelTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/wal/util/WALBlockDeviceChannelTest.java @@ -112,7 +112,6 @@ public void testMultiThreadWrite() throws IOException, InterruptedException { } executor.shutdown(); assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS)); - channel.close(); } From b68edb7fb7d4953239a3029549acb3697694d293 Mon Sep 17 00:00:00 2001 From: ZetoHkr Date: Fri, 18 Apr 2025 13:42:17 +0800 Subject: [PATCH 3/4] perf(s3): Optimize TrafficRateLimiter test code import --- .../automq/stream/s3/operator/TrafficRateLimiterTest.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/s3stream/src/test/java/com/automq/stream/s3/operator/TrafficRateLimiterTest.java b/s3stream/src/test/java/com/automq/stream/s3/operator/TrafficRateLimiterTest.java index 9ed70f646d..83b0dba8e6 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/operator/TrafficRateLimiterTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/operator/TrafficRateLimiterTest.java @@ -16,7 +16,13 @@ import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; -import java.util.concurrent.*; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; From 451ba78e6ed4e49de487224900e90f1de14c72da Mon Sep 17 00:00:00 2001 From: ZetoHkr Date: Fri, 18 Apr 2025 13:53:56 +0800 Subject: [PATCH 4/4] perf(s3): Optimize TrafficRateLimiter test code import --- .../com/automq/stream/s3/operator/TrafficRateLimiterTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/s3stream/src/test/java/com/automq/stream/s3/operator/TrafficRateLimiterTest.java b/s3stream/src/test/java/com/automq/stream/s3/operator/TrafficRateLimiterTest.java index 83b0dba8e6..87657329c7 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/operator/TrafficRateLimiterTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/operator/TrafficRateLimiterTest.java @@ -11,6 +11,7 @@ package com.automq.stream.s3.operator; import com.automq.stream.utils.ThreadUtils; + import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag;