Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.awaitility.Awaitility.await;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -98,7 +99,7 @@ void multipleNewChild() throws Exception {
.watcher(Query.ofText("/baz.txt"))
.start();
final Watcher<Boolean> watcher = originalWatcher
.newChild(txt -> 1)
.newChildAsync(txt -> CompletableFuture.completedFuture(1))
.newChild(intValue -> Integer.toString(intValue))
.newChild("1"::equals);

Expand All @@ -120,4 +121,35 @@ void mapperException() {
.hasCauseExactlyInstanceOf(RuntimeException.class));
originalWatcher.close();
}

@Test
void mapperAsyncFailure() {
final Watcher<String> originalWatcher = dogma.client()
.forRepo("foo", "bar")
.watcher(Query.ofText("/baz.txt"))
.start();
originalWatcher.initialValueFuture().join();
final Watcher<String> watcher = originalWatcher.newChildAsync(unused ->
CompletableFuture.failedFuture(new RuntimeException()))
.newChild(val -> "not called");
await().untilAsserted(() -> assertThatThrownBy(() -> watcher.initialValueFuture().join())
.hasCauseExactlyInstanceOf(RuntimeException.class));
originalWatcher.close();
}

@Test
void mapperAsyncException() {
final Watcher<String> originalWatcher = dogma.client()
.forRepo("foo", "bar")
.watcher(Query.ofText("/baz.txt"))
.start();
originalWatcher.initialValueFuture().join();
final Watcher<String> watcher = originalWatcher.newChildAsync(unused -> {
throw new RuntimeException();
})
.newChild(val -> "not called");
await().untilAsserted(() -> assertThatThrownBy(() -> watcher.initialValueFuture().join())
.hasCauseExactlyInstanceOf(RuntimeException.class));
originalWatcher.close();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 LINE Corporation
* Copyright 2026 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
Expand All @@ -20,12 +20,12 @@

import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;

Expand All @@ -38,83 +38,102 @@
import com.linecorp.centraldogma.common.Revision;

final class MappingWatcher<T, U> implements Watcher<U> {

private static final Logger logger = LoggerFactory.getLogger(MappingWatcher.class);

static <T, U> MappingWatcher<T, U> of(Watcher<T> parent, Function<? super T, ? extends U> mapper,
Executor executor, boolean closeParentWhenClosing) {
static <T, U> MappingWatcher<T, U> of(Watcher<T> parent,
Function<? super T, ? extends CompletableFuture<? extends U>>
mapper) {
requireNonNull(parent, "parent");
requireNonNull(mapper, "mapper");
requireNonNull(executor, "executor");
// TODO(minwoo): extract mapper function and combine it with the new mapper.
return new MappingWatcher<>(parent, mapper, executor, closeParentWhenClosing);
return new MappingWatcher<>(parent, mapper);
}

private final Watcher<T> parent;
private final Function<? super T, ? extends U> mapper;
private final Executor mapperExecutor;
private final boolean closeParentWhenClosing;
private final CompletableFuture<Latest<U>> initialValueFuture = new CompletableFuture<>();
private final List<Entry<BiConsumer<? super Revision, ? super U>, Executor>> updateListeners =
new CopyOnWriteArrayList<>();
private static <U> boolean isUpdate(Latest<U> newLatest, @Nullable Latest<U> existing) {
if (existing == null) {
return true;
}
if (Objects.equals(existing.value(), newLatest.value())) {
return false;
}
return newLatest.revision().compareTo(existing.revision()) >= 0;
}

@Nullable
private volatile Latest<U> mappedLatest;
private final CompletableFuture<Latest<U>> initialValueFuture = new CompletableFuture<>();
private volatile boolean closed;
private final Watcher<T> parent;

MappingWatcher(Watcher<T> parent, Function<? super T, ? extends U> mapper, Executor mapperExecutor,
boolean closeParentWhenClosing) {
private final List<Map.Entry<BiConsumer<? super Revision, ? super U>, Executor>> updateListeners =
new CopyOnWriteArrayList<>();
private final Function<? super T, ? extends CompletableFuture<? extends U>> mapper;
private final AtomicReference<@Nullable Latest<U>> mappedLatest = new AtomicReference<>();

MappingWatcher(Watcher<T> parent, Function<? super T, ? extends CompletableFuture<? extends U>>
mapper) {
this.parent = parent;
this.mapper = mapper;
this.mapperExecutor = mapperExecutor;
this.closeParentWhenClosing = closeParentWhenClosing;
parent.initialValueFuture().exceptionally(cause -> {
initialValueFuture.completeExceptionally(cause);
return null;
});
final BiConsumer<Throwable, Revision> reportFailure = (e, r) -> {
logger.warn("Unexpected exception is raised from mapper.apply(). mapper: {}, revision {}", mapper,
r, e);
if (!initialValueFuture.isDone()) {
initialValueFuture.completeExceptionally(e);
}
close();
};
parent.watch((revision, value) -> {
if (closed) {
return;
}
final U mappedValue;
final CompletableFuture<? extends U> mappedValueFuture;
try {
mappedValue = mapper.apply(value);
mappedValueFuture = mapper.apply(value);
} catch (Exception e) {
logger.warn("Unexpected exception is raised from mapper.apply(). mapper: {}", mapper, e);
if (!initialValueFuture.isDone()) {
initialValueFuture.completeExceptionally(e);
}
close();
return;
}
final Latest<U> oldLatest = mappedLatest;
if (oldLatest != null && Objects.equals(oldLatest.value(), mappedValue)) {
reportFailure.accept(e, revision);
return;
}
mappedValueFuture.whenComplete((mappedValue, e) -> {
if (closed) {
return;
}
if (null != e) {
reportFailure.accept(e, revision);
return;
}

// mappedValue can be nullable which is fine.
final Latest<U> newLatest = new Latest<>(revision, mappedValue);
mappedLatest = newLatest;
notifyListeners(newLatest);
if (!initialValueFuture.isDone()) {
initialValueFuture.complete(newLatest);
}
}, mapperExecutor);
// mappedValue can be nullable which is fine.
final Latest<U> newLatest = new Latest<>(revision, mappedValue);
final Latest<U> oldLatest = mappedLatest.getAndUpdate(
existing -> isUpdate(newLatest, existing) ? newLatest : existing
);
if (!isUpdate(newLatest, oldLatest)) {
return;
}
notifyListeners(newLatest);
if (!initialValueFuture.isDone()) {
initialValueFuture.complete(newLatest);
}
});
});
}

private void notifyListeners(Latest<U> latest) {
if (closed) {
return;
}
@Override
public ScheduledExecutorService watchScheduler() {
return parent.watchScheduler();
}

for (Map.Entry<BiConsumer<? super Revision, ? super U>, Executor> entry : updateListeners) {
final BiConsumer<? super Revision, ? super U> listener = entry.getKey();
final Executor executor = entry.getValue();
if (mapperExecutor == executor) {
notifyListener(latest, listener);
} else {
executor.execute(() -> notifyListener(latest, listener));
}
@Override
public CompletableFuture<Latest<U>> initialValueFuture() {
return initialValueFuture;
}

@Override
public void close() {
closed = true;
if (!initialValueFuture.isDone()) {
initialValueFuture.cancel(false);
}
}

Expand All @@ -123,52 +142,38 @@ private void notifyListener(Latest<U> latest, BiConsumer<? super Revision, ? sup
listener.accept(latest.revision(), latest.value());
} catch (Exception e) {
logger.warn("Unexpected exception is raised from {}: rev={}",
listener, latest.revision(), e);
listener, latest.revision(), e);
}
}

@Override
public ScheduledExecutorService watchScheduler() {
return parent.watchScheduler();
}
private void notifyListeners(Latest<U> latest) {
if (closed) {
return;
}

@Override
public CompletableFuture<Latest<U>> initialValueFuture() {
return initialValueFuture;
for (Map.Entry<BiConsumer<? super Revision, ? super U>, Executor> entry : updateListeners) {
final BiConsumer<? super Revision, ? super U> listener = entry.getKey();
final Executor executor = entry.getValue();
executor.execute(() -> notifyListener(latest, listener));
}
}

@Override
public Latest<U> latest() {
final Latest<U> mappedLatest = this.mappedLatest;
final Latest<U> mappedLatest = this.mappedLatest.get();
if (mappedLatest == null) {
throw new IllegalStateException("value not available yet");
}
return mappedLatest;
}

@Override
public void close() {
closed = true;
if (!initialValueFuture.isDone()) {
initialValueFuture.cancel(false);
}
if (closeParentWhenClosing) {
parent.close();
}
}

@Override
public void watch(BiConsumer<? super Revision, ? super U> listener) {
watch(listener, parent.watchScheduler());
}

@Override
public void watch(BiConsumer<? super Revision, ? super U> listener, Executor executor) {
requireNonNull(listener, "listener");
requireNonNull(executor, "executor");
updateListeners.add(Maps.immutableEntry(listener, executor));

final Latest<U> mappedLatest = this.mappedLatest;
final Latest<U> mappedLatest = this.mappedLatest.get();
if (mappedLatest != null) {
// There's a chance that listener.accept(...) is called twice for the same value
// if this watch method is called:
Expand All @@ -181,6 +186,11 @@ public void watch(BiConsumer<? super Revision, ? super U> listener, Executor exe
}
}

@Override
public void watch(BiConsumer<? super Revision, ? super U> listener) {
watch(listener, parent.watchScheduler());
}

@Override
public String toString() {
return toStringHelper(this)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,14 @@ default <U> Watcher<U> newChild(Function<? super T, ? extends U> mapper) {
default <U> Watcher<U> newChild(Function<? super T, ? extends U> mapper, Executor executor) {
requireNonNull(mapper, "mapper");
requireNonNull(executor, "executor");
return MappingWatcher.of(this, mapper, executor, false);
return newChildAsync(t -> CompletableFuture.supplyAsync(() -> mapper.apply(t), executor));
}

/**
* Returns a {@link Watcher} that applies the {@link Function} for the {@link Latest#value()}.
*/
default <U> Watcher<U> newChildAsync(Function<? super T, ? extends CompletableFuture<? extends U>> mapper) {
requireNonNull(mapper, "mapper");
return MappingWatcher.of(this, mapper);
}
}
Loading