@@ -1105,8 +1105,9 @@ async fn graceful_shutdown(#[values(true, false)] at_max_outstanding: bool) {
11051105 assert_matches ! (
11061106 cancel. variant,
11071107 Some ( activity_task:: Variant :: Cancel ( Cancel {
1108- reason: r
1109- } ) ) if r == ActivityCancelReason :: WorkerShutdown as i32
1108+ reason,
1109+ details
1110+ } ) ) if reason == ActivityCancelReason :: WorkerShutdown as i32 && details. as_ref( ) . is_some_and( |d| d. is_worker_shutdown)
11101111 ) ;
11111112 seen_tts. insert ( cancel. task_token ) ;
11121113 }
@@ -1241,3 +1242,163 @@ async fn pass_activity_summary_to_metadata() {
12411242 . unwrap ( ) ;
12421243 worker. run_until_done ( ) . await . unwrap ( ) ;
12431244}
1245+
1246+ #[ tokio:: test]
1247+ async fn heartbeat_response_can_be_paused ( ) {
1248+ let mut mock_client = mock_workflow_client ( ) ;
1249+ // First heartbeat returns pause only
1250+ mock_client
1251+ . expect_record_activity_heartbeat ( )
1252+ . times ( 1 )
1253+ . returning ( |_, _| {
1254+ Ok ( RecordActivityTaskHeartbeatResponse {
1255+ cancel_requested : false ,
1256+ activity_paused : true ,
1257+ } )
1258+ } ) ;
1259+ // Second heartbeat returns cancel only
1260+ mock_client
1261+ . expect_record_activity_heartbeat ( )
1262+ . times ( 1 )
1263+ . returning ( |_, _| {
1264+ Ok ( RecordActivityTaskHeartbeatResponse {
1265+ cancel_requested : true ,
1266+ activity_paused : false ,
1267+ } )
1268+ } ) ;
1269+ // Third heartbeat returns both
1270+ mock_client
1271+ . expect_record_activity_heartbeat ( )
1272+ . times ( 1 )
1273+ . returning ( |_, _| {
1274+ Ok ( RecordActivityTaskHeartbeatResponse {
1275+ cancel_requested : true ,
1276+ activity_paused : true ,
1277+ } )
1278+ } ) ;
1279+ mock_client
1280+ . expect_cancel_activity_task ( )
1281+ . times ( 3 )
1282+ . returning ( |_, _| Ok ( RespondActivityTaskCanceledResponse :: default ( ) ) ) ;
1283+
1284+ let core = mock_worker ( MocksHolder :: from_client_with_activities (
1285+ mock_client,
1286+ [
1287+ PollActivityTaskQueueResponse {
1288+ task_token : vec ! [ 1 ] ,
1289+ activity_id : "act1" . to_string ( ) ,
1290+ heartbeat_timeout : Some ( prost_dur ! ( from_millis( 1 ) ) ) ,
1291+ ..Default :: default ( )
1292+ }
1293+ . into ( ) ,
1294+ PollActivityTaskQueueResponse {
1295+ task_token : vec ! [ 2 ] ,
1296+ activity_id : "act2" . to_string ( ) ,
1297+ heartbeat_timeout : Some ( prost_dur ! ( from_millis( 1 ) ) ) ,
1298+ ..Default :: default ( )
1299+ }
1300+ . into ( ) ,
1301+ PollActivityTaskQueueResponse {
1302+ task_token : vec ! [ 3 ] ,
1303+ activity_id : "act3" . to_string ( ) ,
1304+ heartbeat_timeout : Some ( prost_dur ! ( from_millis( 1 ) ) ) ,
1305+ ..Default :: default ( )
1306+ }
1307+ . into ( ) ,
1308+ ] ,
1309+ ) ) ;
1310+
1311+ // The general testing pattern for each of these cases is:
1312+ // 1. Poll for activity task
1313+ // 2. Record activity heartbeat, get mocked heartbeat response
1314+ // 3. Sleep for 10ms (waiting for heartbeat request to be flushed)
1315+ // (i.e. sleep enough for the heartbeat flush interval to have elapsed)
1316+ // 4. Poll for activity task.
1317+ // We expect a cancellation activity task as they are prioritized (i.e. ordered before)
1318+ // regular activity tasks.
1319+ // 5. Assert that the received activity task is indeed a cancellation, with the reason
1320+ // and details we expect.
1321+ // 6. Complete the activity with a cancellation result.
1322+ //
1323+ // Repeat for subsequent test case(s).
1324+
1325+ // Test pause only
1326+ let act = core. poll_activity_task ( ) . await . unwrap ( ) ;
1327+ core. record_activity_heartbeat ( ActivityHeartbeat {
1328+ task_token : act. task_token . clone ( ) ,
1329+ details : vec ! [ vec![ 1_u8 , 2 , 3 ] . into( ) ] ,
1330+ } ) ;
1331+ sleep ( Duration :: from_millis ( 10 ) ) . await ;
1332+ let act = core. poll_activity_task ( ) . await . unwrap ( ) ;
1333+ assert_matches ! (
1334+ & act,
1335+ ActivityTask {
1336+ task_token,
1337+ variant: Some ( activity_task:: Variant :: Cancel ( Cancel { reason, details } ) ) ,
1338+ } if
1339+ task_token == & vec![ 1 ] &&
1340+ * reason == ActivityCancelReason :: Paused as i32 &&
1341+ details. as_ref( ) . is_some_and( |d| d. is_paused) &&
1342+ details. as_ref( ) . is_some_and( |d| !d. is_cancelled)
1343+ ) ;
1344+ core. complete_activity_task ( ActivityTaskCompletion {
1345+ task_token : act. task_token ,
1346+ result : Some ( ActivityExecutionResult :: cancel_from_details ( None ) ) ,
1347+ } )
1348+ . await
1349+ . unwrap ( ) ;
1350+
1351+ // Test cancel only
1352+ let act = core. poll_activity_task ( ) . await . unwrap ( ) ;
1353+ core. record_activity_heartbeat ( ActivityHeartbeat {
1354+ task_token : act. task_token . clone ( ) ,
1355+ details : vec ! [ vec![ 1_u8 , 2 , 3 ] . into( ) ] ,
1356+ } ) ;
1357+ sleep ( Duration :: from_millis ( 10 ) ) . await ;
1358+ let act = core. poll_activity_task ( ) . await . unwrap ( ) ;
1359+ assert_matches ! (
1360+ & act,
1361+ ActivityTask {
1362+ task_token,
1363+ variant: Some ( activity_task:: Variant :: Cancel ( Cancel { reason, details } ) ) ,
1364+ } if
1365+ task_token == & vec![ 2 ] &&
1366+ * reason == ActivityCancelReason :: Cancelled as i32 &&
1367+ details. as_ref( ) . is_some_and( |d| !d. is_paused) &&
1368+ details. as_ref( ) . is_some_and( |d| d. is_cancelled)
1369+ ) ;
1370+ core. complete_activity_task ( ActivityTaskCompletion {
1371+ task_token : act. task_token ,
1372+ result : Some ( ActivityExecutionResult :: cancel_from_details ( None ) ) ,
1373+ } )
1374+ . await
1375+ . unwrap ( ) ;
1376+
1377+ // Test both pause and cancel (should prioritize cancel)
1378+ let act = core. poll_activity_task ( ) . await . unwrap ( ) ;
1379+ core. record_activity_heartbeat ( ActivityHeartbeat {
1380+ task_token : act. task_token . clone ( ) ,
1381+ details : vec ! [ vec![ 1_u8 , 2 , 3 ] . into( ) ] ,
1382+ } ) ;
1383+ sleep ( Duration :: from_millis ( 10 ) ) . await ;
1384+ let act = core. poll_activity_task ( ) . await . unwrap ( ) ;
1385+ assert_matches ! (
1386+ & act,
1387+ ActivityTask {
1388+ task_token,
1389+ variant: Some ( activity_task:: Variant :: Cancel ( Cancel { reason, details } ) ) ,
1390+ } if
1391+ task_token == & vec![ 3 ] &&
1392+ * reason == ActivityCancelReason :: Cancelled as i32 &&
1393+ details. as_ref( ) . is_some_and( |d| d. is_paused) &&
1394+ details. as_ref( ) . is_some_and( |d| d. is_cancelled)
1395+ ) ;
1396+ core. complete_activity_task ( ActivityTaskCompletion {
1397+ task_token : act. task_token ,
1398+ result : Some ( ActivityExecutionResult :: cancel_from_details ( None ) ) ,
1399+ } )
1400+ . await
1401+ . unwrap ( ) ;
1402+
1403+ core. drain_activity_poller_and_shutdown ( ) . await ;
1404+ }
0 commit comments