diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java index c3f7dea83d..036340083e 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java @@ -414,15 +414,7 @@ public void shutdown() { request.cf.completeExceptionally(new IOException("S3Storage is shutdown")); } deltaWAL.shutdownGracefully(); - backgroundExecutor.shutdown(); - try { - if (!backgroundExecutor.awaitTermination(10, TimeUnit.SECONDS)) { - LOGGER.warn("await backgroundExecutor timeout 10s"); - } - } catch (InterruptedException e) { - backgroundExecutor.shutdownNow(); - LOGGER.warn("await backgroundExecutor close fail", e); - } + ThreadUtils.shutdownExecutor(backgroundExecutor, 10, TimeUnit.SECONDS, LOGGER); } @Override diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java index 0425ce2729..71c012f110 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java @@ -207,16 +207,7 @@ public void shutdown() { } private void shutdownAndAwaitTermination(ExecutorService executor, int timeout, TimeUnit timeUnit) { - executor.shutdown(); - try { - if (!executor.awaitTermination(timeout, timeUnit)) { - executor.shutdownNow(); - } - } catch (InterruptedException ex) { - executor.shutdownNow(); - // Preserve interrupt status - Thread.currentThread().interrupt(); - } + ThreadUtils.shutdownExecutor(executor, timeout, timeUnit, logger); } public CompletableFuture compact() { diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/AbstractObjectStorage.java b/s3stream/src/main/java/com/automq/stream/s3/operator/AbstractObjectStorage.java index 1aa1e97c60..d33286d268 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/operator/AbstractObjectStorage.java +++ b/s3stream/src/main/java/com/automq/stream/s3/operator/AbstractObjectStorage.java @@ -662,10 +662,10 @@ public short bucketId() { @Override public void close() { - writeLimiterCallbackExecutor.shutdown(); - readCallbackExecutor.shutdown(); - writeCallbackExecutor.shutdown(); - scheduler.shutdown(); + ThreadUtils.shutdownExecutor(writeLimiterCallbackExecutor, 1, TimeUnit.SECONDS); + ThreadUtils.shutdownExecutor(readCallbackExecutor, 1, TimeUnit.SECONDS); + ThreadUtils.shutdownExecutor(writeCallbackExecutor, 1, TimeUnit.SECONDS); + ThreadUtils.shutdownExecutor(scheduler, 1, TimeUnit.SECONDS); fastRetryTimer.stop(); doClose(); } diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/benchmark/WriteBench.java b/s3stream/src/main/java/com/automq/stream/s3/wal/benchmark/WriteBench.java index 931a074c4e..af41e6e828 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/benchmark/WriteBench.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/benchmark/WriteBench.java @@ -116,14 +116,7 @@ private void run(Config config) { } Runnable stopLog = logIt(config, stat); - executor.shutdown(); - try { - if (!executor.awaitTermination(config.durationSeconds + 10, TimeUnit.SECONDS)) { - executor.shutdownNow(); - } - } catch (InterruptedException e) { - executor.shutdownNow(); - } + ThreadUtils.shutdownExecutor(executor, config.durationSeconds + 10, TimeUnit.SECONDS); stopLog.run(); stopTrim.run(); diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/SlidingWindowService.java b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/SlidingWindowService.java index b6c9ca483d..fef176832a 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/SlidingWindowService.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/SlidingWindowService.java @@ -201,8 +201,8 @@ public boolean shutdown(long timeout, TimeUnit unit) { } boolean gracefulShutdown; - this.ioExecutor.shutdown(); - this.pollBlockScheduler.shutdownNow(); + ThreadUtils.shutdownExecutor(this.ioExecutor, timeout, unit); + ThreadUtils.shutdownExecutor(this.pollBlockScheduler, 1, TimeUnit.SECONDS); List tasks = new LinkedList<>(); try { gracefulShutdown = this.ioExecutor.awaitTermination(timeout, unit); diff --git a/s3stream/src/test/java/com/automq/stream/s3/operator/TrafficRateLimiterTest.java b/s3stream/src/test/java/com/automq/stream/s3/operator/TrafficRateLimiterTest.java index 1788efc945..87657329c7 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/operator/TrafficRateLimiterTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/operator/TrafficRateLimiterTest.java @@ -10,6 +10,8 @@ */ package com.automq.stream.s3.operator; +import com.automq.stream.utils.ThreadUtils; + import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; @@ -21,6 +23,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -76,7 +79,7 @@ public void testConsumeBeforeUpdate() { } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } finally { - executor.shutdown(); + ThreadUtils.shutdownExecutor(executor, 1, TimeUnit.SECONDS); } } } diff --git a/s3stream/src/test/java/com/automq/stream/s3/wal/util/WALBlockDeviceChannelTest.java b/s3stream/src/test/java/com/automq/stream/s3/wal/util/WALBlockDeviceChannelTest.java index 4c2bdfc284..36b365e02f 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/wal/util/WALBlockDeviceChannelTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/wal/util/WALBlockDeviceChannelTest.java @@ -112,7 +112,6 @@ public void testMultiThreadWrite() throws IOException, InterruptedException { } executor.shutdown(); assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS)); - channel.close(); }