feature: Cross-platform Task primitive for cancellable background work #553
xerial wants to merge 10 commits into
Adds `wvlet.uni.concurrent.Task`, the cross-platform "run a unit of work on a background worker that can be cooperatively cancelled and progress-polled" primitive requested in #552. The shape stays below Rx so a future SqlConnector body (DuckDB pending-execute, Trino poll loop, Snowflake submit/poll) plugs in without forcing an effect monad on callers.

- `Task` has no `[A]` result type; body is `TaskContext => Unit`. Value handoff goes through caller-supplied Rx primitives, matching uni's "side-effects managed outside the interface" direction.
- `await(): Unit` blocks on JVM/Native; on Scala.js it throws so callers reach for `awaitRx: Rx[Unit]`, which is the universal path.
- JVM/Native back the body with a daemon thread + `Thread.interrupt` on cancel so blocking JDK calls (`Thread.sleep`, IO) unwedge. Scala.js queues the body as a microtask: same API, cooperative scheduling.

Feasibility validated across all four platforms in this PR (JVM, JS in Node, JS in browser via the same JS impl, Scala Native). See `plans/2026-05-18-cross-platform-thread.md` for the design rationale.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
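The JVM shape described above can be sketched with plain JDK primitives. This is a minimal illustration under stated assumptions, not the `wvlet.uni` implementation: `MiniTask`, `MiniContext`, `MiniState`, and `demoCancel` are hypothetical names, and the flag/latch wiring is one plausible way to get a daemon worker, interrupt-on-cancel, and a blocking `await()`.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}

// Hypothetical names throughout: a JVM-only illustration, not the wvlet.uni API.
enum MiniState:
  case Running, Succeeded, Failed, Cancelled

final class MiniContext(flag: AtomicBoolean):
  def isCancelled: Boolean = flag.get()
  // Cooperative check for bodies that never block in an interruptible JDK call.
  def checkCancelled(): Unit =
    if flag.get() then throw new InterruptedException("Task cancelled")

final class MiniTask(body: MiniContext => Unit):
  private val cancelFlag = new AtomicBoolean(false)
  private val stateRef   = new AtomicReference[MiniState](MiniState.Running)
  private val done       = new CountDownLatch(1)

  private val runnable: Runnable = () =>
    try
      body(new MiniContext(cancelFlag))
      stateRef.set(MiniState.Succeeded)
    catch
      // Any throw after a cancel request is classified as Cancelled.
      case _: Throwable if cancelFlag.get() => stateRef.set(MiniState.Cancelled)
      case _: Throwable                     => stateRef.set(MiniState.Failed)
    finally done.countDown()

  private val worker = new Thread(runnable, "mini-task")
  worker.setDaemon(true)
  worker.start()

  def state: MiniState = stateRef.get()
  // Set the flag once, then interrupt so Thread.sleep / blocking IO unwedge.
  def cancel(): Unit =
    if cancelFlag.compareAndSet(false, true) then worker.interrupt()
  // Blocks the caller until the body reaches a terminal state.
  def await(): Unit = done.await()

// Usage: cancel a body that would otherwise sleep for a minute.
def demoCancel(): MiniState =
  val task = new MiniTask(_ => Thread.sleep(60000L))
  task.cancel()
  task.await()
  task.state
```

The terminal state is written before the latch is released, so a caller returning from `await()` always observes a non-`Running` state.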
Code Review
This pull request introduces a cross-platform Task primitive for background work across JVM, Scala Native, and Scala.js, including a design document, core API traits, and platform-specific implementations. The feedback suggests enhancing type safety by parameterizing the progress type instead of using Any. For the JVM implementation, it is recommended to use a shared thread pool for efficiency. Additionally, a change to the cancel() logic is proposed to allow re-triggering the interruption signal if a task remains in the cancelling state.

    /**
      * Caller-defined progress payload. `Task` provides no semantics beyond "latest snapshot wins."
      */
    type Progress = Any

Using `type Progress = Any` forces users to downcast manually when consuming progress updates, which is error-prone and less idiomatic in a type-safe library like uni. Since the design doc already notes a preference for a typed approach (e.g., `Task[+P]`), it would be better to parameterize `Task` and `TaskContext` with a progress type `P` now. This would allow consumers to work with specific types (like `QueryStats`) directly through the `progress` and `progressStream` APIs.

    private[concurrent] object taskCompat:

      private lazy val threadFactory = ThreadUtil.newDaemonThreadFactory("uni-task")

Creating a new Thread for every task via threadFactory.newThread is inefficient for high-frequency background work. Consider using a shared ExecutorService (e.g., Executors.newCachedThreadPool) to manage and reuse threads. This improvement is also suggested in the design doc under 'Platform implementations' for the JVM.

    final override def cancel(): Unit =
      if cancelFlag.compareAndSet(false, true) then
        stateRef.compareAndSet(Task.State.Running, Task.State.Cancelling)
        onCancelRequested()

The cancel() implementation is strictly one-shot. If a task body catches InterruptedException but continues execution (for example, if it has multiple sequential blocking calls), subsequent calls to cancel() will not re-trigger onCancelRequested(). Consider allowing onCancelRequested() to be called again if the task is still in the Cancelling state, ensuring that the interruption signal can be re-sent if the first one was insufficient.

    final override def cancel(): Unit =
      if cancelFlag.compareAndSet(false, true) then
        stateRef.compareAndSet(Task.State.Running, Task.State.Cancelling)
        onCancelRequested()
      else if state == Task.State.Cancelling then
        onCancelRequested()

Per review:

- Move from new `wvlet.uni.concurrent` (uni-core) to `wvlet.uni.control` (uni), alongside Retry / CircuitBreaker / RateLimiter / Guard / Resource, the existing home for lifecycle / reliability primitives.
- Consolidate Task.scala + TaskImpl.scala into a single file, matching Retry.scala / CircuitBreaker.scala convention.
- Drop the progress API entirely. Bodies publish progress / partial results / anything else through any existing Rx primitive (RxVar, RxQueue, RxDeferred), the same "side-effects managed outside the interface" rule that already justified no `[A]` result type. Removes Task.Progress (type Any), progressRef, progressVar, progressStream, reportProgress.
- Drop Task.State.Cancelling: its own documentation acknowledged the transition was unreliable as an observable state. isCancelled covers "cancel requested"; the terminal state is the source of truth.
- Memoise awaitRx (was rebuilding the Rx chain per call).
- Inline `start` into taskCompat.run (was an unnecessary indirection).

All tests pass: JVM 9/9, JS 8/8, Native 9/9.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
/gemini review
Code Review
This pull request introduces wvlet.uni.control.Task, a cross-platform primitive for managing background work with cooperative cancellation across JVM, Scala Native, and Scala.js. The implementation provides a unified API for lifecycle management, though the design document needs updates to reflect the removal of the progress API and generic result types. Feedback also suggests optimizing performance on JVM and Native platforms by utilizing shared thread pools instead of spawning new threads for every task.

    ## Platform implementations

    ### JVM (`uni/.jvm/.../bgTaskCompat.scala`)

    - Worker: `Thread` from `ThreadUtil.newDaemonThreadFactory("bg-task")`.
      (Optionally back by a shared `Executor` for cheap reuse — the
      blocking scheduler's cached pool is the obvious candidate.)
    - Cancel signal: `AtomicBoolean` read by `isCancelled`; `cancel()`
      also calls `Thread.interrupt()` so blocking JDK calls
      (`InputStream.read`, `Socket.connect`, `Thread.sleep`) unblock.
    - Progress: `AtomicReference[Progress]` for `progress`;
      `RxQueue[Progress]` (existing) for `progressStream`.
    - Await: `CountDownLatch(1)`, released on terminal transition.

This design document appears to be partially out of sync with the final implementation. It contains several references to features that were decided against, which could be confusing for future readers.
Specifically:
- Progress API: Sections for JVM, Scala Native, and Scala.js (e.g., lines 128-129, 139, 170-172) and the 'Open questions' section (lines 238-242) still describe a built-in progress API, even though the 'Naming, shape, and package' section correctly states it was removed.
- Generic Result Type `[A]`: Several places (e.g., lines 207, 241, 262) mention a generic result type `[A]` for `Task` and `awaitRx`, but the final API uses `Unit`.

It would be beneficial to update the document to consistently reflect the final API surface (`Task` with no result type and no progress API).

    private[control] object taskCompat:

      private lazy val threadFactory = ThreadUtil.newDaemonThreadFactory("uni-task")

Creating a new thread for each Task can be inefficient, especially if many short-lived tasks are created. As noted in the design document, using a shared ExecutorService with a thread pool would be more performant by reusing threads.
Consider using a shared ExecutorService, for example, one created with java.util.concurrent.Executors.newCachedThreadPool(threadFactory), to manage worker threads.
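The reviewer's suggestion could look roughly like the sketch below. It is an assumption-laden illustration: `daemonThreadFactory` is a hypothetical stand-in for `ThreadUtil.newDaemonThreadFactory` (whose source is not shown here), and `SharedWorkers` is an invented name. Only `Executors.newCachedThreadPool(ThreadFactory)` and the other JDK calls are real APIs.

```scala
import java.util.concurrent.{ExecutorService, Executors, Future, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for ThreadUtil.newDaemonThreadFactory.
def daemonThreadFactory(prefix: String): ThreadFactory =
  val counter = new AtomicInteger(0)
  (r: Runnable) =>
    val t = new Thread(r, s"${prefix}-${counter.incrementAndGet()}")
    t.setDaemon(true) // daemon threads do not keep the JVM alive
    t

object SharedWorkers:
  // One pool for all tasks; the cached pool reuses idle threads and
  // reclaims them after 60 seconds without work.
  lazy val pool: ExecutorService =
    Executors.newCachedThreadPool(daemonThreadFactory("uni-task"))

  // The returned Future is the cancel handle: cancel(true) interrupts the
  // worker only while this particular job is still running on it.
  def submit(body: () => Unit): Future[?] =
    val job: Runnable = () => body()
    pool.submit(job)
```

With pooled threads, cancelling through the returned `Future` (`future.cancel(true)`) is likely safer than interrupting a captured `Thread` reference, because the same thread may already be running a different task by the time the interrupt is sent.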

    */
    private[control] object taskCompat:

      private lazy val threadFactory = ThreadUtil.newDaemonThreadFactory("uni-task")

Thanks for the review!
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…r_threads
Adds two API surfaces on top of the v1 Task:
1. cancel(reason: String = "") — the reason flows into the
InterruptedException that checkCancelled throws and that await()/
awaitRx surface, so it shows up in stack traces and Rx error events.
2. Task.register("id") { body } + Task.runRegistered("id") — registry-
based bodies. The driving requirement (#552, wvlet cross-platform
query runner) needs blocking await() on Node, which requires running
the body in a separate isolate. The implementation spawns a
worker_threads worker, dynamic-imports the same Scala.js bundle via
import.meta.url (which re-runs module init in the worker, re-
populating the registry there), and blocks the main thread on
Atomics.wait over a SharedArrayBuffer. Cancel is a CAS into the
SAB's cancel-flag word.
State.Cancelling was dropped at the same time (it was unreliable as an
observable signal — a racing terminal `set` from the body could win).
isCancelled covers "cancel requested"; the terminal state is the truth.
Non-obvious traps captured in
adr/2026-05-18-node-task-worker-threads.md:
- import.meta.url, not process.argv[1] (sbt-jsenv-nodejs leaves argv[1]
empty)
- Test bundle's Bridge.start() throws in worker isolates; the worker
bootstrap tolerates the import error if globalThis.__uniTaskInvoke
was set first
- @JSExportTopLevel `val` triggers eager init at module load; the same
on a `def` doesn't — the eager bootstrap pattern is what makes
registration reliable
- js.Dynamic.global.updateDynamic("foo")(v) is rewritten by the Scala.js
linker as a bare identifier assignment (`foo = v`), which throws
ReferenceError in strict-mode ESM. Use Object.defineProperty via
js.eval("globalThis") instead.
Tests: JVM 9/9, JS Node + browser 13/13 (incl. 4 NodeWorkerTaskTest
exercising real blocking-await), Native 10/10 — all four platforms
green.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
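How a cancel reason might reach the `InterruptedException` can be sketched as follows. This is a hypothetical, platform-neutral illustration: `CancelSignal` is an invented name, and the "first reason wins" rule and the `"Task cancelled"` fallback message are assumptions drawn from other parts of this thread rather than from the implementation itself.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical sketch: None means "not cancelled";
// Some(reason) records the first cancel request.
final class CancelSignal:
  private val reasonRef = new AtomicReference[Option[String]](None)

  // Returns true only for the call that actually requested cancellation.
  // Later calls leave the stored reason untouched.
  def cancel(reason: String = ""): Boolean =
    reasonRef.compareAndSet(None, Some(reason))

  def isCancelled: Boolean = reasonRef.get().isDefined

  // Called by the body at safe points; the reason becomes the exception message.
  def checkCancelled(): Unit =
    reasonRef.get() match
      case Some(r) =>
        throw new InterruptedException(if r.isEmpty then "Task cancelled" else r)
      case None => ()
```

Storing the reason and the cancelled flag in one atomic reference avoids a window in which `isCancelled` is already true but the reason has not been published yet.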
/gemini review
The previous `js.Dynamic.global.process` access compiled to a bare
`process` identifier in `ModuleKind.NoModule` (the `uni-dom-test`
bundle), which threw `ReferenceError` in JSDOM's script sandbox at
module init (taskCompat's eager _bundleBootstrap val). Switching to
`js.eval("typeof process !== 'undefined' && …")`, since `typeof` is the
canonical undeclared-safe check and works in both NoModule and ESM
bundles.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Code Review
This pull request implements wvlet.uni.control.Task, a cross-platform background work primitive supporting JVM, Scala Native, and Scala.js. It includes a Node.js implementation using worker_threads and SharedArrayBuffer to allow blocking await() calls. Reviewer feedback correctly identified a need to align the await() contract by ensuring InterruptedException is thrown upon cancellation and recommended utilizing Atomics.store for state transitions in the shared buffer to guarantee memory visibility across isolates.

    case e: Throwable if cancelFlag.get() =>
      // Cancel was requested before the body finished — classify the throw as Cancelled even
      // if the exception type would otherwise be Failed (e.g. a JDBC InterruptedIOException).
      stateRef.set(Task.State.Cancelled)
      failure = e
      completion.failure(e)

The current implementation of runBody in TaskImpl violates the Task.await() contract when a task is cancelled. The trait documentation states that await() should throw an InterruptedException on Cancelled, but runBody captures the original exception e and rethrows it directly (via failure in JvmTaskImpl/NativeTaskImpl). This means if a body throws a RuntimeException while cancelFlag is set, await() will throw that RuntimeException instead of an InterruptedException.
Additionally, awaitRx should be consistent with NodeWorkerTask (in taskCompat.scala), which always surfaces an InterruptedException for the Cancelled state. Wrapping the original exception as the cause of the InterruptedException preserves the diagnostic information while adhering to the API contract.

    case e: Throwable if cancelFlag.get() =>
      // Cancel was requested before the body finished — classify the throw as Cancelled even
      // if the exception type would otherwise be Failed (e.g. a JDBC InterruptedIOException).
      stateRef.set(Task.State.Cancelled)
      val ex = e match {
        case ie: InterruptedException => ie
        case _ =>
          val msg = if cancelReason == null || cancelReason.isEmpty then "Task cancelled" else cancelReason
          val ie = new InterruptedException(msg)
          ie.initCause(e)
          ie
      }
      failure = ex
      completion.failure(ex)


    // `Atomics.*` are accessed via applyDynamic to avoid clashing with `java.lang.Object.wait` /
    // `notify` on Scala 3 — same trick `NodeSyncHttpChannel` uses.
    private def atomicsLoad(view: Int32Array, index: Int): Int = js

Add an atomicsStore helper to ensure memory visibility when updating the task state in the SharedArrayBuffer. While plain assignments to SharedArrayBuffer views are eventually visible, using Atomics.store provides the necessary memory barrier to ensure the parent thread (which is blocking on Atomics.wait) observes the update correctly.

    private def atomicsStore(view: Int32Array, index: Int, value: Int): Unit = js
      .Dynamic
      .global
      .Atomics
      .applyDynamic("store")(view, index, value)

    try
      val body = Task.lookup(taskId)
      body(ctx)
      stateView(0) = StateSuccess

      stateView(0) = StateSuccess
    catch
      case _: InterruptedException if ctx.isCancelled =>
        stateView(0) = StateCancelled

    else
      e.getClass.getName
    writeError(msg)
    stateView(0) = StateFailed

    if s == StateRunning then
      // Worker crashed before writing — synthesise a Failed terminal state.
      writeError("Worker exited without producing a result")
      stateView(0) = StateFailed

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ics.store

Two issues flagged in PR review:

1. (high) `Task.await()` contract says "throws InterruptedException on Cancelled" but `runBody` rethrew whatever the body threw. A body throwing RuntimeException while cancel was set surfaced as RuntimeException, not InterruptedException. Fix: on Cancelled, always wrap as InterruptedException (carrying the cancel reason), with the body's original throw attached as `initCause` for debugging.
2. (medium) Use `Atomics.store` for SAB state-word writes. Plain `view(0) = X` assignments to a SharedArrayBuffer-backed typed array are visible across threads in practice on V8/SpiderMonkey, but the spec only guarantees that visibility for `Atomics.*` accesses; `Atomics.store` provides the matching memory barrier for the parent's `Atomics.wait`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Addressed in 2b9e6af:
All 33 tests still pass across JVM/JS/Native + domTest.
`Task` is "a handle to a running task". The id→body lookup is a
separate concern — templates of work that can be looked up by id.
Mixing them on the Task companion conflated two different things;
extracting `TaskRegistry` makes the separation explicit.
User-facing change:
- `Task.register(id) { body }` → `TaskRegistry.register(id) { body }`
- `Task.runRegistered(id)` unchanged (delegates internally).
Per offline review feedback.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Previously TaskRegistry was an object — a process-wide singleton. That
made it impossible for tests to verify registration / lookup behaviour
without leaking into each other or into the production-default
registry.
Now:
- `class TaskRegistry` — instantiable; each instance is isolated. Tests
do `val r = TaskRegistry(); r.register(...)`.
- `object TaskRegistry.default` — the process-wide singleton that
`Task.runRegistered` uses on every platform (and that the Node
worker thread reads from in its own isolate after bundle re-import).
- `TaskRegistry.register(id) { ... }` — companion shortcut for
`TaskRegistry.default.register(...)`, so module-init registrations
keep their current ergonomic form.
- Added `isRegistered(id): Boolean` so tests can verify state without
relying on lookup throwing.
Plus a `TaskFeasibilityTest` covering the isolation property
explicitly.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
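The class-plus-default-instance shape described in this commit can be sketched as below. Everything here is illustrative: `MiniRegistry` is a hypothetical name, the body type is simplified to `() => Unit` instead of `TaskContext => Unit`, and the exception thrown by `lookup` is an assumption.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical sketch: each instance owns its own id -> body map.
class MiniRegistry:
  private val bodies = new ConcurrentHashMap[String, () => Unit]()

  def register(id: String)(body: => Unit): Unit =
    bodies.put(id, () => body)
    ()

  def isRegistered(id: String): Boolean = bodies.containsKey(id)

  // Assumed behaviour: a missing id is an error rather than a silent no-op.
  def lookup(id: String): () => Unit =
    Option(bodies.get(id)).getOrElse(
      throw new NoSuchElementException(s"No task registered for id: ${id}")
    )

object MiniRegistry:
  // Process-wide instance for production code paths.
  val default: MiniRegistry = new MiniRegistry
  // Companion shortcut so module-init registrations stay short.
  def register(id: String)(body: => Unit): Unit = default.register(id)(body)
```

A test can then create `new MiniRegistry`, register into it, and assert `isRegistered` without touching `MiniRegistry.default`, which is the isolation property the commit message calls out.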
…dinals, etc.

Reviewers (efficiency + code-quality + reuse agents) found:

- **Efficiency regression**: `NodeWorkerTask.awaitRx` allocated a fresh `Rx.future(...)` on each call because the class doesn't extend `TaskImpl` (state lives in the SAB). Fix: re-declare `cachedAwaitRx` locally so `task.awaitRx eq task.awaitRx`.
- **Sync-drift risk**: JS `State*` int constants were hand-numbered, decoupled from `Task.State` enum ordinals. Reordering the enum would have silently desynced the worker writer from the parent reader. Pin via `Task.State.<X>.ordinal`.
- **Hot path**: `isNode` was a `def` re-evaluating `js.eval` per call; `nodeMainScriptUrl` was a `def` resolving `import.meta.url` per `runRegistered` call. Both are pure module-level constants; switched to `lazy val`.
- **Test consistency**: `NodeWorkerTaskTest.isNode` used the buggy `js.isUndefined(js.Dynamic.global.process)` form (would throw `ReferenceError` under JSDOM/NoModule). Switched to the typeof form that `taskCompat.isNode` already uses.
- **Test strength**: `double cancel is idempotent` only asserted `isDone`. Strengthened: the first cancel's reason wins, the second doesn't flip the state.

Deferred to a follow-up PR (real cleanup but cross-cuts a recently-ADR'd file):

- Extract `JsRuntime` (`isNode` + `workerThreads` loader + `Atomics` wrappers + `SharedArrayBufferCtor`) shared between `taskCompat.js` and `NodeSyncHttpChannel`; would also fix `NodeSyncHttpChannel.isNode`'s JSDOM `ReferenceError` bug.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
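The "pin via ordinals" fix can be illustrated with a small sketch. The names are hypothetical: `State` mirrors the four states listed in this PR, and a JDK `AtomicIntegerArray` stands in for the `Int32Array` view over a `SharedArrayBuffer` so the example runs on the JVM.

```scala
import java.util.concurrent.atomic.AtomicIntegerArray

// Hypothetical mirror of the Task.State enum described in this PR.
enum State:
  case Running, Succeeded, Failed, Cancelled

object StateWord:
  // Slot 0 of the shared buffer holds the state word.
  private val Slot = 0

  // Writer side: encode through the enum ordinal, never a hand-numbered constant.
  def write(buf: AtomicIntegerArray, s: State): Unit =
    buf.set(Slot, s.ordinal)

  // Reader side: decode through the same enum, so a reordered enum
  // changes both sides together.
  def read(buf: AtomicIntegerArray): State =
    State.fromOrdinal(buf.get(Slot))
```

Because both directions go through `State`, there is no second table of integers to keep in sync. The trade-off is that the encoding is only stable within one build of the code, which fits here since the worker re-imports the same bundle as the parent.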
Summary

Adds `wvlet.uni.control.Task`, a cross-platform primitive for "run a unit of work on a background worker that can be cooperatively cancelled and awaited", the requirement from #552. Lands the full design end-to-end on JVM, Scala.js (Node + browser), and Scala Native in one PR so the design itself is verified.

Sits in `wvlet.uni.control` alongside `Retry`, `CircuitBreaker`, `RateLimiter`, `Guard`, `Resource`, the existing home for lifecycle / reliability primitives.

Two API surfaces

- `Task.run { body }`: closure-based, works on all platforms. `await()` blocks on JVM/Native; throws `UnsupportedOperationException` on Scala.js (would deadlock the event loop), so use `awaitRx` there.
- `Task.register("id") { body }` + `Task.runRegistered("id")`: registry-based. On Node, the body runs in a `worker_threads` worker so the main thread can `Atomics.wait`, which means blocking `await()` works on Node. Required for the wvlet cross-platform query-runner use case. On JVM/Native this is a thin alias over `run`; on browser it falls back to the microtask path (no main-thread `Atomics.wait`).

Lifecycle

- No `[A]` result type, no progress API. Body is `TaskContext => Unit`. Bodies publish anything (progress, partial results, terminal values) through any existing Rx primitive (`RxVar`, `RxQueue`, `RxDeferred`). Same "side-effects managed outside the interface" rule that already justified no result type.
- `cancel(reason: String = "")`: reason flows into the `InterruptedException` that `checkCancelled` throws and that `await()` / `awaitRx` surface.
- `Running | Succeeded | Failed | Cancelled`: no `Cancelling` (unreliable observable; a racing terminal `set` could win).

Node worker_threads: non-obvious bits

Captured in `adr/2026-05-18-node-task-worker-threads.md`:

- ``js.`import`.meta.url``, not `process.argv[1]` (sbt-jsenv-nodejs leaves argv[1] empty).
- `Bridge.start()` throws in worker isolates because `scalajsCom` is undefined; the worker bootstrap tolerates the import error if `globalThis.__uniTaskInvoke` got set first (eagerly via an `@JSExportTopLevel val`).
- `js.Dynamic.global.updateDynamic("foo")(v)` is rewritten by the Scala.js linker as `foo = v`, which throws `ReferenceError` in strict-mode ESM. Use `Object.defineProperty` via `js.eval("globalThis")` instead.
- `Task.register`.

Test plan

- `uniJVM/testOnly *TaskFeasibilityTest *JvmTaskTest`: 9/9 (lifecycle, blocking `await`, `Thread.interrupt`-driven cancel of a sleeping body, `cancel(reason)`)
- `uniJS/testOnly *TaskFeasibilityTest *JsTaskTest *NodeWorkerTaskTest`: 13/13 (microtask lifecycle, browser-side `await` throws, 4 NodeWorkerTaskTest covering real blocking-await: success, body-exception, cancel-with-reason, missing-registration)
- `uniNative/testOnly *TaskFeasibilityTest *NativeTaskTest`: 10/10 (lifecycle + JVM-shape blocking/interrupt tests)
- `scalafmtAll` clean
- `projectJVM/test` clean (no regressions)

Design doc: `plans/2026-05-18-cross-platform-thread.md`. ADR: `adr/2026-05-18-node-task-worker-threads.md`.

🤖 Generated with Claude Code