Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/dev/enola/be/io/LineWriter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package dev.enola.be.io;

import java.io.IOException;

public interface LineWriter {

void println(Object line) throws IOException;
}
21 changes: 21 additions & 0 deletions src/dev/enola/be/io/LineWriters.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package dev.enola.be.io;

import java.io.IOException;

public final class LineWriters {

public static final LineWriter NOOP = _ -> {};
public static final LineWriter SYSTEM_OUT = from(System.out);
public static final LineWriter SYSTEM_ERR = from(System.err);

public static LineWriter from(Appendable appendable) {
return new LineWriter() {
@Override
public void println(Object line) throws IOException {
appendable.append(line.toString()).append(System.lineSeparator());
}
};
}

private LineWriters() {}
}
52 changes: 52 additions & 0 deletions src/dev/enola/be/io/NonBlockingLineWriter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package dev.enola.be.io;

import dev.enola.be.task.Status;
import dev.enola.be.task.TaskWithoutInputOutput;

import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class NonBlockingLineWriter extends TaskWithoutInputOutput implements LineWriter {

private final LineWriter delegate;
private final BlockingQueue<Object> queue;
private volatile boolean overflow;

public NonBlockingLineWriter(int queueCapacity, LineWriter delegate) {
this.delegate = delegate;
this.queue = new ArrayBlockingQueue<>(queueCapacity, false);
}

@Override
public void println(Object line) throws IOException {
if (status() != Status.IN_PROGRESS) throw new IllegalStateException();
Comment thread
vorburger marked this conversation as resolved.
if (!queue.offer(line)) overflow = true;
}

@Override
protected void executeIt() throws Exception {
try {
while (!Thread.currentThread().isInterrupted()) {
if (overflow) {
// https://en.wikipedia.org/wiki/Ellipsis
delegate.println("[… output ... ※ ... truncated …]");
Copy link

Copilot AI Oct 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The overflow message should be extracted to a named constant to avoid duplication and improve maintainability.

Copilot uses AI. Check for mistakes.
Copy link

Copilot AI Oct 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The overflow message contains a mix of different ellipsis characters ('…' and '...'). Consider using consistent ellipsis formatting throughout the message.

Suggested change
delegate.println("[… output ...... truncated …]");
delegate.println("[… output truncated …]");

Copilot uses AI. Check for mistakes.
overflow = false;
}

// take() will block and wait efficiently until a message is available.
Object line = queue.take();
delegate.println(line);
}
} catch (InterruptedException e) {
// We've been interrupted, which is the signal to shut down.
// But before we exit, let's process any remaining messages in the queue.
Object line;
while ((line = queue.poll()) != null) {
delegate.println(line);
}
// Re-set the interrupt flag to be a good citizen.
Thread.currentThread().interrupt();
}
}
}
22 changes: 21 additions & 1 deletion src/dev/enola/be/task/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import static java.util.Objects.requireNonNull;

import dev.enola.common.concurrent.Threads;

import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
Expand All @@ -27,6 +29,19 @@ protected Task(I input) {
/**
* The main logic of the task.
*
* <p>This method is called by the {@link TaskExecutor} when the task is executed. It will run
* in a virtual thread of modern Java. For long-running tasks, please ensure to periodically
* check for interruption via {@link Thread#isInterrupted()} and terminate early if so, by
* throwing {@link InterruptedException}.
*
* <p>Please do not, under any circumstances, use the ancient {@link Thread#yield()} method
* within implementations of this method (or anywhere really anymore, nowadays); as it will
* cause performance degradation by a factor of x100 for no benefit at all anymore on modern
* Java, especially on virtual threads. If you must "simulate work", then please use our {@link
* Threads#sleep(Duration)} utility (instead of the JDK {@link Thread#sleep(long)}), which
* correctly handles interruption. (But this should really should only be required in tests and
* demos, not ever for any real work load.)
*
* @return output, never null (use {@link Empty#INSTANCE}; or {@link Optional}, if needed)
* @throws Exception in case of any failure
*/
Expand Down Expand Up @@ -192,7 +207,12 @@ public void toString(StringBuilder sb) {
sb.append(input.toString()); // TODO Use Jackson?
}

output().ifPresent(o -> sb.append("\noutput: ").append(o.toString())); // TODO Use Jackson?
output().ifPresent(
o -> {
if (o != Empty.INSTANCE)
// TODO Use Jackson?
sb.append("\noutput: ").append(o.toString());
});
failure().ifPresent(t -> sb.append("\nfailure: ").append(t.toString()));

sb.append("\n");
Expand Down
2 changes: 0 additions & 2 deletions src/dev/enola/be/task/TaskCallable.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package dev.enola.be.task;

import java.time.Instant;
import java.util.concurrent.Callable;

class TaskCallable<T> implements Callable<T> {
Expand All @@ -25,7 +24,6 @@ public T call() throws Exception {

} finally {
thread.setName(originalThreadName);
task.endedAt(Instant.now());
}
}
}
70 changes: 47 additions & 23 deletions src/dev/enola/be/task/TaskExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,23 @@

import static java.util.concurrent.TimeUnit.MILLISECONDS;

import java.util.Collection;
import java.util.List;
import java.time.Instant;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.logging.Logger;

public class TaskExecutor implements AutoCloseable {

private static final Logger LOG = Logger.getLogger(TaskExecutor.class.getName());

// TODO Synthetic "root" task, to which all running tasks are children?
// This could be useful for managing task hierarchies and dependencies.

Expand All @@ -26,6 +30,28 @@ public class TaskExecutor implements AutoCloseable {

private final ExecutorService executor = TaskExecutorServices.newVirtualThreadPerTaskExecutor();

// TODO Can the timeoutScheduler also use virtual threads?
// (Would need to check if ScheduledExecutorService supports that.)
//
// TODO Either way, use LoggingScheduledExecutorService from Enola Commons?
private final ScheduledExecutorService timeoutScheduler =
Executors.newSingleThreadScheduledExecutor();

private static class LoggingFutureTask<V> extends FutureTask<V> {
private final Task<?, V> task;

public LoggingFutureTask(Callable<V> callable, Task<?, V> task) {
super(callable);
this.task = task;
}

@Override
protected void done() {
task.endedAt(Instant.now());
LOG.fine(() -> task.toString());
}
}

private <O> Future<O> future(Task<?, O> task) throws IllegalStateException {
if (tasks.putIfAbsent(task.id(), task) != null)
throw new IllegalStateException("Task already submitted: " + task.id());
Expand All @@ -38,27 +64,19 @@ private <O> Future<O> future(Task<?, O> task) throws IllegalStateException {
throw new IllegalStateException(
"Task " + task.id() + " not PENDING: " + task.status());

Future<O> future;
var timeout = task.timeout();
Callable<O> callable = new TaskCallable<>(task);
if (timeout.isZero() || timeout.isNegative()) {
future = executor.submit(callable);
} else {
Collection<? extends Callable<O>> callables = List.of(callable);
try {
// Nota bene: The risk of toMillis() throwing an ArithmeticException is
// unrealistically low, as that would require a timeout of more than ~292
// million years... :-) But if that ever happens, we want to know about it,
// hence no try/catch here.
var futures = executor.invokeAll(callables, timeout.toMillis(), MILLISECONDS);
future = futures.iterator().next();
} catch (InterruptedException e) {
future = CompletableFuture.failedFuture(e);
Thread.currentThread().interrupt();
}
var futureTask = new LoggingFutureTask<>(callable, task);

executor.execute(futureTask);

var timeout = task.timeout();
if (!timeout.isZero() && !timeout.isNegative()) {
timeoutScheduler.schedule(
() -> futureTask.cancel(true), timeout.toMillis(), MILLISECONDS);
Copy link

Copilot AI Oct 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The timeout cancellation runs asynchronously but there's no handling for potential race conditions if the task completes just as the timeout fires. Consider checking task completion status before cancelling.

Suggested change
() -> futureTask.cancel(true), timeout.toMillis(), MILLISECONDS);
() -> {
if (!futureTask.isDone()) {
futureTask.cancel(true);
}
}, timeout.toMillis(), MILLISECONDS);

Copilot uses AI. Check for mistakes.
}
task.future(future);
return future;

task.future(futureTask);
return futureTask;
}
}

Expand Down Expand Up @@ -104,6 +122,12 @@ public Set<UUID> list() {

@Override
public void close() {
TaskExecutorServices.close(executor);
// Signal to all running tasks, so they can terminate gracefully & fast
for (Task<?, ?> task : tasks.values()) {
task.cancel();
}

executor.close();
timeoutScheduler.close();
}
}
23 changes: 0 additions & 23 deletions src/dev/enola/be/task/TaskExecutorServices.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package dev.enola.be.task;

import static java.util.concurrent.TimeUnit.SECONDS;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
Expand Down Expand Up @@ -41,26 +39,5 @@ static ExecutorService newVirtualThreadPerTaskExecutor() {
return Executors.newThreadPerTaskExecutor(factory);
}

private static final long CLOSE_EXECUTOR_SHUTDOWN_AWAIT_SECONDS = 7;

static void close(ExecutorService executor) {
executor.shutdown();
try {
// Wait for existing tasks to terminate
if (!executor.awaitTermination(CLOSE_EXECUTOR_SHUTDOWN_AWAIT_SECONDS, SECONDS)) {

// Cancel currently executing tasks
executor.shutdownNow();
}

} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
executor.shutdownNow();

// Preserve interrupt status
Thread.currentThread().interrupt();
}
}

private TaskExecutorServices() {}
}
16 changes: 16 additions & 0 deletions src/dev/enola/be/task/TaskWithoutInputOutput.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package dev.enola.be.task;

public abstract class TaskWithoutInputOutput extends Task<Empty, Empty> {

protected TaskWithoutInputOutput() {
super(Empty.INSTANCE);
}

@Override
protected final Empty execute() throws Exception {
executeIt();
return Empty.INSTANCE;
}

protected abstract void executeIt() throws Exception;
}
40 changes: 28 additions & 12 deletions src/dev/enola/be/task/demo/LongIncrementingTask.java
Original file line number Diff line number Diff line change
@@ -1,23 +1,28 @@
package dev.enola.be.task.demo;

import dev.enola.be.io.LineWriters;
import dev.enola.be.io.NonBlockingLineWriter;
import dev.enola.be.task.Task;
import dev.enola.be.task.TaskExecutor;
import dev.enola.be.task.demo.LongIncrementingTask.Input;
import dev.enola.be.task.demo.LongIncrementingTask.Output;
import dev.enola.common.concurrent.Threads;
import dev.enola.common.function.CheckedConsumer;
import dev.enola.common.log.JulConfigurer;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.function.Consumer;

public class LongIncrementingTask extends Task<Input, Output> {

record Input(long max, Duration sleep) {}

record Output(long result) {}

private final Consumer<Long> progressConsumer;
private final CheckedConsumer<Long, IOException> progressConsumer;

public LongIncrementingTask(Input input, Consumer<Long> progressConsumer) {
public LongIncrementingTask(Input input, CheckedConsumer<Long, IOException> progressConsumer) {
super(input);
this.progressConsumer = progressConsumer;
}
Expand All @@ -26,7 +31,7 @@ public LongIncrementingTask(Input input, Consumer<Long> progressConsumer) {
protected Output execute() throws Exception {
for (long i = 0; i < input.max; i++) {
progressConsumer.accept(i);
Thread.yield();
// Do *NOT* Thread.yield(); that makes it really horribly slow, by like a factor x100!
if (Thread.currentThread().isInterrupted())
throw new InterruptedException("Task was interrupted");
Threads.sleep(input.sleep);
Expand All @@ -39,7 +44,10 @@ protected Output execute() throws Exception {

private static void simpleLoop(long max, Duration sleep) throws InterruptedException {
var start = Instant.now();
for (long i = 0; i < max; i++) Threads.sleep(sleep);
for (long i = 0; i < max; i++) {
// Do *NOT* Thread.yield(); that makes it really horribly slow, by like a factor x100!
Threads.sleep(sleep);
}
var duration = Duration.between(start, Instant.now());
System.out.println(
"Looped to "
Expand All @@ -51,17 +59,25 @@ private static void simpleLoop(long max, Duration sleep) throws InterruptedExcep
}

public static void main(String[] args) throws InterruptedException {
// Count to max, with 1ms pause between each increment
var max = 10000;
JulConfigurer.configureRootLogger();

// Count to max, with sleep pause between each increment
var max = 1000000L;
Copy link

Copilot AI Oct 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The hardcoded value 1000000L should be extracted to a named constant to improve code readability and maintainability.

Copilot uses AI. Check for mistakes.
var sleep = Duration.ofMillis(0);

var input = new Input(max, sleep);
var task = new LongIncrementingTask(input, System.out::println);
var pumperTask = new NonBlockingLineWriter(13, LineWriters.SYSTEM_OUT);
Copy link

Copilot AI Oct 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The magic number 13 for queue capacity should be extracted to a named constant or variable to explain its significance and make it configurable.

Copilot uses AI. Check for mistakes.
var printingTask = new LongIncrementingTask(input, pumperTask::println);
var silentTask = new LongIncrementingTask(input, LineWriters.NOOP::println);

try (var executor = new TaskExecutor()) {
executor.await(task);
System.out.println(task);
}
executor.async(pumperTask);
executor.async(silentTask);

executor.await(printingTask);
silentTask.await();

simpleLoop(max, sleep);
simpleLoop(max, sleep);
}
}
}
Loading