@@ -1693,6 +1693,69 @@ mod tests {
16931693 } ) ;
16941694 }
16951695
1696+ fn test_spawn_colocated < R : Runner > ( runner : R )
1697+ where
1698+ R :: Context : Spawner ,
1699+ {
1700+ runner. start ( |context| async move {
1701+ // Colocated child from a dedicated parent
1702+ let handle = context. dedicated ( ) . spawn ( |context| async move {
1703+ let handle = context. colocated ( ) . spawn ( |_| async move { 42 } ) ;
1704+ handle. await . unwrap ( )
1705+ } ) ;
1706+ assert ! ( matches!( handle. await , Ok ( 42 ) ) ) ;
1707+ } ) ;
1708+ }
1709+
1710+ fn test_spawn_colocated_nested < R : Runner > ( runner : R )
1711+ where
1712+ R :: Context : Spawner ,
1713+ {
1714+ runner. start ( |context| async move {
1715+ // Colocated grandchild inherits dedicated ancestor
1716+ let handle = context. dedicated ( ) . spawn ( |context| async move {
1717+ let handle = context. colocated ( ) . spawn ( |context| async move {
1718+ let handle = context. colocated ( ) . spawn ( |_| async move { 7 } ) ;
1719+ handle. await . unwrap ( )
1720+ } ) ;
1721+ handle. await . unwrap ( )
1722+ } ) ;
1723+ assert ! ( matches!( handle. await , Ok ( 7 ) ) ) ;
1724+ } ) ;
1725+ }
1726+
1727+ fn test_spawn_colocated_fallback < R : Runner > ( runner : R )
1728+ where
1729+ R :: Context : Spawner ,
1730+ {
1731+ runner. start ( |context| async move {
1732+ // Colocated without a dedicated ancestor falls back to shared
1733+ let handle = context. colocated ( ) . spawn ( |_| async move { 99 } ) ;
1734+ assert ! ( matches!( handle. await , Ok ( 99 ) ) ) ;
1735+ } ) ;
1736+ }
1737+
1738+ fn test_spawn_colocated_abort_on_parent_completion < R : Runner > ( runner : R )
1739+ where
1740+ R :: Context : Spawner ,
1741+ {
1742+ runner. start ( |context| async move {
1743+ let child_handle = Arc :: new ( Mutex :: new ( None ) ) ;
1744+ let child_handle2 = child_handle. clone ( ) ;
1745+
1746+ // Dedicated parent spawns a colocated child that hangs forever
1747+ let parent_handle = context. dedicated ( ) . spawn ( move |context| async move {
1748+ let handle = context. colocated ( ) . spawn ( |_| pending :: < ( ) > ( ) ) ;
1749+ * child_handle2. lock ( ) = Some ( handle) ;
1750+ } ) ;
1751+
1752+ // Parent completes immediately, colocated child should be aborted
1753+ assert ! ( parent_handle. await . is_ok( ) ) ;
1754+ let child_handle = child_handle. lock ( ) . take ( ) . unwrap ( ) ;
1755+ assert ! ( matches!( child_handle. await , Err ( Error :: Closed ) ) ) ;
1756+ } ) ;
1757+ }
1758+
16961759 fn test_spawn < R : Runner > ( runner : R )
16971760 where
16981761 R :: Context : Spawner + Clock ,
@@ -3324,6 +3387,30 @@ mod tests {
33243387 test_spawn_dedicated ( executor) ;
33253388 }
33263389
3390+ #[ test]
3391+ fn test_deterministic_spawn_colocated ( ) {
3392+ let executor = deterministic:: Runner :: default ( ) ;
3393+ test_spawn_colocated ( executor) ;
3394+ }
3395+
3396+ #[ test]
3397+ fn test_deterministic_spawn_colocated_nested ( ) {
3398+ let executor = deterministic:: Runner :: default ( ) ;
3399+ test_spawn_colocated_nested ( executor) ;
3400+ }
3401+
3402+ #[ test]
3403+ fn test_deterministic_spawn_colocated_fallback ( ) {
3404+ let executor = deterministic:: Runner :: default ( ) ;
3405+ test_spawn_colocated_fallback ( executor) ;
3406+ }
3407+
3408+ #[ test]
3409+ fn test_deterministic_spawn_colocated_abort_on_parent_completion ( ) {
3410+ let executor = deterministic:: Runner :: default ( ) ;
3411+ test_spawn_colocated_abort_on_parent_completion ( executor) ;
3412+ }
3413+
33273414 #[ test]
33283415 fn test_deterministic_spawn ( ) {
33293416 let runner = deterministic:: Runner :: default ( ) ;
@@ -3673,6 +3760,178 @@ mod tests {
36733760 test_spawn_dedicated ( executor) ;
36743761 }
36753762
3763+ #[ test]
3764+ fn test_tokio_spawn_colocated ( ) {
3765+ let executor = tokio:: Runner :: default ( ) ;
3766+ test_spawn_colocated ( executor) ;
3767+ }
3768+
3769+ #[ test]
3770+ fn test_tokio_spawn_colocated_nested ( ) {
3771+ let executor = tokio:: Runner :: default ( ) ;
3772+ test_spawn_colocated_nested ( executor) ;
3773+ }
3774+
3775+ #[ test]
3776+ fn test_tokio_spawn_colocated_fallback ( ) {
3777+ let executor = tokio:: Runner :: default ( ) ;
3778+ test_spawn_colocated_fallback ( executor) ;
3779+ }
3780+
3781+ #[ test]
3782+ fn test_tokio_spawn_colocated_abort_on_parent_completion ( ) {
3783+ let executor = tokio:: Runner :: default ( ) ;
3784+ test_spawn_colocated_abort_on_parent_completion ( executor) ;
3785+ }
3786+
3787+ #[ test]
3788+ fn test_tokio_spawn_colocated_same_thread ( ) {
3789+ // Verify that a colocated child runs on the same OS thread as its
3790+ // dedicated parent.
3791+ let executor = tokio:: Runner :: default ( ) ;
3792+ executor. start ( |context| async move {
3793+ let handle = context. dedicated ( ) . spawn ( |context| async move {
3794+ let parent_thread = std:: thread:: current ( ) . id ( ) ;
3795+ let child_thread = context
3796+ . colocated ( )
3797+ . spawn ( |_| async move { std:: thread:: current ( ) . id ( ) } )
3798+ . await
3799+ . unwrap ( ) ;
3800+ assert_eq ! ( parent_thread, child_thread) ;
3801+ } ) ;
3802+ handle. await . unwrap ( ) ;
3803+ } ) ;
3804+ }
3805+
3806+ #[ test]
3807+ fn test_tokio_spawn_colocated_nested_same_thread ( ) {
3808+ // Verify that colocation chains: dedicated -> colocated -> colocated
3809+ // all stay on the same OS thread.
3810+ let executor = tokio:: Runner :: default ( ) ;
3811+ executor. start ( |context| async move {
3812+ let handle = context. dedicated ( ) . spawn ( |context| async move {
3813+ let root_thread = std:: thread:: current ( ) . id ( ) ;
3814+
3815+ // Colocated child spawns another colocated grandchild
3816+ let grandchild_thread = context
3817+ . colocated ( )
3818+ . spawn ( |context| async move {
3819+ context
3820+ . colocated ( )
3821+ . spawn ( |_| async move { std:: thread:: current ( ) . id ( ) } )
3822+ . await
3823+ . unwrap ( )
3824+ } )
3825+ . await
3826+ . unwrap ( ) ;
3827+
3828+ // All three levels share the same thread
3829+ assert_eq ! ( root_thread, grandchild_thread) ;
3830+ } ) ;
3831+ handle. await . unwrap ( ) ;
3832+ } ) ;
3833+ }
3834+
3835+ #[ test]
3836+ fn test_tokio_spawn_colocated_new_dedicated_new_thread ( ) {
3837+ // Verify that a dedicated child from a colocated context starts a new
3838+ // thread with its own colocation chain.
3839+ //
3840+ // dedicated (thread A)
3841+ // +-- colocated (thread A)
3842+ // +-- dedicated (thread B, new chain)
3843+ // +-- colocated (thread B)
3844+ let executor = tokio:: Runner :: default ( ) ;
3845+ executor. start ( |context| async move {
3846+ let handle = context. dedicated ( ) . spawn ( |context| async move {
3847+ let thread_a = std:: thread:: current ( ) . id ( ) ;
3848+
3849+ let ( thread_b_dedicated, thread_b_colocated) = context
3850+ . colocated ( )
3851+ . spawn ( |context| async move {
3852+ // New dedicated child starts on a different thread
3853+ context
3854+ . dedicated ( )
3855+ . spawn ( |context| async move {
3856+ let thread_b = std:: thread:: current ( ) . id ( ) ;
3857+ // Colocated from the new dedicated thread
3858+ let colocated_thread = context
3859+ . colocated ( )
3860+ . spawn ( |_| async move { std:: thread:: current ( ) . id ( ) } )
3861+ . await
3862+ . unwrap ( ) ;
3863+ ( thread_b, colocated_thread)
3864+ } )
3865+ . await
3866+ . unwrap ( )
3867+ } )
3868+ . await
3869+ . unwrap ( ) ;
3870+
3871+ // New dedicated child is on a different thread
3872+ assert_ne ! ( thread_a, thread_b_dedicated) ;
3873+ // Its colocated child stays on that new thread
3874+ assert_eq ! ( thread_b_dedicated, thread_b_colocated) ;
3875+ } ) ;
3876+ handle. await . unwrap ( ) ;
3877+ } ) ;
3878+ }
3879+
3880+ #[ test]
3881+ fn test_tokio_spawn_colocated_fallback_different_thread ( ) {
3882+ // Verify that colocated without a dedicated ancestor falls back to the
3883+ // shared pool. The root task runs on the block_on thread (not a worker
3884+ // thread), so the spawned task is guaranteed to be on a different thread.
3885+ let executor = tokio:: Runner :: default ( ) ;
3886+ executor. start ( |context| async move {
3887+ let root_thread = std:: thread:: current ( ) . id ( ) ;
3888+ let child_thread = context
3889+ . colocated ( )
3890+ . spawn ( |_| async move { std:: thread:: current ( ) . id ( ) } )
3891+ . await
3892+ . unwrap ( ) ;
3893+ assert_ne ! ( root_thread, child_thread) ;
3894+ } ) ;
3895+ }
3896+
3897+ #[ test]
3898+ fn test_tokio_spawn_colocated_breaks_on_shared ( ) {
3899+ // Verify that a shared spawn breaks the colocation chain: a colocated
3900+ // grandchild spawned from a shared child must NOT land back on the
3901+ // dedicated thread.
3902+ //
3903+ // dedicated (thread A)
3904+ // +-- shared child (thread B, leaves dedicated)
3905+ // +-- colocated grandchild (no dedicated ancestor, stays on shared)
3906+ let executor = tokio:: Runner :: default ( ) ;
3907+ executor. start ( |context| async move {
3908+ let handle = context. dedicated ( ) . spawn ( |context| async move {
3909+ let dedicated_thread = std:: thread:: current ( ) . id ( ) ;
3910+
3911+ // Shared child leaves the dedicated thread
3912+ let ( shared_thread, grandchild_thread) = context
3913+ . clone ( )
3914+ . spawn ( |context| async move {
3915+ // Colocated from here has no dedicated ancestor
3916+ let grandchild_thread = context
3917+ . colocated ( )
3918+ . spawn ( |_| async move { std:: thread:: current ( ) . id ( ) } )
3919+ . await
3920+ . unwrap ( ) ;
3921+ ( std:: thread:: current ( ) . id ( ) , grandchild_thread)
3922+ } )
3923+ . await
3924+ . unwrap ( ) ;
3925+
3926+ // Shared child is not on the dedicated thread
3927+ assert_ne ! ( dedicated_thread, shared_thread) ;
3928+ // Colocated grandchild did not return to the dedicated thread
3929+ assert_ne ! ( dedicated_thread, grandchild_thread) ;
3930+ } ) ;
3931+ handle. await . unwrap ( ) ;
3932+ } ) ;
3933+ }
3934+
36763935 #[ test]
36773936 fn test_tokio_spawn ( ) {
36783937 let runner = tokio:: Runner :: default ( ) ;
0 commit comments