Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 38 additions & 12 deletions src/dev/enola/be/task/TaskExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@

import static java.util.concurrent.TimeUnit.MILLISECONDS;

import dev.enola.common.concurrent.Executors;

import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -22,20 +24,43 @@ public class TaskExecutor implements AutoCloseable {
// TODO Synthetic "root" task, to which all running tasks are children?
// This could be useful for managing task hierarchies and dependencies.

// TODO Eviction policy of completed tasks?! As-is, this leaks memory...
// E.g. periodically scan the map and remove tasks that are in a terminal state.
// Persist them first, so that get() can still find them later; with separate
// eviction policy.
// This map has a basic time-based eviction policy; see constructor.
// A more sophisticated implementation could persist completed tasks, so that get()
// can still find them later, with a separate eviction policy for that persistent store.
private final Map<UUID, Task<?, ?>> tasks = new ConcurrentHashMap<>();

private final ExecutorService executor = TaskExecutorServices.newVirtualThreadPerTaskExecutor();

// TODO Can the timeoutScheduler also use virtual threads?
// (Would need to check if ScheduledExecutorService supports that.)
// Nota bene: In *THEORY* we should *NEVER* have *ANY* uncaught exceptions from Task,
// because any exception thrown by the task's `execute()` method would be caught and
// wrapped in an ExecutionException by the Future returned by ExecutorService.submit().
//
// TODO Either way, use LoggingScheduledExecutorService from Enola Commons?
// But in practice, who knows what the future holds, so we better log them just in case;
// just because "swallowed" lost exceptions are seriously the worst kind of bugs to diagnose!
private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(LOG);

private final ScheduledExecutorService timeoutScheduler =
Executors.newSingleThreadScheduledExecutor();
Executors.newSingleThreadScheduledExecutor("TaskExecutor-Timeout", LOG);

private final ScheduledExecutorService cleanupScheduler =
Executors.newSingleThreadScheduledExecutor("TaskExecutor-Cleanup", LOG);

public TaskExecutor(Duration completedTaskEvictionInterval) {
Comment thread
vorburger marked this conversation as resolved.
if (completedTaskEvictionInterval == null) {
throw new IllegalArgumentException("completedTaskEvictionInterval must not be null");
Comment thread
vorburger marked this conversation as resolved.
}
if (completedTaskEvictionInterval.isNegative() || completedTaskEvictionInterval.isZero()) {
throw new IllegalArgumentException("completedTaskEvictionInterval must be positive");
}
Comment on lines +46 to +52
Copy link

Copilot AI Oct 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using Objects.requireNonNull() for the null check on line 47-48 for consistency with the null validation pattern used elsewhere in the codebase.

Copilot uses AI. Check for mistakes.
var m = completedTaskEvictionInterval.toMillis();
Comment thread
vorburger marked this conversation as resolved.
cleanupScheduler.scheduleAtFixedRate(this::evictCompletedTasks, m, m, MILLISECONDS);
Comment thread
vorburger marked this conversation as resolved.
}

public TaskExecutor() {
this(Duration.ofHours(1));
}

private void evictCompletedTasks() {
tasks.values().removeIf(task -> task.status().isTerminal());
Comment thread
vorburger marked this conversation as resolved.
}
Comment on lines +61 to +63
Copy link

Copilot AI Oct 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The evictCompletedTasks method iterates over all tasks to find completed ones. For large task maps, this could be inefficient. Consider maintaining a separate queue of completed tasks or using a time-based eviction strategy that tracks completion timestamps.

Copilot uses AI. Check for mistakes.

private static class LoggingFutureTask<V> extends FutureTask<V> {
private final Task<?, V> task;
Expand Down Expand Up @@ -127,7 +152,8 @@ public void close() {
task.cancel();
}

executor.close();
cleanupScheduler.close();
timeoutScheduler.close();
executor.close();
Comment thread
vorburger marked this conversation as resolved.
Comment on lines +155 to +157
Copy link

Copilot AI Oct 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The executor services should be shut down in reverse order of dependency to avoid potential issues. Since the executor might have tasks that could be cancelled by the timeout scheduler, consider closing the executor first, then the timeout scheduler, and finally the cleanup scheduler.

Suggested change
cleanupScheduler.close();
timeoutScheduler.close();
executor.close();
executor.close();
timeoutScheduler.close();
cleanupScheduler.close();

Copilot uses AI. Check for mistakes.
}
}
43 changes: 0 additions & 43 deletions src/dev/enola/be/task/TaskExecutorServices.java

This file was deleted.

56 changes: 56 additions & 0 deletions src/dev/enola/common/concurrent/Executors.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package dev.enola.common.concurrent;

import static dev.enola.common.concurrent.LoggingThreadUncaughtExceptionHandler.toLogger;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.logging.Logger;

public final class Executors {

public static ScheduledExecutorService newSingleThreadScheduledExecutor(
String namePrefix, Logger logger) {
var tf = createThreadFactory(namePrefix, logger);
return java.util.concurrent.Executors.newSingleThreadScheduledExecutor(tf);
}

public static ExecutorService newVirtualThreadPerTaskExecutor(
String namePrefix, Logger logger) {
var tf = createVirtualThreadFactory(namePrefix, logger);
return java.util.concurrent.Executors.newThreadPerTaskExecutor(tf);
}

public static ExecutorService newVirtualThreadPerTaskExecutor(Logger logger) {
var tf = createVirtualThreadFactory(logger);
return java.util.concurrent.Executors.newThreadPerTaskExecutor(tf);
}

private static ThreadFactory createThreadFactory(String namePrefix, Logger logger) {
return Thread.ofPlatform()
.name(namePrefix, 1)
.uncaughtExceptionHandler(toLogger(logger))
.daemon(true)
// TODO new ContextAwareThreadFactory() how-to?
// TODO .inheritInheritableThreadLocals(true) ?
.factory();
}

private static ThreadFactory createVirtualThreadFactory(String namePrefix, Logger logger) {
var builder = Thread.ofVirtual();
if (namePrefix != null) {
builder = builder.name(namePrefix, 1);
}
Comment thread
vorburger marked this conversation as resolved.
return builder.uncaughtExceptionHandler(toLogger(logger))
// NB: Virtual threads are always daemon threads, so no: .setDaemon(true)
// TODO new ContextAwareThreadFactory() how-to?
// TODO .inheritInheritableThreadLocals(true) ?
.factory();
}

private static ThreadFactory createVirtualThreadFactory(Logger logger) {
return createVirtualThreadFactory(null, logger);
}

private Executors() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package dev.enola.common.concurrent;

import static java.util.Objects.requireNonNull;

import java.lang.Thread.UncaughtExceptionHandler;
import java.util.logging.Level;
import java.util.logging.Logger;

class LoggingThreadUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {

private final Logger logger;

private LoggingThreadUncaughtExceptionHandler(Logger logger) {
this.logger = requireNonNull(logger, "logger");
}

/** Factory method to obtain an instance of this bound to the passed JUL Logger. */
public static UncaughtExceptionHandler toLogger(Logger logger) {
return new LoggingThreadUncaughtExceptionHandler(logger);
}

@Override
public void uncaughtException(Thread thread, Throwable throwable) {
logger.log(
Level.SEVERE, "Uncaught exception in thread '" + thread.getName() + "'", throwable);
}
}