diff --git a/client/java-armeria/src/test/java/com/linecorp/centraldogma/client/armeria/WatcherTest.java b/client/java-armeria/src/test/java/com/linecorp/centraldogma/client/armeria/WatcherTest.java index 36dc43c4c..ff72c8076 100644 --- a/client/java-armeria/src/test/java/com/linecorp/centraldogma/client/armeria/WatcherTest.java +++ b/client/java-armeria/src/test/java/com/linecorp/centraldogma/client/armeria/WatcherTest.java @@ -19,6 +19,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.awaitility.Awaitility.await; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import org.junit.jupiter.api.Test; @@ -98,7 +99,7 @@ void multipleNewChild() throws Exception { .watcher(Query.ofText("/baz.txt")) .start(); final Watcher watcher = originalWatcher - .newChild(txt -> 1) + .newChildAsync(txt -> CompletableFuture.completedFuture(1)) .newChild(intValue -> Integer.toString(intValue)) .newChild("1"::equals); @@ -120,4 +121,35 @@ void mapperException() { .hasCauseExactlyInstanceOf(RuntimeException.class)); originalWatcher.close(); } + + @Test + void mapperAsyncFailure() { + final Watcher originalWatcher = dogma.client() + .forRepo("foo", "bar") + .watcher(Query.ofText("/baz.txt")) + .start(); + originalWatcher.initialValueFuture().join(); + final Watcher watcher = originalWatcher.newChildAsync(unused -> + CompletableFuture.failedFuture(new RuntimeException())) + .newChild(val -> "not called"); + await().untilAsserted(() -> assertThatThrownBy(() -> watcher.initialValueFuture().join()) + .hasCauseExactlyInstanceOf(RuntimeException.class)); + originalWatcher.close(); + } + + @Test + void mapperAsyncException() { + final Watcher originalWatcher = dogma.client() + .forRepo("foo", "bar") + .watcher(Query.ofText("/baz.txt")) + .start(); + originalWatcher.initialValueFuture().join(); + final Watcher watcher = originalWatcher.newChildAsync(unused -> { + throw new RuntimeException(); + }) + .newChild(val -> "not called"); + await().untilAsserted(() -> assertThatThrownBy(() -> watcher.initialValueFuture().join()) + .hasCauseExactlyInstanceOf(RuntimeException.class)); + originalWatcher.close(); + } } diff --git a/client/java/src/main/java/com/linecorp/centraldogma/client/MappingWatcher.java b/client/java/src/main/java/com/linecorp/centraldogma/client/MappingWatcher.java index ecda6a517..11b4e29ac 100644 --- a/client/java/src/main/java/com/linecorp/centraldogma/client/MappingWatcher.java +++ b/client/java/src/main/java/com/linecorp/centraldogma/client/MappingWatcher.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 LINE Corporation + * Copyright 2026 LINE Corporation * * LINE Corporation licenses this file to you under the Apache License, * version 2.0 (the "License"); you may not use this file except in compliance @@ -20,12 +20,12 @@ import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Function; @@ -38,83 +38,102 @@ import com.linecorp.centraldogma.common.Revision; final class MappingWatcher implements Watcher { - private static final Logger logger = LoggerFactory.getLogger(MappingWatcher.class); - static MappingWatcher of(Watcher parent, Function mapper, - Executor executor, boolean closeParentWhenClosing) { + static MappingWatcher of(Watcher parent, + Function> + mapper) { requireNonNull(parent, "parent"); requireNonNull(mapper, "mapper"); - requireNonNull(executor, "executor"); - // TODO(minwoo): extract mapper function and combine it with the new mapper. - return new MappingWatcher<>(parent, mapper, executor, closeParentWhenClosing); + return new MappingWatcher<>(parent, mapper); } - private final Watcher parent; - private final Function mapper; - private final Executor mapperExecutor; - private final boolean closeParentWhenClosing; - private final CompletableFuture> initialValueFuture = new CompletableFuture<>(); - private final List, Executor>> updateListeners = - new CopyOnWriteArrayList<>(); + private static boolean isUpdate(Latest newLatest, @Nullable Latest existing) { + if (existing == null) { + return true; + } + if (Objects.equals(existing.value(), newLatest.value())) { + return false; + } + return newLatest.revision().compareTo(existing.revision()) >= 0; + } - @Nullable - private volatile Latest mappedLatest; + private final CompletableFuture> initialValueFuture = new CompletableFuture<>(); private volatile boolean closed; + private final Watcher parent; - MappingWatcher(Watcher parent, Function mapper, Executor mapperExecutor, - boolean closeParentWhenClosing) { + private final List, Executor>> updateListeners = + new CopyOnWriteArrayList<>(); + private final Function> mapper; + private final AtomicReference<@Nullable Latest> mappedLatest = new AtomicReference<>(); + + MappingWatcher(Watcher parent, Function> + mapper) { this.parent = parent; this.mapper = mapper; - this.mapperExecutor = mapperExecutor; - this.closeParentWhenClosing = closeParentWhenClosing; parent.initialValueFuture().exceptionally(cause -> { initialValueFuture.completeExceptionally(cause); return null; }); + final BiConsumer reportFailure = (e, r) -> { + logger.warn("Unexpected exception is raised from mapper.apply(). mapper: {}, revision {}", mapper, + r, e); + if (!initialValueFuture.isDone()) { + initialValueFuture.completeExceptionally(e); + } + close(); + }; parent.watch((revision, value) -> { if (closed) { return; } - final U mappedValue; + final CompletableFuture mappedValueFuture; try { - mappedValue = mapper.apply(value); + mappedValueFuture = mapper.apply(value); } catch (Exception e) { - logger.warn("Unexpected exception is raised from mapper.apply(). mapper: {}", mapper, e); - if (!initialValueFuture.isDone()) { - initialValueFuture.completeExceptionally(e); - } - close(); - return; - } - final Latest oldLatest = mappedLatest; - if (oldLatest != null && Objects.equals(oldLatest.value(), mappedValue)) { + reportFailure.accept(e, revision); return; } + mappedValueFuture.whenComplete((mappedValue, e) -> { + if (closed) { + return; + } + if (null != e) { + reportFailure.accept(e, revision); + return; + } - // mappedValue can be nullable which is fine. - final Latest newLatest = new Latest<>(revision, mappedValue); - mappedLatest = newLatest; - notifyListeners(newLatest); - if (!initialValueFuture.isDone()) { - initialValueFuture.complete(newLatest); - } - }, mapperExecutor); + // mappedValue can be nullable which is fine. + final Latest newLatest = new Latest<>(revision, mappedValue); + final Latest oldLatest = mappedLatest.getAndUpdate( + existing -> isUpdate(newLatest, existing) ? newLatest : existing + ); + if (!isUpdate(newLatest, oldLatest)) { + return; + } + notifyListeners(newLatest); + if (!initialValueFuture.isDone()) { + initialValueFuture.complete(newLatest); + } + }); + }); } - private void notifyListeners(Latest latest) { - if (closed) { - return; - } + @Override + public ScheduledExecutorService watchScheduler() { + return parent.watchScheduler(); + } - for (Map.Entry, Executor> entry : updateListeners) { - final BiConsumer listener = entry.getKey(); - final Executor executor = entry.getValue(); - if (mapperExecutor == executor) { - notifyListener(latest, listener); - } else { - executor.execute(() -> notifyListener(latest, listener)); - } + @Override + public CompletableFuture> initialValueFuture() { + return initialValueFuture; + } + + @Override + public void close() { + closed = true; + if (!initialValueFuture.isDone()) { + initialValueFuture.cancel(false); } } @@ -123,52 +142,38 @@ private void notifyListener(Latest latest, BiConsumer latest) { + if (closed) { + return; + } - @Override - public CompletableFuture> initialValueFuture() { - return initialValueFuture; + for (Map.Entry, Executor> entry : updateListeners) { + final BiConsumer listener = entry.getKey(); + final Executor executor = entry.getValue(); + executor.execute(() -> notifyListener(latest, listener)); + } } @Override public Latest latest() { - final Latest mappedLatest = this.mappedLatest; + final Latest mappedLatest = this.mappedLatest.get(); if (mappedLatest == null) { throw new IllegalStateException("value not available yet"); } return mappedLatest; } - @Override - public void close() { - closed = true; - if (!initialValueFuture.isDone()) { - initialValueFuture.cancel(false); - } - if (closeParentWhenClosing) { - parent.close(); - } - } - - @Override - public void watch(BiConsumer listener) { - watch(listener, parent.watchScheduler()); - } - @Override public void watch(BiConsumer listener, Executor executor) { requireNonNull(listener, "listener"); requireNonNull(executor, "executor"); updateListeners.add(Maps.immutableEntry(listener, executor)); - final Latest mappedLatest = this.mappedLatest; + final Latest mappedLatest = this.mappedLatest.get(); if (mappedLatest != null) { // There's a chance that listener.accept(...) is called twice for the same value // if this watch method is called: @@ -181,6 +186,11 @@ public void watch(BiConsumer listener, Executor exe } } + @Override + public void watch(BiConsumer listener) { + watch(listener, parent.watchScheduler()); + } + @Override public String toString() { return toStringHelper(this) diff --git a/client/java/src/main/java/com/linecorp/centraldogma/client/Watcher.java b/client/java/src/main/java/com/linecorp/centraldogma/client/Watcher.java index de8e0247b..b77e51f17 100644 --- a/client/java/src/main/java/com/linecorp/centraldogma/client/Watcher.java +++ b/client/java/src/main/java/com/linecorp/centraldogma/client/Watcher.java @@ -273,6 +273,14 @@ default Watcher newChild(Function mapper) { default Watcher newChild(Function mapper, Executor executor) { requireNonNull(mapper, "mapper"); requireNonNull(executor, "executor"); - return MappingWatcher.of(this, mapper, executor, false); + return newChildAsync(t -> CompletableFuture.supplyAsync(() -> mapper.apply(t), executor)); + } + + /** + * Returns a {@link Watcher} that applies the {@link Function} for the {@link Latest#value()}. + */ + default Watcher newChildAsync(Function> mapper) { + requireNonNull(mapper, "mapper"); + return MappingWatcher.of(this, mapper); } }