[runtime/tokio] support for colocated tasks #3548

Open
andresilva wants to merge 4 commits into main from andre/runtime-spawn-colocated

@andresilva
Member

andresilva commented Apr 7, 2026

This PR extends the runtime spawner with a new colocated() builder method that allows tasks to be co-located on the same OS thread as their dedicated ancestor. When a task is spawned as dedicated, it now runs inside a tokio::task::LocalSet. Subsequent colocated() spawns from that context use tokio::task::spawn_local to land on the same thread, eliminating cross-thread synchronization overhead and enabling cache-friendly data sharing between related tasks. If there is no dedicated ancestor, colocated() falls back to the shared executor.

The implementation uses a boolean flag (on_dedicated_thread) on Context rather than carrying an explicit LocalSet reference. This is necessary because LocalSet is !Send and Context must be Send + Sync. tokio::task::spawn_local already targets the innermost active LocalSet on the current thread, so the flag only gates whether calling it is valid. Colocation always targets the closest dedicated ancestor: if a colocated task spawns a new dedicated child, that child starts a fresh colocation chain on its own thread. The chain also breaks when a task is spawned onto the shared pool, i.e., a colocated grandchild of a shared parent will not return to the original dedicated thread.
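
The propagation rules described above can be sketched without tokio. This is only an illustration: the `Execution` enum below is a simplified stand-in for the one in the runtime, and `Placement` and `place` are hypothetical names that do not exist in the PR. The function returns where a spawn would land together with the value the child's on_dedicated_thread flag would take.

```rust
/// Simplified stand-in for the runtime's execution mode (assumption: the
/// real enum has more variants, e.g. for blocking work).
#[derive(Clone, Copy, Debug, PartialEq)]
enum Execution {
    Shared,
    Dedicated,
    Colocated,
}

/// Hypothetical description of where a spawn ends up.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Placement {
    SharedPool,
    NewThread,
    ParentThread,
}

/// Returns the placement of the child and the child's dedicated flag.
fn place(parent_on_dedicated: bool, requested: Execution) -> (Placement, bool) {
    match requested {
        // A dedicated spawn always starts a fresh chain on its own thread.
        Execution::Dedicated => (Placement::NewThread, true),
        // A colocated spawn stays on the parent's thread only if the parent
        // is already running on a dedicated thread.
        Execution::Colocated if parent_on_dedicated => (Placement::ParentThread, true),
        // Otherwise the spawn goes to the shared pool and the chain breaks.
        Execution::Colocated | Execution::Shared => (Placement::SharedPool, false),
    }
}

fn main() {
    // dedicated -> colocated child -> shared grandchild -> colocated descendant
    let (p1, f1) = place(false, Execution::Dedicated);
    let (p2, f2) = place(f1, Execution::Colocated);
    let (p3, f3) = place(f2, Execution::Shared);
    let (p4, _) = place(f3, Execution::Colocated);
    println!("{:?} -> {:?} -> {:?} -> {:?}", p1, p2, p3, p4);
}
```

The last step in `main` is the "chain breaks on shared" case: once a task has gone through the shared pool, a later colocated spawn has no dedicated ancestor to return to.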

Related #3537.

@cloudflare-workers-and-pages

cloudflare-workers-and-pages Bot commented Apr 7, 2026

Deploying monorepo with Cloudflare Pages

Latest commit: 467a9e8
Status: ✅  Deploy successful!
Preview URL: https://f24f9091.monorepo-eu0.pages.dev
Branch Preview URL: https://andre-runtime-spawn-colocate.monorepo-eu0.pages.dev

@cloudflare-workers-and-pages

cloudflare-workers-and-pages Bot commented Apr 7, 2026

Deploying with Cloudflare Workers

Status: ✅ Deployment successful!
Name: commonware-mcp
Latest commit: 467a9e8
Updated (UTC): Apr 23 2026, 01:20 PM

Comment thread runtime/src/tokio/runtime.rs Outdated
}
});
} else if matches!(past, Execution::Colocated) && parent_on_dedicated {
tokio::task::spawn_local(f);
Member Author

tokio::task::spawn_local targets the innermost active LocalSet on the current thread, which is always the one created by the dedicated ancestor. This avoids carrying an explicit LocalSet reference on Context, which is not possible because LocalSet is !Send and Context must be Send + Sync.
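
The Send + Sync constraint mentioned here can be checked at compile time. The sketch below uses only the standard library: `FlagContext` and `HandleContext` are hypothetical, trimmed-down types, and `Rc<()>` stands in for LocalSet purely because both are !Send. It is meant to show why a plain bool is the easy thing to carry, not how Context is actually defined.

```rust
use std::marker::PhantomData;
use std::rc::Rc;

/// Compiles only when `T` is `Send + Sync`.
fn assert_send_sync<T: Send + Sync>() {}

/// Hypothetical context that carries only the boolean flag.
#[derive(Clone, Debug)]
struct FlagContext {
    on_dedicated_thread: bool,
}

/// Hypothetical context that carries a thread-bound handle. `Rc<()>` is an
/// assumption used as a stand-in for `LocalSet` (both are `!Send`).
#[allow(dead_code)]
struct HandleContext {
    local: PhantomData<Rc<()>>,
}

fn main() {
    // The flag-only context satisfies the bound.
    assert_send_sync::<FlagContext>();

    // Uncommenting the next line fails to compile, because the stand-in
    // handle is neither `Send` nor `Sync`.
    // assert_send_sync::<HandleContext>();

    // Since the flag-only context is `Send`, it can move to another thread.
    let ctx = FlagContext { on_dedicated_thread: true };
    let flag = std::thread::spawn(move || ctx.on_dedicated_thread)
        .join()
        .unwrap();
    println!("flag observed on the other thread: {flag}");
}
```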

andresilva marked this pull request as ready for review April 7, 2026 08:52
andresilva moved this to Ready for Review in Tracker Apr 7, 2026
Comment thread runtime/src/telemetry/metrics/task.rs Outdated
SharedBlocking,
/// Task runs on a dedicated thread.
Dedicated,
/// Task is co-located on the same thread as its dedicated ancestor. Falls
Contributor

I think we should error or panic here rather than fall back. At the very least, we should do that in the deterministic runtime.

Member Author

andresilva Apr 7, 2026

The issue is that the context has no way of knowing if its parent (or any ancestor) is running as dedicated, so there's no obvious way to avoid panicking at the call site. colocated() is more of a local hint, but the caller doesn't control where it ends up in the spawn hierarchy, and the same code might be used in contexts with or without a dedicated ancestor. Panicking would make colocated() unusable in generic code.

Maybe this should be made more explicit in the documentation?

@patrick-ogrady
Contributor

Because on_colocated_task can be copied in context, the LLMs like this type of approach instead:

diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs
index db6197679..09a7fc6a3 100644
--- a/runtime/src/lib.rs
+++ b/runtime/src/lib.rs
@@ -3894,26 +3894,55 @@ mod tests {
         });
     }
 
+    #[test]
+    fn test_tokio_spawn_colocated_uses_current_dedicated_thread_for_shared_clone() {
+        // Verify that colocation depends on where a clone is used, not where it
+        // was created. A clone captured on the shared root should still
+        // colocate when it is later reused from a dedicated task.
+        let executor = tokio::Runner::default();
+        executor.start(|context| async move {
+            let reused = context.clone();
+            let (dedicated_thread, child_thread) = context
+                .dedicated()
+                .spawn(move |_| async move {
+                    let dedicated_thread = std::thread::current().id();
+                    let child_thread = reused
+                        .colocated()
+                        .spawn(|_| async move { std::thread::current().id() })
+                        .await
+                        .unwrap();
+                    (dedicated_thread, child_thread)
+                })
+                .await
+                .unwrap();
+            assert_eq!(dedicated_thread, child_thread);
+        });
+    }
+
     #[test]
     fn test_tokio_spawn_colocated_breaks_on_shared() {
-        // Verify that a shared spawn breaks the colocation chain: a colocated
-        // grandchild spawned from a shared child must NOT land back on the
-        // dedicated thread.
+        // Verify that a dedicated clone reused from a shared child does not
+        // inherit its original dedicated affinity. Colocation should fall back
+        // to shared work instead of trying to `spawn_local` from outside the
+        // dedicated LocalSet.
         //
         // dedicated (thread A)
         //   +-- shared child (thread B, leaves dedicated)
-        //         +-- colocated grandchild (no dedicated ancestor, stays on shared)
+        //         +-- reused dedicated clone
+        //               +-- colocated grandchild (no dedicated ancestor, stays on shared)
         let executor = tokio::Runner::default();
         executor.start(|context| async move {
             let handle = context.dedicated().spawn(|context| async move {
                 let dedicated_thread = std::thread::current().id();
+                let reused = context.clone();
 
                 // Shared child leaves the dedicated thread
                 let (shared_thread, grandchild_thread) = context
-                    .clone()
-                    .spawn(|context| async move {
-                        // Colocated from here has no dedicated ancestor
-                        let grandchild_thread = context
+                    .shared(false)
+                    .spawn(move |_| async move {
+                        // The reused clone must observe the shared execution
+                        // site, not the dedicated thread where it was created.
+                        let grandchild_thread = reused
                             .colocated()
                             .spawn(|_| async move { std::thread::current().id() })
                             .await
diff --git a/runtime/src/tokio/runtime.rs b/runtime/src/tokio/runtime.rs
index 8d5459495..93f9b6305 100644
--- a/runtime/src/tokio/runtime.rs
+++ b/runtime/src/tokio/runtime.rs
@@ -87,6 +87,33 @@ impl Metrics {
     }
 }
 
+thread_local! {
+    /// Marks code currently executing on one of this runtime's dedicated threads.
+    static ON_DEDICATED_THREAD: std::cell::Cell<bool> = const { std::cell::Cell::new(false) };
+}
+
+/// Guards the dedicated-thread marker for the lifetime of a dedicated thread run loop.
+struct DedicatedThreadGuard {
+    previous: bool,
+}
+
+impl DedicatedThreadGuard {
+    fn enter() -> Self {
+        let previous = ON_DEDICATED_THREAD.with(|marker| marker.replace(true));
+        Self { previous }
+    }
+}
+
+impl Drop for DedicatedThreadGuard {
+    fn drop(&mut self) {
+        ON_DEDICATED_THREAD.with(|marker| marker.set(self.previous));
+    }
+}
+
+fn on_dedicated_thread() -> bool {
+    ON_DEDICATED_THREAD.with(|marker| marker.get())
+}
+
 #[derive(Clone, Debug)]
 pub struct NetworkConfig {
     /// If Some, explicitly sets TCP_NODELAY on the socket.
@@ -479,7 +506,6 @@ impl crate::Runner for Runner {
             tree: Tree::root(),
             execution: Execution::default(),
             instrumented: false,
-            on_dedicated_thread: false,
         };
         let output = executor.runtime.block_on(panicked.interrupt(f(context)));
         gauge.dec();
@@ -519,7 +545,6 @@ pub struct Context {
     tree: Arc<Tree>,
     execution: Execution,
     instrumented: bool,
-    on_dedicated_thread: bool,
 }
 
 impl Clone for Context {
@@ -537,7 +562,6 @@ impl Clone for Context {
             tree: child,
             execution: Execution::default(),
             instrumented: false,
-            on_dedicated_thread: self.on_dedicated_thread,
         }
     }
 }
@@ -586,14 +610,9 @@ impl crate::Spawner for Context {
         self.execution = Execution::default();
         self.instrumented = false;
 
-        // The child runs on a dedicated thread if it is spawned as dedicated
-        // (new thread) or colocated onto an existing dedicated thread.
-        let parent_on_dedicated = self.on_dedicated_thread;
-        self.on_dedicated_thread = match past {
-            Execution::Dedicated => true,
-            Execution::Colocated if parent_on_dedicated => true,
-            _ => false,
-        };
+        // Colocation depends on where this spawn call executes, not where the
+        // context was created or cloned.
+        let parent_on_dedicated = on_dedicated_thread();
 
         let (child, aborted) = Tree::child(&parent);
         if aborted {
@@ -624,6 +643,7 @@ impl crate::Spawner for Context {
                 // Ensure the task can access the tokio runtime
                 let handle = executor.runtime.handle().clone();
                 move || {
+                    let _guard = DedicatedThreadGuard::enter();
                     let local = tokio::task::LocalSet::new();
                     handle.block_on(local.run_until(f));
                 }
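
For readers who want to try the thread-local marker in isolation, here is a minimal std-only sketch of the same guard. `run_dedicated` is a hypothetical helper that stands in for the dedicated run loop (the real one also builds a LocalSet and blocks on the tokio handle), so treat this as an approximation of the approach in the diff rather than the runtime's code.

```rust
use std::cell::Cell;
use std::thread;

thread_local! {
    /// Marks code currently executing on a dedicated thread.
    static ON_DEDICATED_THREAD: Cell<bool> = const { Cell::new(false) };
}

/// Sets the marker on creation and restores the previous value on drop.
struct DedicatedThreadGuard {
    previous: bool,
}

impl DedicatedThreadGuard {
    fn enter() -> Self {
        let previous = ON_DEDICATED_THREAD.with(|marker| marker.replace(true));
        Self { previous }
    }
}

impl Drop for DedicatedThreadGuard {
    fn drop(&mut self) {
        ON_DEDICATED_THREAD.with(|marker| marker.set(self.previous));
    }
}

fn on_dedicated_thread() -> bool {
    ON_DEDICATED_THREAD.with(|marker| marker.get())
}

/// Hypothetical stand-in for the dedicated run loop: runs `f` on a fresh OS
/// thread with the marker set for the duration of the call.
fn run_dedicated<T: Send + 'static>(f: impl FnOnce() -> T + Send + 'static) -> T {
    thread::spawn(move || {
        let _guard = DedicatedThreadGuard::enter();
        f()
    })
    .join()
    .unwrap()
}

fn main() {
    // The calling thread is not dedicated.
    assert!(!on_dedicated_thread());

    // Code running inside the dedicated run loop observes the marker.
    assert!(run_dedicated(on_dedicated_thread));

    // A different thread started from the dedicated one does not inherit
    // the marker, which is the "breaks on shared" behavior in the test.
    let seen_elsewhere = run_dedicated(|| thread::spawn(on_dedicated_thread).join().unwrap());
    assert!(!seen_elsewhere);

    // The marker never leaks back to the calling thread.
    assert!(!on_dedicated_thread());
}
```

The point of the design is that the answer depends on the thread where the spawn call executes, so a cloned context gives the same result as any other context used at that site.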

@andresilva
Member Author

andresilva commented Apr 8, 2026

I don't understand why this is needed. What's the problem with on_dedicated_thread being copied along with the context? It shouldn't change the meaning, because copying doesn't change parenting.

Edit: okay, I've read the test now and see the issue. I'll see if there's a way around it without the thread-local.

@andresilva
Member Author

@patrick-ogrady I think this issue reflects a misuse (or, more precisely, an unintended use) of context. The same thing happens with the supervision tree if you reuse a context from another "scope", and it can also lead to "surprising" behavior (again, it might not be surprising at all; it might be intentional). For consistency, I think we shouldn't use thread-locals here: colocation (like parenting) refers to the context being spawned on, not the "scope" you're in.

andresilva force-pushed the andre/runtime-spawn-colocated branch from 586c600 to 65e2216 April 22, 2026 11:31
@codecov

codecov Bot commented May 20, 2026

Codecov Report

❌ Patch coverage is 97.89474% with 10 lines in your changes missing coverage. Please review.
✅ Project coverage is 95.82%. Comparing base (2a7dd42) to head (467a9e8).
⚠️ Report is 98 commits behind head on main.

Files with missing lines | Patch % | Lines
runtime/src/tokio/dedicated.rs | 94.73% | 5 Missing and 2 partials ⚠️
runtime/src/utils/cell.rs | 0.00% | 3 Missing ⚠️
@@            Coverage Diff             @@
##             main    #3548      +/-   ##
==========================================
- Coverage   95.87%   95.82%   -0.05%     
==========================================
  Files         442      443       +1     
  Lines      172121   173954    +1833     
  Branches     4010     4084      +74     
==========================================
+ Hits       165017   166697    +1680     
- Misses       5840     5955     +115     
- Partials     1264     1302      +38     
Files with missing lines | Coverage Δ
runtime/src/deterministic.rs | 96.31% <100.00%> (+0.09%) ⬆️
runtime/src/lib.rs | 98.09% <100.00%> (+0.46%) ⬆️
runtime/src/telemetry/metrics/task.rs | 100.00% <100.00%> (ø)
runtime/src/tokio/runtime.rs | 85.46% <100.00%> (+0.60%) ⬆️
runtime/src/utils/mod.rs | 97.58% <ø> (ø)
runtime/src/utils/supervision.rs | 100.00% <100.00%> (ø)
runtime/src/utils/cell.rs | 76.92% <0.00%> (-1.51%) ⬇️
runtime/src/tokio/dedicated.rs | 94.73% <94.73%> (ø)

... and 23 files with indirect coverage changes


Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 2a7dd42...467a9e8.
