From e5c5a247c63a0f9427f2c48097b4dac00bffd054 Mon Sep 17 00:00:00 2001 From: CLFutureX <775523362@qq.com> Date: Thu, 18 Jul 2024 17:30:52 +0800 Subject: [PATCH 01/10] feat(wal): wal start optimization --- .../s3/wal/impl/block/BlockWALService.java | 47 ++++++++++++------- 1 file changed, 29 insertions(+), 18 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockWALService.java b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockWALService.java index 12c6d0464a..8c38d0416d 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockWALService.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockWALService.java @@ -273,31 +273,42 @@ private void parseRecordBody(long recoverStartOffset, RecordHeader readRecordHea @Override public WriteAheadLog start() throws IOException { - if (started.get()) { + boolean state = started.get(); + if (state) { LOGGER.warn("block WAL service already started"); return this; } - StopWatch stopWatch = StopWatch.createStarted(); + if (started.compareAndSet(state, true)) { + boolean success = false; + try { + StopWatch stopWatch = StopWatch.createStarted(); + + walChannel.open(channel -> Optional.ofNullable(tryReadWALHeader(walChannel)) + .map(BlockWALHeader::getCapacity) + .orElse(null)); + + BlockWALHeader header = tryReadWALHeader(walChannel); + if (null == header) { + assert !recoveryMode; + header = newWALHeader(); + firstStart = true; + LOGGER.info("no available WALHeader, create a new one: {}", header); + } else { + LOGGER.info("read WALHeader from WAL: {}", header); + } - walChannel.open(channel -> Optional.ofNullable(tryReadWALHeader(walChannel)) - .map(BlockWALHeader::getCapacity) - .orElse(null)); + header.setShutdownType(ShutdownType.UNGRACEFULLY); + walHeaderReady(header); + success = true; - BlockWALHeader header = tryReadWALHeader(walChannel); - if (null == header) { - assert !recoveryMode; - header = newWALHeader(); - firstStart = true; - LOGGER.info("no available WALHeader, create a new one: {}", header); - } else { - LOGGER.info("read WALHeader from WAL: {}", header); + LOGGER.info("block WAL service started, cost: {} ms", stopWatch.getTime(TimeUnit.MILLISECONDS)); + } finally { + if (!success) { + started.set(false); + } + } } - header.setShutdownType(ShutdownType.UNGRACEFULLY); - walHeaderReady(header); - - started.set(true); - LOGGER.info("block WAL service started, cost: {} ms", stopWatch.getTime(TimeUnit.MILLISECONDS)); return this; } From bf58d416e270be3a7195ef3c440adab03628bde6 Mon Sep 17 00:00:00 2001 From: CLFutureX <775523362@qq.com> Date: Tue, 23 Jul 2024 16:03:30 +0800 Subject: [PATCH 02/10] feat(wal): wal start optimization --- .../com/automq/stream/s3/wal/impl/block/BlockWALService.java | 1 + 1 file changed, 1 insertion(+) diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockWALService.java b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockWALService.java index 8c38d0416d..b0cd6b0ae2 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockWALService.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockWALService.java @@ -305,6 +305,7 @@ public WriteAheadLog start() throws IOException { } finally { if (!success) { started.set(false); + LOGGER.warn("block WAL service started fail"); } } } From 03232321fb77b7d61da810cbdf872f4f981d364c Mon Sep 17 00:00:00 2001 From: CLFutureX <775523362@qq.com> Date: Wed, 24 Jul 2024 11:41:42 +0800 Subject: [PATCH 03/10] feat(wal): wal start optimization From b90d210da01829a5a9af890e416872049012cb27 Mon Sep 17 00:00:00 2001 From: CLFutureX <775523362@qq.com> Date: Wed, 24 Jul 2024 18:01:01 +0800 Subject: [PATCH 04/10] feat(wal): wal start and shutdown optimization --- .../s3/wal/impl/block/BlockWALService.java | 118 ++++++++++++------ 1 file changed, 77 insertions(+), 41 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockWALService.java b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockWALService.java index b0cd6b0ae2..3f824b20ef 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockWALService.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockWALService.java @@ -45,6 +45,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.function.Function; @@ -115,7 +116,11 @@ public class BlockWALService implements WriteAheadLog { public static final int WAL_HEADER_CAPACITY = WALUtil.BLOCK_SIZE; public static final int WAL_HEADER_TOTAL_CAPACITY = WAL_HEADER_CAPACITY * WAL_HEADER_COUNT; private static final Logger LOGGER = LoggerFactory.getLogger(BlockWALService.class); - private final AtomicBoolean started = new AtomicBoolean(false); + private static final int WAL_STATE_INIT = 1; + private static final int WAL_STATE_STARTED = 2; + private static final int WAL_STATE_SHUTING_DOWN = 3; + private static final int WAL_STATE_SHUTDOWN = 4; + private static final AtomicIntegerFieldUpdater WAL_STATE = AtomicIntegerFieldUpdater.newUpdater(BlockWALService.class, "state"); private final AtomicBoolean resetFinished = new AtomicBoolean(false); private final AtomicLong writeHeaderRoundTimes = new AtomicLong(0); private final ExecutorService walHeaderFlusher = Threads.newFixedThreadPool(1, ThreadUtils.createThreadFactory("flush-wal-header-thread-%d", true), LOGGER); @@ -132,7 +137,7 @@ public class BlockWALService implements WriteAheadLog { * It is always aligned to the {@link WALUtil#BLOCK_SIZE}. */ private long recoveryCompleteOffset = -1; - + private volatile int state = WAL_STATE_INIT; private BlockWALService() { } @@ -273,44 +278,53 @@ private void parseRecordBody(long recoverStartOffset, RecordHeader readRecordHea @Override public WriteAheadLog start() throws IOException { - boolean state = started.get(); - if (state) { - LOGGER.warn("block WAL service already started"); - return this; - } - if (started.compareAndSet(state, true)) { - boolean success = false; - try { - StopWatch stopWatch = StopWatch.createStarted(); - - walChannel.open(channel -> Optional.ofNullable(tryReadWALHeader(walChannel)) - .map(BlockWALHeader::getCapacity) - .orElse(null)); - - BlockWALHeader header = tryReadWALHeader(walChannel); - if (null == header) { - assert !recoveryMode; - header = newWALHeader(); - firstStart = true; - LOGGER.info("no available WALHeader, create a new one: {}", header); - } else { - LOGGER.info("read WALHeader from WAL: {}", header); + switch (WAL_STATE.get(this)) { + case WAL_STATE_INIT: + if (WAL_STATE.compareAndSet(this, WAL_STATE_INIT, WAL_STATE_STARTED)) { + boolean success = false; + try { + doStart(); + success = true; + } finally { + if (!success) { + WAL_STATE.compareAndSet(this, WAL_STATE_STARTED, WAL_STATE_INIT); + LOGGER.warn("block WAL service started fail"); + } + } } + break; + case WAL_STATE_STARTED: + LOGGER.warn("block WAL service already started"); + break; + case WAL_STATE_SHUTING_DOWN: + case WAL_STATE_SHUTDOWN: + throw new IllegalStateException("block WAL service already shutdown"); + default: + throw new IllegalStateException("invalid WAL state"); + } + return this; + } - header.setShutdownType(ShutdownType.UNGRACEFULLY); - walHeaderReady(header); - success = true; + public void doStart() throws IOException { + StopWatch stopWatch = StopWatch.createStarted(); - LOGGER.info("block WAL service started, cost: {} ms", stopWatch.getTime(TimeUnit.MILLISECONDS)); - } finally { - if (!success) { - started.set(false); - LOGGER.warn("block WAL service started fail"); - } - } + walChannel.open(channel -> Optional.ofNullable(tryReadWALHeader(walChannel)) + .map(BlockWALHeader::getCapacity) + .orElse(null)); + + BlockWALHeader header = tryReadWALHeader(walChannel); + if (null == header) { + assert !recoveryMode; + header = newWALHeader(); + firstStart = true; + LOGGER.info("no available WALHeader, create a new one: {}", header); + } else { + LOGGER.info("read WALHeader from WAL: {}", header); } - return this; + header.setShutdownType(ShutdownType.UNGRACEFULLY); + walHeaderReady(header); + LOGGER.info("block WAL service started, cost: {} ms", stopWatch.getTime(TimeUnit.MILLISECONDS)); } private void registerMetrics() { @@ -382,12 +396,33 @@ private void walHeaderReady(BlockWALHeader header) { @Override public void shutdownGracefully() { - StopWatch stopWatch = StopWatch.createStarted(); - - if (!started.getAndSet(false)) { - LOGGER.warn("block WAL service already shutdown or not started yet"); - return; + for (; ; ) { + int state = WAL_STATE.get(this); + if (state == WAL_STATE_SHUTDOWN) { + LOGGER.warn("block WAL service already shutdown"); + return; + } + if (state == WAL_STATE_SHUTING_DOWN + || WAL_STATE.compareAndSet(this, state, WAL_STATE_SHUTING_DOWN)) { + break; + } + } + int state = WAL_STATE.get(this); + if (WAL_STATE.compareAndSet(this, state, WAL_STATE_SHUTDOWN)) { + boolean success = false; + try { + doShutdown(); + success = true; + } finally { + if (!success) { + WAL_STATE.compareAndSet(this, state, WAL_STATE_SHUTING_DOWN); + } + } } + } + + private void doShutdown() { + StopWatch stopWatch = StopWatch.createStarted(); walHeaderFlusher.shutdown(); try { if (!walHeaderFlusher.awaitTermination(5, TimeUnit.SECONDS)) { @@ -405,6 +440,7 @@ public void shutdownGracefully() { walChannel.close(); LOGGER.info("block WAL service shutdown gracefully: {}, cost: {} ms", gracefulShutdown, stopWatch.getTime(TimeUnit.MILLISECONDS)); + } @Override @@ -534,7 +570,7 @@ private CompletableFuture trim(long offset, boolean internal) { } private void checkStarted() { - if (!started.get()) { + if (WAL_STATE.get(this) != WAL_STATE_STARTED) { throw new IllegalStateException("WriteAheadLog has not been started yet"); } } From ce25681304d2cbab6aefc538798307d687a88502 Mon Sep 17 00:00:00 2001 From: CLFutureX <775523362@qq.com> Date: Wed, 31 Jul 2024 13:37:04 +0800 Subject: [PATCH 05/10] feat(wal): wal start and shutdown optimization --- .../s3/wal/impl/block/BlockWALService.java | 60 ++++++++++--------- 1 file changed, 33 insertions(+), 27 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockWALService.java b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockWALService.java index 3f824b20ef..a670f751c4 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockWALService.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockWALService.java @@ -45,7 +45,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.function.Function; @@ -116,11 +116,12 @@ public class BlockWALService implements WriteAheadLog { public static final int WAL_HEADER_CAPACITY = WALUtil.BLOCK_SIZE; public static final int WAL_HEADER_TOTAL_CAPACITY = WAL_HEADER_CAPACITY * WAL_HEADER_COUNT; private static final Logger LOGGER = LoggerFactory.getLogger(BlockWALService.class); - private static final int WAL_STATE_INIT = 1; - private static final int WAL_STATE_STARTED = 2; - private static final int WAL_STATE_SHUTING_DOWN = 3; - private static final int WAL_STATE_SHUTDOWN = 4; - private static final AtomicIntegerFieldUpdater WAL_STATE = AtomicIntegerFieldUpdater.newUpdater(BlockWALService.class, "state"); + private static final int INIT = 1; + private static final int STARTING = 2; + private static final int STARTED = 3; + private static final int SHUTTING_DOWN = 4; + private static final int SHUTDOWN = 5; + private static final AtomicInteger WAL_STATE = new AtomicInteger(INIT); private final AtomicBoolean resetFinished = new AtomicBoolean(false); private final AtomicLong writeHeaderRoundTimes = new AtomicLong(0); private final ExecutorService walHeaderFlusher = Threads.newFixedThreadPool(1, ThreadUtils.createThreadFactory("flush-wal-header-thread-%d", true), LOGGER); @@ -137,7 +138,6 @@ public class BlockWALService implements WriteAheadLog { * It is always aligned to the {@link WALUtil#BLOCK_SIZE}. */ private long recoveryCompleteOffset = -1; - private volatile int state = WAL_STATE_INIT; private BlockWALService() { } @@ -278,26 +278,28 @@ private void parseRecordBody(long recoverStartOffset, RecordHeader readRecordHea @Override public WriteAheadLog start() throws IOException { - switch (WAL_STATE.get(this)) { - case WAL_STATE_INIT: - if (WAL_STATE.compareAndSet(this, WAL_STATE_INIT, WAL_STATE_STARTED)) { - boolean success = false; + switch (WAL_STATE.get()) { + case INIT: + if (WAL_STATE.compareAndSet(INIT, STARTING)) { try { doStart(); - success = true; + WAL_STATE.set(STARTED); } finally { - if (!success) { - WAL_STATE.compareAndSet(this, WAL_STATE_STARTED, WAL_STATE_INIT); + if (WAL_STATE.get() != STARTED) { + WAL_STATE.compareAndSet(STARTING, INIT); LOGGER.warn("block WAL service started fail"); } } } break; - case WAL_STATE_STARTED: + case STARTING: + LOGGER.warn("block WAL service is starting"); + break; + case STARTED: LOGGER.warn("block WAL service already started"); break; - case WAL_STATE_SHUTING_DOWN: - case WAL_STATE_SHUTDOWN: + case SHUTTING_DOWN: + case SHUTDOWN: throw new IllegalStateException("block WAL service already shutdown"); default: throw new IllegalStateException("invalid WAL state"); @@ -397,25 +399,30 @@ private void walHeaderReady(BlockWALHeader header) { @Override public void shutdownGracefully() { for (; ; ) { - int state = WAL_STATE.get(this); - if (state == WAL_STATE_SHUTDOWN) { - LOGGER.warn("block WAL service already shutdown"); + int state = WAL_STATE.get(); + if (state == SHUTDOWN || WAL_STATE.compareAndSet(INIT, SHUTDOWN)) { + LOGGER.warn("block WAL service already shutdown or not started yet"); return; } - if (state == WAL_STATE_SHUTING_DOWN - || WAL_STATE.compareAndSet(this, state, WAL_STATE_SHUTING_DOWN)) { + if (state == STARTING) { + Thread.yield(); + continue; + } + if (state == SHUTTING_DOWN + || WAL_STATE.compareAndSet(state, SHUTTING_DOWN)) { break; } } - int state = WAL_STATE.get(this); - if (WAL_STATE.compareAndSet(this, state, WAL_STATE_SHUTDOWN)) { + + if (WAL_STATE.compareAndSet(SHUTTING_DOWN, SHUTDOWN)) { boolean success = false; try { doShutdown(); success = true; } finally { if (!success) { - WAL_STATE.compareAndSet(this, state, WAL_STATE_SHUTING_DOWN); + LOGGER.warn("block WAL service shutdown fail"); + WAL_STATE.compareAndSet(SHUTDOWN, SHUTTING_DOWN); } } } @@ -570,7 +577,7 @@ private CompletableFuture trim(long offset, boolean internal) { } private void checkStarted() { - if (WAL_STATE.get(this) != WAL_STATE_STARTED) { + if (WAL_STATE.get() != STARTED) { throw new IllegalStateException("WriteAheadLog has not been started yet"); } } @@ -820,7 +827,6 @@ public long getJumpNextRecoverOffset() { return jumpNextRecoverOffset; } } - /** * Protected for testing purpose. */ From 84c4424c9797a7f88ad7c16d3149f7a81c7cf011 Mon Sep 17 00:00:00 2001 From: CLFutureX <775523362@qq.com> Date: Wed, 31 Jul 2024 13:45:42 +0800 Subject: [PATCH 06/10] feat(wal): wal start and shutdown optimization --- .../com/automq/stream/s3/wal/impl/block/BlockWALService.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockWALService.java b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockWALService.java index a670f751c4..271eb155ac 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockWALService.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockWALService.java @@ -121,7 +121,8 @@ public class BlockWALService implements WriteAheadLog { private static final int STARTED = 3; private static final int SHUTTING_DOWN = 4; private static final int SHUTDOWN = 5; - private static final AtomicInteger WAL_STATE = new AtomicInteger(INIT); + @SuppressWarnings("checkstyle:MemberName") + private final AtomicInteger WAL_STATE = new AtomicInteger(INIT); private final AtomicBoolean resetFinished = new AtomicBoolean(false); private final AtomicLong writeHeaderRoundTimes = new AtomicLong(0); private final ExecutorService walHeaderFlusher = Threads.newFixedThreadPool(1, ThreadUtils.createThreadFactory("flush-wal-header-thread-%d", true), LOGGER); From 6e50d72b3bb0dafc250b2c7c80858e7f1bbc115f Mon Sep 17 00:00:00 2001 From: CLFutureX <775523362@qq.com> Date: Wed, 31 Jul 2024 13:47:10 +0800 Subject: [PATCH 07/10] feat(wal): wal start and shutdown optimization --- .../com/automq/stream/s3/wal/impl/block/BlockWALService.java | 1 + 1 file changed, 1 insertion(+) diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockWALService.java b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockWALService.java index 271eb155ac..4ad8a7ef8d 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockWALService.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockWALService.java @@ -828,6 +828,7 @@ public long getJumpNextRecoverOffset() { return jumpNextRecoverOffset; } } + /** * Protected for testing purpose. */ From 62982cf1a62e477d6388277ef38de734da770a78 Mon Sep 17 00:00:00 2001 From: CLFutureX <775523362@qq.com> Date: Wed, 31 Jul 2024 14:55:31 +0800 Subject: [PATCH 08/10] feat(wal): wal start and shutdown optimization --- .../test/java/com/automq/stream/s3/failover/FailoverTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/s3stream/src/test/java/com/automq/stream/s3/failover/FailoverTest.java b/s3stream/src/test/java/com/automq/stream/s3/failover/FailoverTest.java index 14b4db1e3b..f73d7c652a 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/failover/FailoverTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/failover/FailoverTest.java @@ -61,7 +61,8 @@ public void test() throws IOException, ExecutionException, InterruptedException, request.setDevice(path); request.setVolumeId("test_volume_id"); - when(failoverFactory.getWal(any())).thenReturn(BlockWALService.builder(path, 1024 * 1024).nodeId(233).epoch(100).build()); + when(failoverFactory.getWal(any())).thenAnswer(s -> + BlockWALService.builder(path, 1024 * 1024).nodeId(233).epoch(100).build()); boolean exceptionThrown = false; try { From d07a683a93fb37ac6946d7a40c252b5b7f953671 Mon Sep 17 00:00:00 2001 From: CLFutureX <775523362@qq.com> Date: Tue, 20 Aug 2024 10:30:39 +0800 Subject: [PATCH 09/10] feat(s3stream): wal start and shutdown optimization --- .../s3/wal/impl/block/BlockWALService.java | 47 ++++++++++--------- 1 file changed, 25 insertions(+), 22 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockWALService.java b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockWALService.java index f764af760f..c8c474d287 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockWALService.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockWALService.java @@ -117,13 +117,8 @@ public class BlockWALService implements WriteAheadLog { public static final int WAL_HEADER_CAPACITY = WALUtil.BLOCK_SIZE; public static final int WAL_HEADER_TOTAL_CAPACITY = WAL_HEADER_CAPACITY * WAL_HEADER_COUNT; private static final Logger LOGGER = LoggerFactory.getLogger(BlockWALService.class); - private static final int INIT = 1; - private static final int STARTING = 2; - private static final int STARTED = 3; - private static final int SHUTTING_DOWN = 4; - private static final int SHUTDOWN = 5; @SuppressWarnings("checkstyle:MemberName") - private final AtomicInteger WAL_STATE = new AtomicInteger(INIT); + private final AtomicInteger WAL_STATE = new AtomicInteger(WalState.INIT); private final AtomicBoolean resetFinished = new AtomicBoolean(false); private final AtomicLong writeHeaderRoundTimes = new AtomicLong(0); private final ExecutorService walHeaderFlusher = Threads.newFixedThreadPool(1, ThreadUtils.createThreadFactory("flush-wal-header-thread-%d", true), LOGGER); @@ -284,27 +279,27 @@ private void parseRecordBody(long recoverStartOffset, RecordHeader readRecordHea @Override public WriteAheadLog start() throws IOException { switch (WAL_STATE.get()) { - case INIT: - if (WAL_STATE.compareAndSet(INIT, STARTING)) { + case WalState.INIT: + if (WAL_STATE.compareAndSet(WalState.INIT, WalState.STARTING)) { try { doStart(); - WAL_STATE.set(STARTED); + WAL_STATE.set(WalState.STARTED); } finally { - if (WAL_STATE.get() != STARTED) { - WAL_STATE.compareAndSet(STARTING, INIT); + if (WAL_STATE.get() != WalState.STARTED) { + WAL_STATE.compareAndSet(WalState.STARTING, WalState.INIT); LOGGER.warn("block WAL service started fail"); } } } break; - case STARTING: + case WalState.STARTING: LOGGER.warn("block WAL service is starting"); break; - case STARTED: + case WalState.STARTED: LOGGER.warn("block WAL service already started"); break; - case SHUTTING_DOWN: - case SHUTDOWN: + case WalState.SHUTTING_DOWN: + case WalState.SHUTDOWN: throw new IllegalStateException("block WAL service already shutdown"); default: throw new IllegalStateException("invalid WAL state"); @@ -405,21 +400,21 @@ private void walHeaderReady(BlockWALHeader header) { public void shutdownGracefully() { for (; ; ) { int state = WAL_STATE.get(); - if (state == SHUTDOWN || WAL_STATE.compareAndSet(INIT, SHUTDOWN)) { + if (state == WalState.SHUTDOWN || WAL_STATE.compareAndSet(WalState.INIT, WalState.SHUTDOWN)) { LOGGER.warn("block WAL service already shutdown or not started yet"); return; } - if (state == STARTING) { + if (state == WalState.STARTING) { Thread.yield(); continue; } - if (state == SHUTTING_DOWN - || WAL_STATE.compareAndSet(state, SHUTTING_DOWN)) { + if (state == WalState.SHUTTING_DOWN + || WAL_STATE.compareAndSet(state, WalState.SHUTTING_DOWN)) { break; } } - if (WAL_STATE.compareAndSet(SHUTTING_DOWN, SHUTDOWN)) { + if (WAL_STATE.compareAndSet(WalState.SHUTTING_DOWN, WalState.SHUTDOWN)) { boolean success = false; try { doShutdown(); @@ -427,7 +422,7 @@ public void shutdownGracefully() { } finally { if (!success) { LOGGER.warn("block WAL service shutdown fail"); - WAL_STATE.compareAndSet(SHUTDOWN, SHUTTING_DOWN); + WAL_STATE.compareAndSet(WalState.SHUTDOWN, WalState.SHUTTING_DOWN); } } } @@ -565,7 +560,7 @@ private CompletableFuture trim(long offset, boolean internal) { } private void checkStarted() { - if (WAL_STATE.get() != STARTED) { + if (WAL_STATE.get() != WalState.STARTED) { throw new IllegalStateException("WriteAheadLog has not been started yet"); } } @@ -586,6 +581,14 @@ private SlidingWindowService.WALHeaderFlusher flusher() { return () -> flushWALHeader(ShutdownType.UNGRACEFULLY); } + private static class WalState { + public static final int INIT = 1; + public static final int STARTING = 2; + public static final int STARTED = 3; + public static final int SHUTTING_DOWN = 4; + public static final int SHUTDOWN = 5; + } + public static class BlockWALServiceBuilder { private final String blockDevicePath; private long blockDeviceCapacityWant = CAPACITY_NOT_SET; From c8101dc97e7d8f3d19704f206c6a5eef6ebad670 Mon Sep 17 00:00:00 2001 From: Ning Yu Date: Wed, 21 Aug 2024 11:51:57 +0800 Subject: [PATCH 10/10] refactor(s3stream/wal): use an enum class to identify the WAL state Signed-off-by: Ning Yu --- .../s3/wal/impl/block/BlockWALService.java | 48 +++++++++---------- 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockWALService.java b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockWALService.java index c8c474d287..06ca1f8458 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockWALService.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/block/BlockWALService.java @@ -45,8 +45,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.function.Function; import org.apache.commons.lang3.StringUtils; @@ -118,7 +118,7 @@ public class BlockWALService implements WriteAheadLog { public static final int WAL_HEADER_TOTAL_CAPACITY = WAL_HEADER_CAPACITY * WAL_HEADER_COUNT; private static final Logger LOGGER = LoggerFactory.getLogger(BlockWALService.class); @SuppressWarnings("checkstyle:MemberName") - private final AtomicInteger WAL_STATE = new AtomicInteger(WalState.INIT); + private final AtomicReference state = new AtomicReference<>(WalState.INIT); private final AtomicBoolean resetFinished = new AtomicBoolean(false); private final AtomicLong writeHeaderRoundTimes = new AtomicLong(0); private final ExecutorService walHeaderFlusher = Threads.newFixedThreadPool(1, ThreadUtils.createThreadFactory("flush-wal-header-thread-%d", true), LOGGER); @@ -278,28 +278,28 @@ private void parseRecordBody(long recoverStartOffset, RecordHeader readRecordHea @Override public WriteAheadLog start() throws IOException { - switch (WAL_STATE.get()) { - case WalState.INIT: - if (WAL_STATE.compareAndSet(WalState.INIT, WalState.STARTING)) { + switch (state.get()) { + case INIT: + if (state.compareAndSet(WalState.INIT, WalState.STARTING)) { try { doStart(); - WAL_STATE.set(WalState.STARTED); + state.set(WalState.STARTED); } finally { - if (WAL_STATE.get() != WalState.STARTED) { - WAL_STATE.compareAndSet(WalState.STARTING, WalState.INIT); + if (state.get() != WalState.STARTED) { + state.compareAndSet(WalState.STARTING, WalState.INIT); LOGGER.warn("block WAL service started fail"); } } } break; - case WalState.STARTING: + case STARTING: LOGGER.warn("block WAL service is starting"); break; - case WalState.STARTED: + case STARTED: LOGGER.warn("block WAL service already started"); break; - case WalState.SHUTTING_DOWN: - case WalState.SHUTDOWN: + case SHUTTING_DOWN: + case SHUTDOWN: throw new IllegalStateException("block WAL service already shutdown"); default: throw new IllegalStateException("invalid WAL state"); @@ -399,8 +399,8 @@ private void walHeaderReady(BlockWALHeader header) { @Override public void shutdownGracefully() { for (; ; ) { - int state = WAL_STATE.get(); - if (state == WalState.SHUTDOWN || WAL_STATE.compareAndSet(WalState.INIT, WalState.SHUTDOWN)) { + WalState state = this.state.get(); + if (state == WalState.SHUTDOWN || this.state.compareAndSet(WalState.INIT, WalState.SHUTDOWN)) { LOGGER.warn("block WAL service already shutdown or not started yet"); return; } @@ -409,12 +409,12 @@ public void shutdownGracefully() { continue; } if (state == WalState.SHUTTING_DOWN - || WAL_STATE.compareAndSet(state, WalState.SHUTTING_DOWN)) { + || this.state.compareAndSet(state, WalState.SHUTTING_DOWN)) { break; } } - if (WAL_STATE.compareAndSet(WalState.SHUTTING_DOWN, WalState.SHUTDOWN)) { + if (state.compareAndSet(WalState.SHUTTING_DOWN, WalState.SHUTDOWN)) { boolean success = false; try { doShutdown(); @@ -422,7 +422,7 @@ public void shutdownGracefully() { } finally { if (!success) { LOGGER.warn("block WAL service shutdown fail"); - WAL_STATE.compareAndSet(WalState.SHUTDOWN, WalState.SHUTTING_DOWN); + state.compareAndSet(WalState.SHUTDOWN, WalState.SHUTTING_DOWN); } } } @@ -560,7 +560,7 @@ private CompletableFuture trim(long offset, boolean internal) { } private void checkStarted() { - if (WAL_STATE.get() != WalState.STARTED) { + if (state.get() != WalState.STARTED) { throw new IllegalStateException("WriteAheadLog has not been started yet"); } } @@ -581,12 +581,12 @@ private SlidingWindowService.WALHeaderFlusher flusher() { return () -> flushWALHeader(ShutdownType.UNGRACEFULLY); } - private static class WalState { - public static final int INIT = 1; - public static final int STARTING = 2; - public static final int STARTED = 3; - public static final int SHUTTING_DOWN = 4; - public static final int SHUTDOWN = 5; + private enum WalState { + INIT, + STARTING, + STARTED, + SHUTTING_DOWN, + SHUTDOWN, } public static class BlockWALServiceBuilder {