Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.awaitility.Awaitility.await;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -120,4 +121,35 @@ void mapperException() {
.hasCauseExactlyInstanceOf(RuntimeException.class));
originalWatcher.close();
}

@Test
void mapperAsyncFailure() {
final Watcher<String> originalWatcher = dogma.client()
.forRepo("foo", "bar")
.watcher(Query.ofText("/baz.txt"))
.start();
originalWatcher.initialValueFuture().join();
final Watcher<String> watcher = originalWatcher.newChildAsync(unused ->
CompletableFuture.failedFuture(new RuntimeException()))
.newChild(val -> "not called");
await().untilAsserted(() -> assertThatThrownBy(() -> watcher.initialValueFuture().join())
.hasCauseExactlyInstanceOf(RuntimeException.class));
originalWatcher.close();
}

@Test
void mapperAsyncException() {
final Watcher<String> originalWatcher = dogma.client()
.forRepo("foo", "bar")
.watcher(Query.ofText("/baz.txt"))
.start();
originalWatcher.initialValueFuture().join();
final Watcher<String> watcher = originalWatcher.newChildAsync(unused -> {
throw new RuntimeException();
})
.newChild(val -> "not called");
await().untilAsserted(() -> assertThatThrownBy(() -> watcher.initialValueFuture().join())
.hasCauseExactlyInstanceOf(RuntimeException.class));
originalWatcher.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Copyright 2026 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.linecorp.centraldogma.client;

import static java.util.Objects.requireNonNull;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.BiConsumer;

import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Maps;

import com.linecorp.centraldogma.common.Revision;

abstract class AbstractMappingWatcher<T, U> implements Watcher<U> {
private static final Logger logger = LoggerFactory.getLogger(AbstractMappingWatcher.class);

final CompletableFuture<Latest<U>> initialValueFuture = new CompletableFuture<>();
volatile boolean closed;
final Watcher<T> parent;

private final boolean closeParentWhenClosing;
private final List<Map.Entry<BiConsumer<? super Revision, ? super U>, Executor>> updateListeners =
new CopyOnWriteArrayList<>();

AbstractMappingWatcher(Watcher<T> parent, boolean closeParentWhenClosing) {
this.parent = parent;
this.closeParentWhenClosing = closeParentWhenClosing;
parent.initialValueFuture().exceptionally(cause -> {
initialValueFuture.completeExceptionally(cause);
return null;
});
}

@Override
public final ScheduledExecutorService watchScheduler() {
return parent.watchScheduler();
}

@Override
public final CompletableFuture<Latest<U>> initialValueFuture() {
return initialValueFuture;
}

@Override
public final void close() {
closed = true;
if (!initialValueFuture.isDone()) {
initialValueFuture.cancel(false);
}
if (closeParentWhenClosing) {
parent.close();
}
}

private void notifyListener(Latest<U> latest, BiConsumer<? super Revision, ? super U> listener) {
try {
listener.accept(latest.revision(), latest.value());
} catch (Exception e) {
logger.warn("Unexpected exception is raised from {}: rev={}",
listener, latest.revision(), e);
}
}

protected final void notifyListeners(Latest<U> latest, @Nullable Executor currentExecutor) {
if (closed) {
return;
}

for (Map.Entry<BiConsumer<? super Revision, ? super U>, Executor> entry : updateListeners) {
final BiConsumer<? super Revision, ? super U> listener = entry.getKey();
final Executor executor = entry.getValue();
if (currentExecutor == executor) {
notifyListener(latest, listener);
} else {
executor.execute(() -> notifyListener(latest, listener));
}
Copy link

@coderabbitai coderabbitai bot Mar 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Centralize callback dispatch so post-close and catch-up notifications follow the same rules.

closed is only checked before the task is enqueued. If close() wins the race after scheduling but before execution, both normal updates and the catch-up callback in watch(...) can still invoke listeners after shutdown. Also, Line 128 bypasses notifyListener(...), so exceptions thrown during the initial catch-up notification are not logged/isolated the same way as later updates.

Suggested refactor
+    private void dispatchListener(Latest<U> latest,
+                                  BiConsumer<? super Revision, ? super U> listener,
+                                  Executor executor) {
+        executor.execute(() -> {
+            if (!closed) {
+                notifyListener(latest, listener);
+            }
+        });
+    }
+
     protected final void notifyListeners(Latest<U> latest, `@Nullable` Executor currentExecutor) {
         if (closed) {
             return;
         }
 
         for (Map.Entry<BiConsumer<? super Revision, ? super U>, Executor> entry : updateListeners) {
             final BiConsumer<? super Revision, ? super U> listener = entry.getKey();
             final Executor executor = entry.getValue();
             if (currentExecutor == executor) {
-                notifyListener(latest, listener);
+                if (!closed) {
+                    notifyListener(latest, listener);
+                }
             } else {
-                executor.execute(() -> notifyListener(latest, listener));
+                dispatchListener(latest, listener, executor);
             }
         }
     }
@@
         final Latest<U> mappedLatest = mappedLatest();
         if (mappedLatest != null) {
-            executor.execute(() -> listener.accept(mappedLatest.revision(), mappedLatest.value()));
+            dispatchListener(mappedLatest, listener, executor);
         }
     }

Also applies to: 119-128

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@client/java/src/main/java/com/linecorp/centraldogma/client/AbstractMappingWatcher.java`
around lines 86 - 98, The notifyListeners loop checks the closed flag only
before enqueuing, allowing callbacks to run after close and bypasses
notifyListener for same-thread dispatch which skips exception isolation/logging;
to fix, move the closed check inside the dispatched task (or at the start of
notifyListener) so every invocation verifies closed at execution time, and
always invoke notifyListener (even when currentExecutor == executor) via the
executor.execute(...) path or a unified dispatch helper so exceptions are
handled/logged consistently; update references: notifyListeners, notifyListener,
closed, and the watch catch-up invocation to use the same dispatch helper so
post-close and initial catch-up follow identical rules.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This matches the existing MappedWatcher behaviour. I assume the closing is best-effort rather than meant to be 100% reliable.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@m50d, understood — best-effort close semantics are a reasonable and common design choice for watcher/listener patterns, especially when strict post-close suppression would add synchronization overhead with limited practical benefit. I'll note this for future reviews.


✏️ Learnings added
Learnt from: m50d
Repo: line/centraldogma PR: 1272
File: client/java/src/main/java/com/linecorp/centraldogma/client/AbstractMappingWatcher.java:86-98
Timestamp: 2026-03-09T07:39:08.241Z
Learning: In `AbstractMappingWatcher` (`client/java/src/main/java/com/linecorp/centraldogma/client/AbstractMappingWatcher.java`), the `closed` flag is checked in `notifyListeners` before enqueuing tasks, but not re-checked inside dispatched tasks. This is intentional best-effort close semantics, matching the existing `MappingWatcher` behavior. Do not flag the race between `close()` and listener dispatch as a bug; strict post-close suppression is not a design goal here.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.


🧠 Learnings used
Learnt from: m50d
Repo: line/centraldogma PR: 1272
File: client/java/src/main/java/com/linecorp/centraldogma/client/AsyncMappingWatcher.java:75-80
Timestamp: 2026-03-09T07:38:15.243Z
Learning: In `AsyncMappingWatcher` (`client/java/src/main/java/com/linecorp/centraldogma/client/AsyncMappingWatcher.java`), closing the watcher on ANY mapper failure — including out-of-order (stale) async failures — is an intentional design decision. Do not flag this as a bug; the intent is to treat any mapper exception as a fatal signal that warrants closing the watcher, even if a newer revision has already succeeded.

}
}

protected abstract @Nullable Latest<U> mappedLatest();

@Override
public final Latest<U> latest() {
final Latest<U> mappedLatest = mappedLatest();
if (mappedLatest == null) {
throw new IllegalStateException("value not available yet");
}
return mappedLatest;
}

@Override
public final void watch(BiConsumer<? super Revision, ? super U> listener, Executor executor) {
requireNonNull(listener, "listener");
requireNonNull(executor, "executor");
updateListeners.add(Maps.immutableEntry(listener, executor));

final Latest<U> mappedLatest = mappedLatest();
if (mappedLatest != null) {
// There's a chance that listener.accept(...) is called twice for the same value
// if this watch method is called:
// - after "mappedLatest = newLatest;" is invoked.
// - and before notifyListener() is called.
// However, it's such a rare case and we usually call `watch` method after creating a Watcher,
// which means mappedLatest is probably not set yet, so we don't use a lock to guarantee
// the atomicity.
executor.execute(() -> listener.accept(mappedLatest.revision(), mappedLatest.value()));
}
}

@Override
public final void watch(BiConsumer<? super Revision, ? super U> listener) {
watch(listener, parent.watchScheduler());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Copyright 2026 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.linecorp.centraldogma.client;

import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Objects.requireNonNull;

import java.util.List;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;

import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.linecorp.centraldogma.common.Revision;

final class AsyncMappingWatcher<T, U> extends AbstractMappingWatcher<T, U> {

private static final Logger logger = LoggerFactory.getLogger(AsyncMappingWatcher.class);

static <T, U> AsyncMappingWatcher<T, U> of(Watcher<T> parent,
Function<? super T, ? extends CompletableFuture<? extends U>>
mapper,
boolean closeParentWhenClosing) {
requireNonNull(parent, "parent");
requireNonNull(mapper, "mapper");
return new AsyncMappingWatcher<>(parent, mapper, closeParentWhenClosing);
}

private final Function<? super T, ? extends CompletableFuture<? extends U>> mapper;
private final List<Entry<BiConsumer<? super Revision, ? super U>, Executor>> updateListeners =
new CopyOnWriteArrayList<>();

private final AtomicReference<@Nullable Latest<U>> mappedLatest = new AtomicReference<>();

private static <U> boolean isUpdate(Latest<U> newLatest, @Nullable Latest<U> existing) {
if (existing == null) {
return true;
}
if (Objects.equals(existing.value(), newLatest.value())) {
return false;
}
return newLatest.revision().compareTo(existing.revision()) >= 0;
}

AsyncMappingWatcher(Watcher<T> parent, Function<? super T, ? extends CompletableFuture<? extends U>> mapper,
boolean closeParentWhenClosing) {
super(parent, closeParentWhenClosing);
this.mapper = mapper;
parent.initialValueFuture().exceptionally(cause -> {
initialValueFuture.completeExceptionally(cause);
return null;
});
final Consumer<Throwable> reportFailure = (e) -> {
logger.warn("Unexpected exception is raised from mapper.apply(). mapper: {}", mapper, e);
if (!initialValueFuture.isDone()) {
initialValueFuture.completeExceptionally(e);
}
close();
Copy link

@coderabbitai coderabbitai bot Mar 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Don't let stale async failures close newer state.

The success path suppresses out-of-order completions, but the failure path does not. If a later revision has already published mappedLatest and an earlier future fails afterwards, Line 98 still closes the watcher, so an outdated completion can tear down newer state. Gate failure handling with the same staleness rule, or track the newest started revision separately.

Also applies to: 93-99

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@client/java/src/main/java/com/linecorp/centraldogma/client/AsyncMappingWatcher.java`
around lines 75 - 80, The failure handler (reportFailure Consumer) must be gated
by the same staleness rule used in the success path so an out-of-order earlier
failure cannot close newer state; modify the reportFailure logic (and the
equivalent block around lines 93-99) to check the associated
revision/startedRevision against the current published mappedLatest (or a
tracked newestStartedRevision) and only call
initialValueFuture.completeExceptionally(e) and close() when the failure is for
the latest non-stale revision; in short, add the same revision-staleness guard
used when handling successful mapper.apply() completions before performing
failure completion or calling close().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is ok? We intentionally close on any failure, even an out-of-order one.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@m50d, understood — thanks for the clarification. Closing on any mapper failure (regardless of revision order) is a reasonable defensive posture: if the mapper throws or returns a failed future at all, it signals something is fundamentally wrong with the mapper itself, so tearing down the watcher eagerly makes sense.

Acknowledged and moving on.


✏️ Learnings added
Learnt from: m50d
Repo: line/centraldogma PR: 1272
File: client/java/src/main/java/com/linecorp/centraldogma/client/AsyncMappingWatcher.java:75-80
Timestamp: 2026-03-09T07:38:15.243Z
Learning: In `AsyncMappingWatcher` (`client/java/src/main/java/com/linecorp/centraldogma/client/AsyncMappingWatcher.java`), closing the watcher on ANY mapper failure — including out-of-order (stale) async failures — is an intentional design decision. Do not flag this as a bug; the intent is to treat any mapper exception as a fatal signal that warrants closing the watcher, even if a newer revision has already succeeded.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.

};
parent.watch((revision, value) -> {
if (closed) {
return;
}
final CompletableFuture<? extends U> mappedValueFuture;
try {
mappedValueFuture = mapper.apply(value);
} catch (Exception e) {
reportFailure.accept(e);
return;
}
mappedValueFuture.whenComplete((mappedValue, e) -> {
Copy link

@coderabbitai coderabbitai bot Mar 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Reject null futures from the mapper.

If mapper.apply(value) returns null, Line 101 throws outside the current failure path, so the child watcher can fail without going through reportFailure(...) or completing initialValueFuture consistently.

Suggested fix
             final CompletableFuture<? extends U> mappedValueFuture;
             try {
-                mappedValueFuture = mapper.apply(value);
+                mappedValueFuture = requireNonNull(mapper.apply(value),
+                                                  "mapper.apply() returned null CompletableFuture");
             } catch (Exception e) {
                 reportFailure.accept(e);
                 return;
             }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
final CompletableFuture<? extends U> mappedValueFuture;
try {
mappedValueFuture = mapper.apply(value);
} catch (Exception e) {
reportFailure.accept(e);
return;
}
mappedValueFuture.whenComplete((mappedValue, e) -> {
final CompletableFuture<? extends U> mappedValueFuture;
try {
mappedValueFuture = requireNonNull(mapper.apply(value),
"mapper.apply() returned null CompletableFuture");
} catch (Exception e) {
reportFailure.accept(e);
return;
}
mappedValueFuture.whenComplete((mappedValue, e) -> {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@client/java/src/main/java/com/linecorp/centraldogma/client/AsyncMappingWatcher.java`
around lines 94 - 101, The mapper.apply(value) call in AsyncMappingWatcher can
return null which causes a NPE at mappedValueFuture.whenComplete and bypasses
the failure handling; update the code after calling mapper.apply(value) (the
mappedValueFuture variable) to check for null and if null call
reportFailure.accept(new NullPointerException("mapper returned null")) and also
completeExceptionally(initialValueFuture, sameException) or otherwise ensure
initialValueFuture is completed exceptionally, then return so execution does not
reach mappedValueFuture.whenComplete; reference symbols: AsyncMappingWatcher,
mapper.apply, mappedValueFuture, reportFailure, initialValueFuture,
whenComplete.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mapper cannot actually return null given the function type?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check for `@NullMarked` annotations in the relevant package
fd 'package-info.java' --exec grep -l 'NullMarked' {}

# Also check for NullMarked at the class level in the same package
rg '@NullMarked' client/java/src/main/java/com/linecorp/centraldogma/client/ -n

# Check if NullAway or similar is configured
rg 'NullAway\|nullaway\|NullMarked' build.gradle --include='*.gradle' -rn 2>/dev/null || true
rg 'NullAway\|nullaway\|NullMarked' --type=gradle -rn 2>/dev/null || true

Length of output: 6362


@m50d, you are correct. The com.linecorp.centraldogma.client package is annotated with @NullMarked (Line 21 of package-info.java), so under JSpecify semantics the unannotated return type CompletableFuture<? extends U> is implicitly @NonNull. A mapper that returns null already violates the declared contract at the static-analysis level, making the defensive requireNonNull unnecessary. I'll withdraw this suggestion.

if (closed) {
return;
}
if (null != e) {
reportFailure.accept(e);
return;
}

// mappedValue can be nullable which is fine.
final Latest<U> newLatest = new Latest<>(revision, mappedValue);
final Latest<U> oldLatest = mappedLatest.getAndUpdate(
existing -> isUpdate(newLatest, existing) ? newLatest : existing
);
if (!isUpdate(newLatest, oldLatest)) {
return;
}
notifyListeners(newLatest, null);
if (!initialValueFuture.isDone()) {
initialValueFuture.complete(newLatest);
}
});
});
}

@Override
public String toString() {
return toStringHelper(this)
.add("parent", parent)
.add("mapper", mapper)
.add("closed", closed)
.toString();
}

@Override
protected @Nullable Latest<U> mappedLatest() {
return this.mappedLatest.get();
}
}
Loading