Skip to content

Commit fbd1bc1

Browse files
committed
expose multiple cancellation reasons as details
1 parent 2a017f0 commit fbd1bc1

6 files changed

Lines changed: 102 additions & 45 deletions

File tree

core/src/core_tests/activity_tasks.rs

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1105,8 +1105,9 @@ async fn graceful_shutdown(#[values(true, false)] at_max_outstanding: bool) {
11051105
assert_matches!(
11061106
cancel.variant,
11071107
Some(activity_task::Variant::Cancel(Cancel {
1108-
reason: r
1109-
})) if r == ActivityCancelReason::WorkerShutdown as i32
1108+
reason,
1109+
details
1110+
})) if reason == ActivityCancelReason::WorkerShutdown as i32 && details.as_ref().is_some_and(|d| d.is_worker_shutdown)
11101111
);
11111112
seen_tts.insert(cancel.task_token);
11121113
}
@@ -1316,7 +1317,7 @@ async fn heartbeat_response_can_be_paused() {
13161317
// We expect a cancellation activity task as they are prioritized (i.e. ordered before)
13171318
// regular activity tasks.
13181319
// 5. Assert that the received activity task is indeed a cancellation, with the reason
1319-
// we expect.
1320+
// and details we expect.
13201321
// 6. Complete the activity with a cancellation result.
13211322
//
13221323
// Repeat for subsequent test case(s).
@@ -1333,12 +1334,12 @@ async fn heartbeat_response_can_be_paused() {
13331334
&act,
13341335
ActivityTask {
13351336
task_token,
1336-
variant: Some(activity_task::Variant::Cancel(Cancel { reason })),
1337-
..
1338-
} => {
1337+
variant: Some(activity_task::Variant::Cancel(Cancel { reason, details })),
1338+
} if
13391339
task_token == &vec![1] &&
1340-
*reason == ActivityCancelReason::Paused as i32
1341-
}
1340+
*reason == ActivityCancelReason::Paused as i32 &&
1341+
details.as_ref().is_some_and(|d| d.is_paused) &&
1342+
details.as_ref().is_some_and(|d| d.is_cancelled == false)
13421343
);
13431344
core.complete_activity_task(ActivityTaskCompletion {
13441345
task_token: act.task_token,
@@ -1359,12 +1360,12 @@ async fn heartbeat_response_can_be_paused() {
13591360
&act,
13601361
ActivityTask {
13611362
task_token,
1362-
variant: Some(activity_task::Variant::Cancel(Cancel { reason })),
1363-
..
1364-
} => {
1363+
variant: Some(activity_task::Variant::Cancel(Cancel { reason, details })),
1364+
} if
13651365
task_token == &vec![2] &&
1366-
*reason == ActivityCancelReason::Cancelled as i32
1367-
}
1366+
*reason == ActivityCancelReason::Cancelled as i32 &&
1367+
details.as_ref().is_some_and(|d| d.is_paused == false) &&
1368+
details.as_ref().is_some_and(|d| d.is_cancelled)
13681369
);
13691370
core.complete_activity_task(ActivityTaskCompletion {
13701371
task_token: act.task_token,
@@ -1385,12 +1386,12 @@ async fn heartbeat_response_can_be_paused() {
13851386
&act,
13861387
ActivityTask {
13871388
task_token,
1388-
variant: Some(activity_task::Variant::Cancel(Cancel { reason })),
1389-
..
1390-
} => {
1389+
variant: Some(activity_task::Variant::Cancel(Cancel { reason, details })),
1390+
} if
13911391
task_token == &vec![3] &&
1392-
*reason == ActivityCancelReason::Cancelled as i32
1393-
}
1392+
*reason == ActivityCancelReason::Cancelled as i32 &&
1393+
details.as_ref().is_some_and(|d| d.is_paused) &&
1394+
details.as_ref().is_some_and(|d| d.is_cancelled)
13941395
);
13951396
core.complete_activity_task(ActivityTaskCompletion {
13961397
task_token: act.task_token,

core/src/worker/activities.rs

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ use temporal_sdk_core_protos::{
4040
coresdk::{
4141
ActivityHeartbeat, ActivitySlotInfo,
4242
activity_result::{self as ar, activity_execution_result as aer},
43-
activity_task::{ActivityCancelReason, ActivityTask},
43+
activity_task::{ActivityCancelReason, ActivityCancellationDetails, ActivityTask},
4444
},
4545
temporal::api::{
4646
failure::v1::{ApplicationFailureInfo, CanceledFailureInfo, Failure, failure::FailureInfo},
@@ -65,16 +65,19 @@ type OutstandingActMap = Arc<DashMap<TaskToken, RemoteInFlightActInfo>>;
6565
struct PendingActivityCancel {
6666
task_token: TaskToken,
6767
reason: ActivityCancelReason,
68-
/// Set true if we should assume the server has already forgotten about this activity
69-
consider_not_found: bool,
68+
details: ActivityCancellationDetails,
7069
}
7170

7271
impl PendingActivityCancel {
73-
fn new(task_token: TaskToken, reason: ActivityCancelReason) -> Self {
72+
fn new(
73+
task_token: TaskToken,
74+
reason: ActivityCancelReason,
75+
details: ActivityCancellationDetails,
76+
) -> Self {
7477
Self {
7578
task_token,
7679
reason,
77-
consider_not_found: false,
80+
details,
7881
}
7982
}
8083
}
@@ -508,13 +511,14 @@ where
508511
} else {
509512
details.issued_cancel_to_lang = Some(next_pc.reason);
510513
if next_pc.reason == ActivityCancelReason::NotFound
511-
|| next_pc.consider_not_found
514+
|| next_pc.details.is_not_found
512515
{
513516
details.known_not_found = true;
514517
}
515518
Some(Ok(ActivityTask::cancel_from_ids(
516519
next_pc.task_token.0,
517520
next_pc.reason,
521+
next_pc.details,
518522
)))
519523
}
520524
} else {
@@ -566,6 +570,9 @@ where
566570
let _ = cancels_tx.send(PendingActivityCancel::new(
567571
tt,
568572
ActivityCancelReason::WorkerShutdown,
573+
ActivityTask::primary_reason_to_cancellation_details(
574+
ActivityCancelReason::WorkerShutdown,
575+
),
569576
));
570577
} else {
571578
// Fire off task to keep track of local timeouts. We do this so that
@@ -611,11 +618,15 @@ where
611618
"Timing out activity due to elapsed local \
612619
{timeout_type} timer"
613620
);
614-
let _ = cancel_tx.send(PendingActivityCancel {
615-
task_token: tt,
616-
reason: ActivityCancelReason::TimedOut,
617-
consider_not_found: true,
618-
});
621+
let _ = cancel_tx.send(PendingActivityCancel::new(
622+
tt,
623+
ActivityCancelReason::TimedOut,
624+
ActivityCancellationDetails {
625+
is_not_found: true,
626+
is_timed_out: true,
627+
..Default::default()
628+
},
629+
));
619630
}));
620631
outstanding_info.timeout_resetter = resetter;
621632
}
@@ -639,6 +650,9 @@ where
639650
let _ = self.cancels_tx.send(PendingActivityCancel::new(
640651
mapref.key().clone(),
641652
ActivityCancelReason::WorkerShutdown,
653+
ActivityTask::primary_reason_to_cancellation_details(
654+
ActivityCancelReason::WorkerShutdown,
655+
),
642656
));
643657
}
644658
}

core/src/worker/activities/activity_heartbeat_manager.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@ use std::{
1010
time::{Duration, Instant},
1111
};
1212
use temporal_sdk_core_protos::{
13-
coresdk::{ActivityHeartbeat, IntoPayloadsExt, activity_task::ActivityCancelReason},
13+
coresdk::{
14+
ActivityHeartbeat, IntoPayloadsExt,
15+
activity_task::{ActivityCancelReason, ActivityCancellationDetails, ActivityTask},
16+
},
1417
temporal::api::{
1518
common::v1::Payload, workflowservice::v1::RecordActivityTaskHeartbeatResponse,
1619
},
@@ -144,11 +147,17 @@ impl ActivityHeartbeatManager {
144147
{
145148
Ok(RecordActivityTaskHeartbeatResponse { cancel_requested, activity_paused }) => {
146149
if cancel_requested || activity_paused {
150+
// Prioritize Cancel over Pause
147151
let reason = if cancel_requested { ActivityCancelReason::Cancelled } else { ActivityCancelReason::Paused};
148152
cancels_tx
149153
.send(PendingActivityCancel::new(
150154
tt.clone(),
151155
reason,
156+
ActivityCancellationDetails {
157+
is_cancelled: cancel_requested,
158+
is_paused: activity_paused,
159+
..Default::default()
160+
}
152161
))
153162
.expect(
154163
"Receive half of heartbeat cancels not blocked",
@@ -165,6 +174,7 @@ impl ActivityHeartbeatManager {
165174
.send(PendingActivityCancel::new(
166175
tt.clone(),
167176
ActivityCancelReason::NotFound,
177+
ActivityTask::primary_reason_to_cancellation_details(ActivityCancelReason::NotFound)
168178
))
169179
.expect("Receive half of heartbeat cancels not blocked");
170180
}

core/src/worker/activities/local_activities.rs

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use temporal_sdk_core_protos::{
2222
coresdk::{
2323
LocalActivitySlotInfo,
2424
activity_result::{Cancellation, Failure as ActFail, Success},
25-
activity_task::{ActivityCancelReason, ActivityTask, Cancel, Start, activity_task},
25+
activity_task::{ActivityCancelReason, ActivityTask, Start, activity_task},
2626
},
2727
temporal::api::{
2828
common::v1::WorkflowExecution,
@@ -629,12 +629,13 @@ impl LocalActivityManager {
629629
};
630630
// We want to generate a cancel task if the reason for failure was a timeout.
631631
let task = if is_timeout {
632-
Some(ActivityTask {
633-
task_token: task_token.clone().0,
634-
variant: Some(activity_task::Variant::Cancel(Cancel {
635-
reason: ActivityCancelReason::TimedOut as i32,
636-
})),
637-
})
632+
Some(ActivityTask::cancel_from_ids(
633+
task_token.clone().0,
634+
ActivityCancelReason::TimedOut,
635+
ActivityTask::primary_reason_to_cancellation_details(
636+
ActivityCancelReason::TimedOut,
637+
),
638+
))
638639
} else {
639640
None
640641
};
@@ -786,12 +787,13 @@ impl LocalActivityManager {
786787
}
787788

788789
self.cancels_req_tx
789-
.send(CancelOrTimeout::Cancel(ActivityTask {
790-
task_token: lai.task_token.0.clone(),
791-
variant: Some(activity_task::Variant::Cancel(Cancel {
792-
reason: ActivityCancelReason::Cancelled as i32,
793-
})),
794-
}))
790+
.send(CancelOrTimeout::Cancel(ActivityTask::cancel_from_ids(
791+
lai.task_token.0.clone(),
792+
ActivityCancelReason::Cancelled,
793+
ActivityTask::primary_reason_to_cancellation_details(
794+
ActivityCancelReason::Cancelled,
795+
),
796+
)))
795797
.expect("Receive half of LA cancel channel cannot be dropped");
796798
None
797799
}

sdk-core-protos/protos/local/temporal/sdk/core/activity_task/activity_task.proto

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,18 @@ message Start {
6767

6868
// Attempt to cancel a running activity
6969
message Cancel {
70+
// Primary cancellation reason
7071
ActivityCancelReason reason = 1;
72+
// Activity cancellation details, surfaces all cancellation reasons.
73+
ActivityCancellationDetails details = 2;
74+
}
75+
76+
message ActivityCancellationDetails {
77+
bool is_not_found = 1;
78+
bool is_cancelled = 2;
79+
bool is_paused = 3;
80+
bool is_timed_out = 4;
81+
bool is_worker_shutdown = 5;
7182
}
7283

7384
enum ActivityCancelReason {

sdk-core-protos/src/lib.rs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,23 +69,42 @@ pub mod coresdk {
6969
tonic::include_proto!("coresdk.activity_task");
7070

7171
impl ActivityTask {
72-
pub fn cancel_from_ids(task_token: Vec<u8>, reason: ActivityCancelReason) -> Self {
72+
pub fn cancel_from_ids(
73+
task_token: Vec<u8>,
74+
reason: ActivityCancelReason,
75+
details: ActivityCancellationDetails,
76+
) -> Self {
7377
Self {
7478
task_token,
7579
variant: Some(activity_task::Variant::Cancel(Cancel {
7680
reason: reason as i32,
81+
details: Some(details),
7782
})),
7883
}
7984
}
8085

86+
// Checks if both the primary reason or details have a timeout cancellation.
8187
pub fn is_timeout(&self) -> bool {
8288
match &self.variant {
83-
Some(activity_task::Variant::Cancel(Cancel { reason })) => {
89+
Some(activity_task::Variant::Cancel(Cancel { reason, details })) => {
8490
*reason == ActivityCancelReason::TimedOut as i32
91+
|| details.as_ref().is_some_and(|d| d.is_timed_out)
8592
}
8693
_ => false,
8794
}
8895
}
96+
97+
pub fn primary_reason_to_cancellation_details(
98+
reason: ActivityCancelReason,
99+
) -> ActivityCancellationDetails {
100+
ActivityCancellationDetails {
101+
is_not_found: reason == ActivityCancelReason::NotFound,
102+
is_cancelled: reason == ActivityCancelReason::Cancelled,
103+
is_paused: reason == ActivityCancelReason::Paused,
104+
is_timed_out: reason == ActivityCancelReason::TimedOut,
105+
is_worker_shutdown: reason == ActivityCancelReason::WorkerShutdown,
106+
}
107+
}
89108
}
90109

91110
impl Display for ActivityTaskCompletion {

0 commit comments

Comments
 (0)