@@ -1330,11 +1330,15 @@ async fn graceful_shutdown_sends_shutdown_worker_rpc_during_initiate() {
13301330 worker. finalize_shutdown ( ) . await ;
13311331}
13321332
1333+ /// Verifies that even if the a nexus task completion is dropped, the nexus worker
1334+ /// is able to trigger PollShutdown.
13331335#[ tokio:: test]
13341336async fn nexus_shutdown_does_not_hang_when_pending_completion_is_cancelled ( ) {
13351337 let mut client = mock_manual_worker_client ( ) ;
13361338 let completion_rpc_started = Arc :: new ( Barrier :: new ( 2 ) ) ;
13371339 let completion_rpc_started_clone = completion_rpc_started. clone ( ) ;
1340+
1341+ // Create a client that will hang on complete_nexus_task
13381342 client
13391343 . expect_complete_nexus_task ( )
13401344 . times ( 1 )
@@ -1370,16 +1374,22 @@ async fn nexus_shutdown_does_not_hang_when_pending_completion_is_cancelled() {
13701374 } ) ;
13711375 let worker = mock_worker ( mocks) ;
13721376
1377+ // Poll the first task from the stream and initiate shutdown
13731378 let nexus_task = worker. poll_nexus_task ( ) . await . unwrap ( ) ;
13741379 worker. initiate_shutdown ( ) ;
13751380
1376- let mut shutdown_poll = Box :: pin ( worker. poll_nexus_task ( ) ) ;
1377- tokio:: time:: timeout ( Duration :: from_millis ( 50 ) , shutdown_poll. as_mut ( ) )
1381+ // Poll the stream again and expect it to wait for the task to complete
1382+ let mut poll_future = Box :: pin ( worker. poll_nexus_task ( ) ) ;
1383+ tokio:: time:: timeout ( Duration :: from_millis ( 50 ) , poll_future. as_mut ( ) )
13781384 . await
1379- . expect_err ( "shutdown poll should wait for the outstanding Nexus task" ) ;
1385+ . expect_err ( "poll should wait for the outstanding Nexus task" ) ;
13801386
1387+ // Send completion that we know will hang indefinitely before notifying waitiers
13811388 let mut completion =
13821389 Box :: pin ( worker. complete_nexus_task ( create_test_nexus_completion ( nexus_task. task_token ( ) ) ) ) ;
1390+
1391+ // Wait for the completion to start then drop it before notify_waitiers is triggered
1392+ // Use select so the completion future is polled and completion actually starts
13831393 tokio:: select! {
13841394 _ = completion_rpc_started. wait( ) => { }
13851395 result = completion. as_mut( ) => {
@@ -1388,11 +1398,13 @@ async fn nexus_shutdown_does_not_hang_when_pending_completion_is_cancelled() {
13881398 }
13891399 drop ( completion) ;
13901400
1391- let poll_result = tokio:: time:: timeout ( Duration :: from_secs ( 1 ) , shutdown_poll. as_mut ( ) )
1401+ // Polling again should now return PollError::ShutDown because the outstanding task map is empty
1402+ // and waiters should have been notified.
1403+ let poll_result = tokio:: time:: timeout ( Duration :: from_secs ( 1 ) , poll_future. as_mut ( ) )
13921404 . await
13931405 . expect ( "shutdown poll should finish when a pending completion future is cancelled" ) ;
1394- assert_matches ! ( poll_result. unwrap_err ( ) , PollError :: ShutDown ) ;
1395- drop ( shutdown_poll ) ;
1406+ assert_matches ! ( poll_result, Err ( PollError :: ShutDown ) ) ;
1407+ drop ( poll_future ) ;
13961408
13971409 worker. shutdown ( ) . await ;
13981410 worker. finalize_shutdown ( ) . await ;
0 commit comments