Skip to content
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.awaitility.Awaitility.await;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -98,7 +99,7 @@ void multipleNewChild() throws Exception {
.watcher(Query.ofText("/baz.txt"))
.start();
final Watcher<Boolean> watcher = originalWatcher
.newChild(txt -> 1)
.newChildAsync(txt -> CompletableFuture.completedFuture(1))
.newChild(intValue -> Integer.toString(intValue))
.newChild("1"::equals);

Expand All @@ -120,4 +121,35 @@ void mapperException() {
.hasCauseExactlyInstanceOf(RuntimeException.class));
originalWatcher.close();
}

@Test
void mapperAsyncFailure() {
final Watcher<String> originalWatcher = dogma.client()
.forRepo("foo", "bar")
.watcher(Query.ofText("/baz.txt"))
.start();
originalWatcher.initialValueFuture().join();
final Watcher<String> watcher = originalWatcher.newChildAsync(unused ->
CompletableFuture.failedFuture(new RuntimeException()))
.newChild(val -> "not called");
await().untilAsserted(() -> assertThatThrownBy(() -> watcher.initialValueFuture().join())
.hasCauseExactlyInstanceOf(RuntimeException.class));
originalWatcher.close();
}

@Test
void mapperAsyncException() {
final Watcher<String> originalWatcher = dogma.client()
.forRepo("foo", "bar")
.watcher(Query.ofText("/baz.txt"))
.start();
originalWatcher.initialValueFuture().join();
final Watcher<String> watcher = originalWatcher.newChildAsync(unused -> {
throw new RuntimeException();
})
.newChild(val -> "not called");
await().untilAsserted(() -> assertThatThrownBy(() -> watcher.initialValueFuture().join())
.hasCauseExactlyInstanceOf(RuntimeException.class));
originalWatcher.close();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 LINE Corporation
* Copyright 2026 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
Expand All @@ -20,12 +20,12 @@

import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;

Expand All @@ -38,93 +38,92 @@
import com.linecorp.centraldogma.common.Revision;

final class MappingWatcher<T, U> implements Watcher<U> {

private static final Logger logger = LoggerFactory.getLogger(MappingWatcher.class);

static <T, U> MappingWatcher<T, U> of(Watcher<T> parent, Function<? super T, ? extends U> mapper,
Executor executor, boolean closeParentWhenClosing) {
static <T, U> MappingWatcher<T, U> of(Watcher<T> parent,
Function<? super T, ? extends CompletableFuture<? extends U>>
mapper,
boolean closeParentWhenClosing) {
requireNonNull(parent, "parent");
requireNonNull(mapper, "mapper");
requireNonNull(executor, "executor");
// TODO(minwoo): extract mapper function and combine it with the new mapper.
return new MappingWatcher<>(parent, mapper, executor, closeParentWhenClosing);
return new MappingWatcher<>(parent, mapper, closeParentWhenClosing);
}

private final CompletableFuture<Latest<U>> initialValueFuture = new CompletableFuture<>();
private volatile boolean closed;
private final Watcher<T> parent;
private final Function<? super T, ? extends U> mapper;
private final Executor mapperExecutor;

private final boolean closeParentWhenClosing;
private final CompletableFuture<Latest<U>> initialValueFuture = new CompletableFuture<>();
private final List<Entry<BiConsumer<? super Revision, ? super U>, Executor>> updateListeners =
private final List<Map.Entry<BiConsumer<? super Revision, ? super U>, Executor>> updateListeners =
new CopyOnWriteArrayList<>();
private final Function<? super T, ? extends CompletableFuture<? extends U>> mapper;
private final AtomicReference<@Nullable Latest<U>> mappedLatest = new AtomicReference<>();

@Nullable
private volatile Latest<U> mappedLatest;
private volatile boolean closed;
private static <U> boolean isUpdate(Latest<U> newLatest, @Nullable Latest<U> existing) {
if (existing == null) {
return true;
}
if (Objects.equals(existing.value(), newLatest.value())) {
return false;
}
return newLatest.revision().compareTo(existing.revision()) >= 0;
}

MappingWatcher(Watcher<T> parent, Function<? super T, ? extends U> mapper, Executor mapperExecutor,
boolean closeParentWhenClosing) {
MappingWatcher(Watcher<T> parent, Function<? super T, ? extends CompletableFuture<? extends U>>
mapper, boolean closeParentWhenClosing) {
this.parent = parent;
this.mapper = mapper;
this.mapperExecutor = mapperExecutor;
this.closeParentWhenClosing = closeParentWhenClosing;
parent.initialValueFuture().exceptionally(cause -> {
initialValueFuture.completeExceptionally(cause);
return null;
});
this.mapper = mapper;
parent.initialValueFuture().exceptionally(cause -> {
initialValueFuture.completeExceptionally(cause);
return null;
});
final BiConsumer<Throwable, Revision> reportFailure = (e, r) -> {
logger.warn("Unexpected exception is raised from mapper.apply(). mapper: {}, revision {}", mapper,
r, e);
if (!initialValueFuture.isDone()) {
initialValueFuture.completeExceptionally(e);
}
close();
};
parent.watch((revision, value) -> {
if (closed) {
return;
}
final U mappedValue;
final CompletableFuture<? extends U> mappedValueFuture;
try {
mappedValue = mapper.apply(value);
mappedValueFuture = mapper.apply(value);
} catch (Exception e) {
logger.warn("Unexpected exception is raised from mapper.apply(). mapper: {}", mapper, e);
if (!initialValueFuture.isDone()) {
initialValueFuture.completeExceptionally(e);
}
close();
reportFailure.accept(e, revision);
return;
}
final Latest<U> oldLatest = mappedLatest;
if (oldLatest != null && Objects.equals(oldLatest.value(), mappedValue)) {
return;
}

// mappedValue can be nullable which is fine.
final Latest<U> newLatest = new Latest<>(revision, mappedValue);
mappedLatest = newLatest;
notifyListeners(newLatest);
if (!initialValueFuture.isDone()) {
initialValueFuture.complete(newLatest);
}
}, mapperExecutor);
}

private void notifyListeners(Latest<U> latest) {
if (closed) {
return;
}

for (Map.Entry<BiConsumer<? super Revision, ? super U>, Executor> entry : updateListeners) {
final BiConsumer<? super Revision, ? super U> listener = entry.getKey();
final Executor executor = entry.getValue();
if (mapperExecutor == executor) {
notifyListener(latest, listener);
} else {
executor.execute(() -> notifyListener(latest, listener));
}
}
}
mappedValueFuture.whenComplete((mappedValue, e) -> {
if (closed) {
return;
}
if (null != e) {
reportFailure.accept(e, revision);
return;
}

private void notifyListener(Latest<U> latest, BiConsumer<? super Revision, ? super U> listener) {
try {
listener.accept(latest.revision(), latest.value());
} catch (Exception e) {
logger.warn("Unexpected exception is raised from {}: rev={}",
listener, latest.revision(), e);
}
// mappedValue can be nullable which is fine.
final Latest<U> newLatest = new Latest<>(revision, mappedValue);
final Latest<U> oldLatest = mappedLatest.getAndUpdate(
existing -> isUpdate(newLatest, existing) ? newLatest : existing
);
if (!isUpdate(newLatest, oldLatest)) {
return;
}
notifyListeners(newLatest);
if (!initialValueFuture.isDone()) {
initialValueFuture.complete(newLatest);
}
});
});
}

@Override
Expand All @@ -137,15 +136,6 @@ public CompletableFuture<Latest<U>> initialValueFuture() {
return initialValueFuture;
}

@Override
public Latest<U> latest() {
final Latest<U> mappedLatest = this.mappedLatest;
if (mappedLatest == null) {
throw new IllegalStateException("value not available yet");
}
return mappedLatest;
}

@Override
public void close() {
closed = true;
Expand All @@ -157,9 +147,34 @@ public void close() {
}
}

private void notifyListener(Latest<U> latest, BiConsumer<? super Revision, ? super U> listener) {
try {
listener.accept(latest.revision(), latest.value());
} catch (Exception e) {
logger.warn("Unexpected exception is raised from {}: rev={}",
listener, latest.revision(), e);
}
}

private void notifyListeners(Latest<U> latest) {
if (closed) {
return;
}

for (Map.Entry<BiConsumer<? super Revision, ? super U>, Executor> entry : updateListeners) {
final BiConsumer<? super Revision, ? super U> listener = entry.getKey();
final Executor executor = entry.getValue();
executor.execute(() -> notifyListener(latest, listener));
}
}

@Override
public void watch(BiConsumer<? super Revision, ? super U> listener) {
watch(listener, parent.watchScheduler());
public Latest<U> latest() {
final Latest<U> mappedLatest = this.mappedLatest.get();
if (mappedLatest == null) {
throw new IllegalStateException("value not available yet");
}
return mappedLatest;
}

@Override
Expand All @@ -168,7 +183,7 @@ public void watch(BiConsumer<? super Revision, ? super U> listener, Executor exe
requireNonNull(executor, "executor");
updateListeners.add(Maps.immutableEntry(listener, executor));

final Latest<U> mappedLatest = this.mappedLatest;
final Latest<U> mappedLatest = this.mappedLatest.get();
if (mappedLatest != null) {
// There's a chance that listener.accept(...) is called twice for the same value
// if this watch method is called:
Expand All @@ -181,6 +196,11 @@ public void watch(BiConsumer<? super Revision, ? super U> listener, Executor exe
}
}

@Override
public void watch(BiConsumer<? super Revision, ? super U> listener) {
watch(listener, parent.watchScheduler());
}

@Override
public String toString() {
return toStringHelper(this)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,14 @@ default <U> Watcher<U> newChild(Function<? super T, ? extends U> mapper) {
default <U> Watcher<U> newChild(Function<? super T, ? extends U> mapper, Executor executor) {
requireNonNull(mapper, "mapper");
requireNonNull(executor, "executor");
return MappingWatcher.of(this, mapper, executor, false);
return newChildAsync(t -> CompletableFuture.supplyAsync(() -> mapper.apply(t), executor));
}

/**
* Returns a {@link Watcher} that applies the {@link Function} for the {@link Latest#value()}.
*/
default <U> Watcher<U> newChildAsync(Function<? super T, ? extends CompletableFuture<? extends U>> mapper) {
requireNonNull(mapper, "mapper");
return MappingWatcher.of(this, mapper, false);
}
}