@@ -1736,6 +1736,69 @@ mod tests {
17361736 } ) ;
17371737 }
17381738
1739+ fn test_spawn_colocated < R : Runner > ( runner : R )
1740+ where
1741+ R :: Context : Spawner ,
1742+ {
1743+ runner. start ( |context| async move {
1744+ // Colocated child from a dedicated parent
1745+ let handle = context. dedicated ( ) . spawn ( |context| async move {
1746+ let handle = context. colocated ( ) . spawn ( |_| async move { 42 } ) ;
1747+ handle. await . unwrap ( )
1748+ } ) ;
1749+ assert ! ( matches!( handle. await , Ok ( 42 ) ) ) ;
1750+ } ) ;
1751+ }
1752+
1753+ fn test_spawn_colocated_nested < R : Runner > ( runner : R )
1754+ where
1755+ R :: Context : Spawner ,
1756+ {
1757+ runner. start ( |context| async move {
1758+ // Colocated grandchild inherits dedicated ancestor
1759+ let handle = context. dedicated ( ) . spawn ( |context| async move {
1760+ let handle = context. colocated ( ) . spawn ( |context| async move {
1761+ let handle = context. colocated ( ) . spawn ( |_| async move { 7 } ) ;
1762+ handle. await . unwrap ( )
1763+ } ) ;
1764+ handle. await . unwrap ( )
1765+ } ) ;
1766+ assert ! ( matches!( handle. await , Ok ( 7 ) ) ) ;
1767+ } ) ;
1768+ }
1769+
1770+ fn test_spawn_colocated_fallback < R : Runner > ( runner : R )
1771+ where
1772+ R :: Context : Spawner ,
1773+ {
1774+ runner. start ( |context| async move {
1775+ // Colocated without a dedicated ancestor falls back to shared
1776+ let handle = context. colocated ( ) . spawn ( |_| async move { 99 } ) ;
1777+ assert ! ( matches!( handle. await , Ok ( 99 ) ) ) ;
1778+ } ) ;
1779+ }
1780+
1781+ fn test_spawn_colocated_abort_on_parent_completion < R : Runner > ( runner : R )
1782+ where
1783+ R :: Context : Spawner ,
1784+ {
1785+ runner. start ( |context| async move {
1786+ let child_handle = Arc :: new ( Mutex :: new ( None ) ) ;
1787+ let child_handle2 = child_handle. clone ( ) ;
1788+
1789+ // Dedicated parent spawns a colocated child that hangs forever
1790+ let parent_handle = context. dedicated ( ) . spawn ( move |context| async move {
1791+ let handle = context. colocated ( ) . spawn ( |_| pending :: < ( ) > ( ) ) ;
1792+ * child_handle2. lock ( ) = Some ( handle) ;
1793+ } ) ;
1794+
1795+ // Parent completes immediately, colocated child should be aborted
1796+ assert ! ( parent_handle. await . is_ok( ) ) ;
1797+ let child_handle = child_handle. lock ( ) . take ( ) . unwrap ( ) ;
1798+ assert ! ( matches!( child_handle. await , Err ( Error :: Closed ) ) ) ;
1799+ } ) ;
1800+ }
1801+
17391802 fn test_spawn < R : Runner > ( runner : R )
17401803 where
17411804 R :: Context : Spawner + Clock ,
@@ -3551,6 +3614,30 @@ mod tests {
35513614 test_spawn_dedicated ( executor) ;
35523615 }
35533616
3617+ #[ test]
3618+ fn test_deterministic_spawn_colocated ( ) {
3619+ let executor = deterministic:: Runner :: default ( ) ;
3620+ test_spawn_colocated ( executor) ;
3621+ }
3622+
3623+ #[ test]
3624+ fn test_deterministic_spawn_colocated_nested ( ) {
3625+ let executor = deterministic:: Runner :: default ( ) ;
3626+ test_spawn_colocated_nested ( executor) ;
3627+ }
3628+
3629+ #[ test]
3630+ fn test_deterministic_spawn_colocated_fallback ( ) {
3631+ let executor = deterministic:: Runner :: default ( ) ;
3632+ test_spawn_colocated_fallback ( executor) ;
3633+ }
3634+
3635+ #[ test]
3636+ fn test_deterministic_spawn_colocated_abort_on_parent_completion ( ) {
3637+ let executor = deterministic:: Runner :: default ( ) ;
3638+ test_spawn_colocated_abort_on_parent_completion ( executor) ;
3639+ }
3640+
35543641 #[ test]
35553642 fn test_deterministic_spawn ( ) {
35563643 let runner = deterministic:: Runner :: default ( ) ;
@@ -3900,6 +3987,178 @@ mod tests {
39003987 test_spawn_dedicated ( executor) ;
39013988 }
39023989
3990+ #[ test]
3991+ fn test_tokio_spawn_colocated ( ) {
3992+ let executor = tokio:: Runner :: default ( ) ;
3993+ test_spawn_colocated ( executor) ;
3994+ }
3995+
3996+ #[ test]
3997+ fn test_tokio_spawn_colocated_nested ( ) {
3998+ let executor = tokio:: Runner :: default ( ) ;
3999+ test_spawn_colocated_nested ( executor) ;
4000+ }
4001+
4002+ #[ test]
4003+ fn test_tokio_spawn_colocated_fallback ( ) {
4004+ let executor = tokio:: Runner :: default ( ) ;
4005+ test_spawn_colocated_fallback ( executor) ;
4006+ }
4007+
4008+ #[ test]
4009+ fn test_tokio_spawn_colocated_abort_on_parent_completion ( ) {
4010+ let executor = tokio:: Runner :: default ( ) ;
4011+ test_spawn_colocated_abort_on_parent_completion ( executor) ;
4012+ }
4013+
4014+ #[ test]
4015+ fn test_tokio_spawn_colocated_same_thread ( ) {
4016+ // Verify that a colocated child runs on the same OS thread as its
4017+ // dedicated parent.
4018+ let executor = tokio:: Runner :: default ( ) ;
4019+ executor. start ( |context| async move {
4020+ let handle = context. dedicated ( ) . spawn ( |context| async move {
4021+ let parent_thread = std:: thread:: current ( ) . id ( ) ;
4022+ let child_thread = context
4023+ . colocated ( )
4024+ . spawn ( |_| async move { std:: thread:: current ( ) . id ( ) } )
4025+ . await
4026+ . unwrap ( ) ;
4027+ assert_eq ! ( parent_thread, child_thread) ;
4028+ } ) ;
4029+ handle. await . unwrap ( ) ;
4030+ } ) ;
4031+ }
4032+
4033+ #[ test]
4034+ fn test_tokio_spawn_colocated_nested_same_thread ( ) {
4035+ // Verify that colocation chains: dedicated -> colocated -> colocated
4036+ // all stay on the same OS thread.
4037+ let executor = tokio:: Runner :: default ( ) ;
4038+ executor. start ( |context| async move {
4039+ let handle = context. dedicated ( ) . spawn ( |context| async move {
4040+ let root_thread = std:: thread:: current ( ) . id ( ) ;
4041+
4042+ // Colocated child spawns another colocated grandchild
4043+ let grandchild_thread = context
4044+ . colocated ( )
4045+ . spawn ( |context| async move {
4046+ context
4047+ . colocated ( )
4048+ . spawn ( |_| async move { std:: thread:: current ( ) . id ( ) } )
4049+ . await
4050+ . unwrap ( )
4051+ } )
4052+ . await
4053+ . unwrap ( ) ;
4054+
4055+ // All three levels share the same thread
4056+ assert_eq ! ( root_thread, grandchild_thread) ;
4057+ } ) ;
4058+ handle. await . unwrap ( ) ;
4059+ } ) ;
4060+ }
4061+
4062+ #[ test]
4063+ fn test_tokio_spawn_colocated_new_dedicated_new_thread ( ) {
4064+ // Verify that a dedicated child from a colocated context starts a new
4065+ // thread with its own colocation chain.
4066+ //
4067+ // dedicated (thread A)
4068+ // +-- colocated (thread A)
4069+ // +-- dedicated (thread B, new chain)
4070+ // +-- colocated (thread B)
4071+ let executor = tokio:: Runner :: default ( ) ;
4072+ executor. start ( |context| async move {
4073+ let handle = context. dedicated ( ) . spawn ( |context| async move {
4074+ let thread_a = std:: thread:: current ( ) . id ( ) ;
4075+
4076+ let ( thread_b_dedicated, thread_b_colocated) = context
4077+ . colocated ( )
4078+ . spawn ( |context| async move {
4079+ // New dedicated child starts on a different thread
4080+ context
4081+ . dedicated ( )
4082+ . spawn ( |context| async move {
4083+ let thread_b = std:: thread:: current ( ) . id ( ) ;
4084+ // Colocated from the new dedicated thread
4085+ let colocated_thread = context
4086+ . colocated ( )
4087+ . spawn ( |_| async move { std:: thread:: current ( ) . id ( ) } )
4088+ . await
4089+ . unwrap ( ) ;
4090+ ( thread_b, colocated_thread)
4091+ } )
4092+ . await
4093+ . unwrap ( )
4094+ } )
4095+ . await
4096+ . unwrap ( ) ;
4097+
4098+ // New dedicated child is on a different thread
4099+ assert_ne ! ( thread_a, thread_b_dedicated) ;
4100+ // Its colocated child stays on that new thread
4101+ assert_eq ! ( thread_b_dedicated, thread_b_colocated) ;
4102+ } ) ;
4103+ handle. await . unwrap ( ) ;
4104+ } ) ;
4105+ }
4106+
4107+ #[ test]
4108+ fn test_tokio_spawn_colocated_fallback_different_thread ( ) {
4109+ // Verify that colocated without a dedicated ancestor falls back to the
4110+ // shared pool. The root task runs on the block_on thread (not a worker
4111+ // thread), so the spawned task is guaranteed to be on a different thread.
4112+ let executor = tokio:: Runner :: default ( ) ;
4113+ executor. start ( |context| async move {
4114+ let root_thread = std:: thread:: current ( ) . id ( ) ;
4115+ let child_thread = context
4116+ . colocated ( )
4117+ . spawn ( |_| async move { std:: thread:: current ( ) . id ( ) } )
4118+ . await
4119+ . unwrap ( ) ;
4120+ assert_ne ! ( root_thread, child_thread) ;
4121+ } ) ;
4122+ }
4123+
4124+ #[ test]
4125+ fn test_tokio_spawn_colocated_breaks_on_shared ( ) {
4126+ // Verify that a shared spawn breaks the colocation chain: a colocated
4127+ // grandchild spawned from a shared child must NOT land back on the
4128+ // dedicated thread.
4129+ //
4130+ // dedicated (thread A)
4131+ // +-- shared child (thread B, leaves dedicated)
4132+ // +-- colocated grandchild (no dedicated ancestor, stays on shared)
4133+ let executor = tokio:: Runner :: default ( ) ;
4134+ executor. start ( |context| async move {
4135+ let handle = context. dedicated ( ) . spawn ( |context| async move {
4136+ let dedicated_thread = std:: thread:: current ( ) . id ( ) ;
4137+
4138+ // Shared child leaves the dedicated thread
4139+ let ( shared_thread, grandchild_thread) = context
4140+ . clone ( )
4141+ . spawn ( |context| async move {
4142+ // Colocated from here has no dedicated ancestor
4143+ let grandchild_thread = context
4144+ . colocated ( )
4145+ . spawn ( |_| async move { std:: thread:: current ( ) . id ( ) } )
4146+ . await
4147+ . unwrap ( ) ;
4148+ ( std:: thread:: current ( ) . id ( ) , grandchild_thread)
4149+ } )
4150+ . await
4151+ . unwrap ( ) ;
4152+
4153+ // Shared child is not on the dedicated thread
4154+ assert_ne ! ( dedicated_thread, shared_thread) ;
4155+ // Colocated grandchild did not return to the dedicated thread
4156+ assert_ne ! ( dedicated_thread, grandchild_thread) ;
4157+ } ) ;
4158+ handle. await . unwrap ( ) ;
4159+ } ) ;
4160+ }
4161+
39034162 #[ test]
39044163 fn test_tokio_spawn ( ) {
39054164 let runner = tokio:: Runner :: default ( ) ;
0 commit comments