Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 125 additions & 88 deletions framework/src/dslabs/framework/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@

package dslabs.framework;

import com.google.common.collect.ImmutableList;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -35,17 +37,13 @@
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.logging.Level;
import lombok.EqualsAndHashCode;
import lombok.NonNull;
import lombok.SneakyThrows;
import lombok.ToString;
import lombok.extern.java.Log;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.ImmutableTriple;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* Nodes are the basic unit of computation. They can send and receive {@link Message}s, set and
Expand Down Expand Up @@ -104,20 +102,78 @@
@EqualsAndHashCode(of = {"subNodes"})
@ToString(of = {"address", "subNodes"})
public abstract class Node implements Serializable {

/**
* An {@code Environment} is how a {@link Node} sends messages to other nodes and sets timers. It
* also can handle exceptions which are thrown during the handling of events.
*
* @hidden
*/
public interface Environment {
void send(Message message, Address from, Address to);

default void broadcast(Message message, Address from, ImmutableList<Address> to) {
for (Address address : to) {
send(message, from, address);
}
}

default void set(Timer timer, Address destination, Duration duration) {
set(timer, destination, duration, duration);
}

void set(Timer timer, Address destination, Duration minDuration, Duration maxDuration);

/**
* Possibly handles a throwable which was thrown during the execution of an event handler.
*
* @param throwable The exception which was thrown during the handling of an event.
* @return Whether the throwable was consumed. If {@code false}, {@code throwable} should be
* thrown.
*/
default boolean handleThrowable(Throwable throwable) {
return false;
}
}

/**
* @param logExceptions Whether to log exceptions thrown by the node during message and timer
* handling, in addition to sending them to {@link Environment#handleThrowable}.
* @hidden
*/
public record Settings(boolean logExceptions) {
Settings() {
this(true);
}
}

private static final Map<Class<? extends Node>, Map<String, Optional<Method>>> methods =
new ConcurrentHashMap<>();

/** This Node's address. */
@VizIgnore @NonNull private final Address address;

private transient Consumer<Triple<Address, Address, Message>> messageAdder;
private transient Consumer<Triple<Address, Address[], Message>> batchMessageAdder;
private transient Consumer<Triple<Address, Timer, Pair<Integer, Integer>>> timerAdder;
private transient Consumer<Throwable> throwableCatcher;
private transient Boolean logExceptions = true;
/*
* INVARIANT: (environment == null && settings == null) || parentNode == null
*
* We do not store these as a sealed type, as environment and settings must be transient, while
* parentNode must not be transient.
*/

/**
* This node's {@link Environment}, if this node is a root node (i.e., not a sub-node). If this
* node is a sub-node, this must be {@code null}.
*/
private transient @Nullable Environment environment;

/**
* This node's current {@link Settings}, if this node is a root node (i.e., not a sub-node). If
* this node is a sub-node, this must be {@code null}.
*/
private transient @Nullable Settings settings;

/** The Node's parent (or null if this Node is the root Node in the hierarchy). */
@VizIgnore private Node parentNode;
/** The Node's parent (or {@code null} if this Node is a root node). */
@VizIgnore private @Nullable Node parentNode;

/**
* This Node's sub-Nodes, indexed by their ID. Sub-Nodes must have a {@link SubAddress} composed
Expand All @@ -134,6 +190,26 @@ protected Node(@NonNull Address address) {
this.address = address;
}

/** The {@link Environment} for this Node's root Node. */
private @Nullable Environment environment() {
if (parentNode != null) {
return parentNode.environment();
} else {
return environment;
}
}

/** The {@link Settings} for this Node's root Node. */
private Settings settings() {
if (parentNode != null) {
return parentNode.settings();
} else if (settings == null) {
return new Settings();
} else {
return settings;
}
}

/**
* Takes any initialization steps necessary (potentially sending {@link Message}s and setting
* {@link Timer}s).
Expand All @@ -153,9 +229,7 @@ protected final void addSubNode(@NonNull Node subNode) {
"Attempting to add subNode with address that isn't a subAddress of this node.");
}

if (subNode.messageAdder != null
|| subNode.batchMessageAdder != null
|| subNode.timerAdder != null) {
if (subNode.environment != null) {
throw new IllegalArgumentException(
"Cannot configure node; already configured as stand-alone.");
}
Expand All @@ -167,6 +241,7 @@ protected final void addSubNode(@NonNull Node subNode) {
}

subNode.parentNode = this;
subNode.settings = null;
subNodes.put(subAddress.id(), subNode);
}

Expand Down Expand Up @@ -261,24 +336,17 @@ private void send(Message message, Address from, Address to) {
return;
}

// If this Node is a sub-Node, use the parent to send the message.
if (parentNode != null && messageAdder == null && batchMessageAdder == null) {
parentNode.send(message, from, to);
return;
}

LOG.finest(() -> String.format("MessageSend(%s -> %s, %s)", from, to, message));

if (messageAdder != null) {
messageAdder.accept(new ImmutableTriple<>(from, to, message));
} else if (batchMessageAdder != null) {
batchMessageAdder.accept(new ImmutableTriple<>(from, new Address[] {to}, message));
} else {
Environment env = environment();
if (env == null) {
LOG.severe(
String.format(
"Attempting to send %s from %s to %s before node configured, not sending",
message, from, to));
return;
}

LOG.finest(() -> String.format("MessageSend(%s -> %s, %s)", from, to, message));
env.send(message, from, to);
}

private void broadcast(Message message, Address from, Address[] to) {
Expand All @@ -303,27 +371,18 @@ private void broadcast(Message message, Address from, Address[] to) {
}
}

// If this Node is a sub-Node, use the parent to broadcast the message.
if (parentNode != null && messageAdder == null && batchMessageAdder == null) {
parentNode.broadcast(message, from, to);
return;
}

LOG.finest(
() -> String.format("MessageSend(%s -> %s, %s)", from, Arrays.toString(to), message));

if (batchMessageAdder != null) {
batchMessageAdder.accept(new ImmutableTriple<>(from, to, message));
} else if (messageAdder != null) {
for (Address a : to) {
messageAdder.accept(new ImmutableTriple<>(from, a, message));
}
} else {
Environment env = environment();
if (env == null) {
LOG.severe(
String.format(
"Attempting to send %s from %s to %s before node configured, not sending",
message, from, Arrays.toString(to)));
return;
}

LOG.finest(
() -> String.format("MessageSend(%s -> %s, %s)", from, Arrays.toString(to), message));
env.broadcast(message, from, ImmutableList.copyOf(to));
}

private void set(Timer timer, int minTimerLengthMillis, int maxTimerLengthMillis, Address from) {
Expand All @@ -332,23 +391,20 @@ private void set(Timer timer, int minTimerLengthMillis, int maxTimerLengthMillis
return;
}

// If this Node is a sub-Node, use the parent to set the timer.
if (parentNode != null && timerAdder == null) {
parentNode.set(timer, minTimerLengthMillis, maxTimerLengthMillis, from);
return;
}

LOG.finest(() -> String.format("TimerSet(-> %s, %s)", from, timer));

if (timerAdder != null) {
timerAdder.accept(
new ImmutableTriple<>(
from, timer, new ImmutablePair<>(minTimerLengthMillis, maxTimerLengthMillis)));
} else {
Environment env = environment();
if (env == null) {
LOG.severe(
String.format(
"Attempting to set %s from %s before node configured, not setting", timer, from));
return;
}

LOG.finest(() -> String.format("TimerSet(-> %s, %s)", from, timer));
env.set(
timer,
from,
Duration.ofMillis(minTimerLengthMillis),
Duration.ofMillis(maxTimerLengthMillis));
}

private Object handleMessageInternal(
Expand Down Expand Up @@ -544,7 +600,7 @@ private Object callMethod(
throw t;
}

if (logExceptions) {
if (settings().logExceptions) {
LOG.log(
Level.SEVERE,
String.format(
Expand All @@ -553,8 +609,9 @@ private Object callMethod(
t);
}

if (throwableCatcher != null) {
throwableCatcher.accept(t);
Environment env = environment();
if (env == null || !env.handleThrowable(t)) {
throw t;
}
}

Expand All @@ -564,39 +621,19 @@ private Object callMethod(
/**
* <b>Do not use.</b> Only used by testing framework.
*
* <p>Configures the node to allow it to send messages and set timers.
* <p>Configures the node to allow it to send messages and set timers. Should only be set on a
* root Node.
*
* <p>At least one of {@code messageAdder}/{@code batchMessageAdder} must be non-null.
*
* @param messageAdder a function which consumes messages sent by the node, or {@code null} to
* have the node send all messages to the {@code batchMessageAdder}
* @param batchMessageAdder a function which consumes messages sent by the node to multiple
* recipients, or {@code null} to have the node send all messages to the {@code messageAdder}
* @param timerAdder a function which consumes timers set by the node
* @param throwableCatcher a function which consumes exceptions thrown by the node during message
* and timer handling, or {@code null} to have the node drop exceptions
* @param logExceptions whether to log exceptions thrown by the node during message and timer
* handling, in addition to sending them to the {@code throwableCatcher}
* @param environment The environment this node should use.
* @param settings The settings for this node.
* @hidden
*/
public void config(
Consumer<Triple<Address, Address, Message>> messageAdder,
Consumer<Triple<Address, Address[], Message>> batchMessageAdder,
@NonNull Consumer<Triple<Address, Timer, Pair<Integer, Integer>>> timerAdder,
Consumer<Throwable> throwableCatcher,
boolean logExceptions) {
public void config(Environment environment, Settings settings) {
if (parentNode != null) {
// TODO: Throw IllegalStateException?
LOG.severe("Cannot configure Node already configured as sub-Node.");
}

if (messageAdder == null && batchMessageAdder == null) {
LOG.severe("Cannot configure Node without messageAdder or batchMessageAdder.");
}

this.messageAdder = messageAdder;
this.batchMessageAdder = batchMessageAdder;
this.timerAdder = timerAdder;
this.throwableCatcher = throwableCatcher;
this.logExceptions = logExceptions;
this.environment = environment;
this.settings = settings;
}
}
13 changes: 3 additions & 10 deletions framework/tst/dslabs/framework/testing/ClientWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,13 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;
import lombok.ToString;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;

@EqualsAndHashCode(
of = {"client", "results"},
Expand Down Expand Up @@ -297,14 +295,9 @@ public synchronized void onTimer(Timer timer, Address destination) {
}

@Override
public void config(
Consumer<Triple<Address, Address, Message>> messageAdder,
Consumer<Triple<Address, Address[], Message>> batchMessageAdder,
Consumer<Triple<Address, Timer, Pair<Integer, Integer>>> timerAdder,
Consumer<Throwable> throwableCatcher,
boolean logExceptions) {
public void config(Environment environment, Settings settings) {
// TODO: make sure there's no overhead for having the config both places
super.config(messageAdder, batchMessageAdder, timerAdder, throwableCatcher, logExceptions);
client().config(messageAdder, batchMessageAdder, timerAdder, throwableCatcher, logExceptions);
super.config(environment, settings);
client().config(environment, settings);
}
}
Loading