Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.awaitility.Awaitility.await;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -120,4 +121,35 @@ void mapperException() {
.hasCauseExactlyInstanceOf(RuntimeException.class));
originalWatcher.close();
}

@Test
void mapperAsyncFailure() {
final Watcher<String> originalWatcher = dogma.client()
.forRepo("foo", "bar")
.watcher(Query.ofText("/baz.txt"))
.start();
originalWatcher.initialValueFuture().join();
final Watcher<String> watcher = originalWatcher.newChildAsync(unused ->
CompletableFuture.failedFuture(new RuntimeException()))
.newChild(val -> "not called");
await().untilAsserted(() -> assertThatThrownBy(() -> watcher.initialValueFuture().join())
.hasCauseExactlyInstanceOf(RuntimeException.class));
originalWatcher.close();
}

@Test
void mapperAsyncException() {
final Watcher<String> originalWatcher = dogma.client()
.forRepo("foo", "bar")
.watcher(Query.ofText("/baz.txt"))
.start();
originalWatcher.initialValueFuture().join();
final Watcher<String> watcher = originalWatcher.newChildAsync(unused -> {
throw new RuntimeException();
})
.newChild(val -> "not called");
await().untilAsserted(() -> assertThatThrownBy(() -> watcher.initialValueFuture().join())
.hasCauseExactlyInstanceOf(RuntimeException.class));
originalWatcher.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
/*
* Copyright 2026 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.linecorp.centraldogma.client;

import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Objects.requireNonNull;

import java.util.List;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;

import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Maps;

import com.linecorp.centraldogma.common.Revision;

final class AsyncMappingWatcher<T, U> implements Watcher<U> {

private static final Logger logger = LoggerFactory.getLogger(AsyncMappingWatcher.class);

static <T, U> AsyncMappingWatcher<T, U> of(Watcher<T> parent,
Function<? super T, ? extends CompletableFuture<? extends U>>
mapper,
boolean closeParentWhenClosing) {
requireNonNull(parent, "parent");
requireNonNull(mapper, "mapper");
return new AsyncMappingWatcher<>(parent, mapper, closeParentWhenClosing);
}

private final Watcher<T> parent;
private final Function<? super T, ? extends CompletableFuture<? extends U>> mapper;
private final boolean closeParentWhenClosing;
private final CompletableFuture<Latest<U>> initialValueFuture = new CompletableFuture<>();
private final List<Entry<BiConsumer<? super Revision, ? super U>, Executor>> updateListeners =
new CopyOnWriteArrayList<>();

private final AtomicReference<@Nullable Latest<U>> mappedLatest = new AtomicReference<>();
private volatile boolean closed;

private static <U> boolean isUpdate(Latest<U> newLatest, @Nullable Latest<U> existing) {
if (existing == null) {
return true;
}
if (Objects.equals(existing.value(), newLatest.value())) {
return false;
}
return newLatest.revision().compareTo(existing.revision()) >= 0;
}

AsyncMappingWatcher(Watcher<T> parent, Function<? super T, ? extends CompletableFuture<? extends U>> mapper,
boolean closeParentWhenClosing) {
this.parent = parent;
this.mapper = mapper;
this.closeParentWhenClosing = closeParentWhenClosing;
parent.initialValueFuture().exceptionally(cause -> {
initialValueFuture.completeExceptionally(cause);
return null;
});
final Consumer<Throwable> reportFailure = (e) -> {
logger.warn("Unexpected exception is raised from mapper.apply(). mapper: {}", mapper, e);
if (!initialValueFuture.isDone()) {
initialValueFuture.completeExceptionally(e);
}
close();
Copy link

@coderabbitai coderabbitai bot Mar 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Don't let stale async failures close newer state.

The success path suppresses out-of-order completions, but the failure path does not. If a later revision has already published mappedLatest and an earlier future fails afterwards, Line 98 still closes the watcher, so an outdated completion can tear down newer state. Gate failure handling with the same staleness rule, or track the newest started revision separately.

Also applies to: 93-99

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@client/java/src/main/java/com/linecorp/centraldogma/client/AsyncMappingWatcher.java`
around lines 75 - 80, The failure handler (reportFailure Consumer) must be gated
by the same staleness rule used in the success path so an out-of-order earlier
failure cannot close newer state; modify the reportFailure logic (and the
equivalent block around lines 93-99) to check the associated
revision/startedRevision against the current published mappedLatest (or a
tracked newestStartedRevision) and only call
initialValueFuture.completeExceptionally(e) and close() when the failure is for
the latest non-stale revision; in short, add the same revision-staleness guard
used when handling successful mapper.apply() completions before performing
failure completion or calling close().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is ok? We intentionally close on any failure, even an out-of-order one.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@m50d, understood — thanks for the clarification. Closing on any mapper failure (regardless of revision order) is a reasonable defensive posture: if the mapper throws or returns a failed future at all, it signals something is fundamentally wrong with the mapper itself, so tearing down the watcher eagerly makes sense.

Acknowledged and moving on.


✏️ Learnings added
Learnt from: m50d
Repo: line/centraldogma PR: 1272
File: client/java/src/main/java/com/linecorp/centraldogma/client/AsyncMappingWatcher.java:75-80
Timestamp: 2026-03-09T07:38:15.243Z
Learning: In `AsyncMappingWatcher` (`client/java/src/main/java/com/linecorp/centraldogma/client/AsyncMappingWatcher.java`), closing the watcher on ANY mapper failure — including out-of-order (stale) async failures — is an intentional design decision. Do not flag this as a bug; the intent is to treat any mapper exception as a fatal signal that warrants closing the watcher, even if a newer revision has already succeeded.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.

};
parent.watch((revision, value) -> {
if (closed) {
return;
}
final CompletableFuture<? extends U> mappedValueFuture;
try {
mappedValueFuture = mapper.apply(value);
} catch (Exception e) {
reportFailure.accept(e);
return;
}
mappedValueFuture.whenComplete((mappedValue, e) -> {
Copy link

@coderabbitai coderabbitai bot Mar 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Reject null futures from the mapper.

If mapper.apply(value) returns null, Line 101 throws outside the current failure path, so the child watcher can fail without going through reportFailure(...) or completing initialValueFuture consistently.

Suggested fix
             final CompletableFuture<? extends U> mappedValueFuture;
             try {
-                mappedValueFuture = mapper.apply(value);
+                mappedValueFuture = requireNonNull(mapper.apply(value),
+                                                  "mapper.apply() returned null CompletableFuture");
             } catch (Exception e) {
                 reportFailure.accept(e);
                 return;
             }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
final CompletableFuture<? extends U> mappedValueFuture;
try {
mappedValueFuture = mapper.apply(value);
} catch (Exception e) {
reportFailure.accept(e);
return;
}
mappedValueFuture.whenComplete((mappedValue, e) -> {
final CompletableFuture<? extends U> mappedValueFuture;
try {
mappedValueFuture = requireNonNull(mapper.apply(value),
"mapper.apply() returned null CompletableFuture");
} catch (Exception e) {
reportFailure.accept(e);
return;
}
mappedValueFuture.whenComplete((mappedValue, e) -> {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@client/java/src/main/java/com/linecorp/centraldogma/client/AsyncMappingWatcher.java`
around lines 94 - 101, The mapper.apply(value) call in AsyncMappingWatcher can
return null which causes a NPE at mappedValueFuture.whenComplete and bypasses
the failure handling; update the code after calling mapper.apply(value) (the
mappedValueFuture variable) to check for null and if null call
reportFailure.accept(new NullPointerException("mapper returned null")) and also
completeExceptionally(initialValueFuture, sameException) or otherwise ensure
initialValueFuture is completed exceptionally, then return so execution does not
reach mappedValueFuture.whenComplete; reference symbols: AsyncMappingWatcher,
mapper.apply, mappedValueFuture, reportFailure, initialValueFuture,
whenComplete.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mapper cannot actually return null given the function type?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check for `@NullMarked` annotations in the relevant package
fd 'package-info.java' --exec grep -l 'NullMarked' {}

# Also check for NullMarked at the class level in the same package
rg '@NullMarked' client/java/src/main/java/com/linecorp/centraldogma/client/ -n

# Check if NullAway or similar is configured
rg 'NullAway\|nullaway\|NullMarked' build.gradle --include='*.gradle' -rn 2>/dev/null || true
rg 'NullAway\|nullaway\|NullMarked' --type=gradle -rn 2>/dev/null || true

Length of output: 6362


@m50d, you are correct. The com.linecorp.centraldogma.client package is annotated with @NullMarked (Line 21 of package-info.java), so under JSpecify semantics the unannotated return type CompletableFuture<? extends U> is implicitly @NonNull. A mapper that returns null already violates the declared contract at the static-analysis level, making the defensive requireNonNull unnecessary. I'll withdraw this suggestion.

if (null != e) {
reportFailure.accept(e);
return;
}

// mappedValue can be nullable which is fine.
final Latest<U> newLatest = new Latest<>(revision, mappedValue);
final Latest<U> oldLatest = mappedLatest.getAndUpdate(
existing -> isUpdate(newLatest, existing) ? newLatest : existing
);
if (!isUpdate(newLatest, oldLatest)) {
return;
}
notifyListeners(newLatest);
if (!initialValueFuture.isDone()) {
initialValueFuture.complete(newLatest);
}
});
});
}

private void notifyListeners(Latest<U> latest) {
if (closed) {
return;
}

for (Entry<BiConsumer<? super Revision, ? super U>, Executor> entry : updateListeners) {
final BiConsumer<? super Revision, ? super U> listener = entry.getKey();
final Executor executor = entry.getValue();
executor.execute(() -> notifyListener(latest, listener));
}
}

private void notifyListener(Latest<U> latest, BiConsumer<? super Revision, ? super U> listener) {
try {
listener.accept(latest.revision(), latest.value());
} catch (Exception e) {
logger.warn("Unexpected exception is raised from {}: rev={}",
listener, latest.revision(), e);
}
}

@Override
public ScheduledExecutorService watchScheduler() {
return parent.watchScheduler();
}

@Override
public CompletableFuture<Latest<U>> initialValueFuture() {
return initialValueFuture;
}

@Override
public Latest<U> latest() {
final Latest<U> mappedLatest = this.mappedLatest.get();
if (mappedLatest == null) {
throw new IllegalStateException("value not available yet");
}
return mappedLatest;
}

@Override
public void close() {
closed = true;
if (!initialValueFuture.isDone()) {
initialValueFuture.cancel(false);
}
if (closeParentWhenClosing) {
parent.close();
}
}

@Override
public void watch(BiConsumer<? super Revision, ? super U> listener) {
watch(listener, parent.watchScheduler());
}

@Override
public void watch(BiConsumer<? super Revision, ? super U> listener, Executor executor) {
requireNonNull(listener, "listener");
requireNonNull(executor, "executor");
updateListeners.add(Maps.immutableEntry(listener, executor));

final Latest<U> mappedLatest = this.mappedLatest.get();
if (mappedLatest != null) {
// There's a chance that listener.accept(...) is called twice for the same value
// if this watch method is called:
// - after "mappedLatest = newLatest;" is invoked.
// - and before notifyListener() is called.
// However, it's such a rare case and we usually call `watch` method after creating a Watcher,
// which means mappedLatest is probably not set yet, so we don't use a lock to guarantee
// the atomicity.
executor.execute(() -> listener.accept(mappedLatest.revision(), mappedLatest.value()));
}
}

@Override
public String toString() {
return toStringHelper(this)
.add("parent", parent)
.add("mapper", mapper)
.add("closed", closed)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -275,4 +275,12 @@ default <U> Watcher<U> newChild(Function<? super T, ? extends U> mapper, Executo
requireNonNull(executor, "executor");
return MappingWatcher.of(this, mapper, executor, false);
}

/**
* Returns a {@link Watcher} that applies the {@link Function} for the {@link Latest#value()}.
*/
default <U> Watcher<U> newChildAsync(Function<? super T, ? extends CompletableFuture<? extends U>> mapper) {
requireNonNull(mapper, "mapper");
return AsyncMappingWatcher.of(this, mapper, false);
}
}