Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 163 additions & 2 deletions core/src/core_tests/activity_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1105,8 +1105,9 @@ async fn graceful_shutdown(#[values(true, false)] at_max_outstanding: bool) {
assert_matches!(
cancel.variant,
Some(activity_task::Variant::Cancel(Cancel {
reason: r
})) if r == ActivityCancelReason::WorkerShutdown as i32
reason,
details
})) if reason == ActivityCancelReason::WorkerShutdown as i32 && details.as_ref().is_some_and(|d| d.is_worker_shutdown)
);
seen_tts.insert(cancel.task_token);
}
Expand Down Expand Up @@ -1241,3 +1242,163 @@ async fn pass_activity_summary_to_metadata() {
.unwrap();
worker.run_until_done().await.unwrap();
}

#[tokio::test]
async fn heartbeat_response_can_be_paused() {
let mut mock_client = mock_workflow_client();
// First heartbeat returns pause only
mock_client
.expect_record_activity_heartbeat()
.times(1)
.returning(|_, _| {
Ok(RecordActivityTaskHeartbeatResponse {
cancel_requested: false,
activity_paused: true,
})
});
// Second heartbeat returns cancel only
mock_client
.expect_record_activity_heartbeat()
.times(1)
.returning(|_, _| {
Ok(RecordActivityTaskHeartbeatResponse {
cancel_requested: true,
activity_paused: false,
})
});
// Third heartbeat returns both
mock_client
.expect_record_activity_heartbeat()
.times(1)
.returning(|_, _| {
Ok(RecordActivityTaskHeartbeatResponse {
cancel_requested: true,
activity_paused: true,
})
});
mock_client
.expect_cancel_activity_task()
.times(3)
.returning(|_, _| Ok(RespondActivityTaskCanceledResponse::default()));

let core = mock_worker(MocksHolder::from_client_with_activities(
mock_client,
[
PollActivityTaskQueueResponse {
task_token: vec![1],
activity_id: "act1".to_string(),
heartbeat_timeout: Some(prost_dur!(from_millis(1))),
..Default::default()
}
.into(),
PollActivityTaskQueueResponse {
task_token: vec![2],
activity_id: "act2".to_string(),
heartbeat_timeout: Some(prost_dur!(from_millis(1))),
..Default::default()
}
.into(),
PollActivityTaskQueueResponse {
task_token: vec![3],
activity_id: "act3".to_string(),
heartbeat_timeout: Some(prost_dur!(from_millis(1))),
..Default::default()
}
.into(),
],
));

// The general testing pattern for each of these cases is:
// 1. Poll for activity task
// 2. Record activity heartbeat, get mocked heartbeat response
// 3. Sleep for 10ms (waiting for heartbeat request to be flushed)
// (i.e. sleep enough for the heartbeat flush interval to have elapsed)
// 4. Poll for activity task.
// We expect a cancellation activity task as they are prioritized (i.e. ordered before)
// regular activity tasks.
// 5. Assert that the received activity task is indeed a cancellation, with the reason
// and details we expect.
// 6. Complete the activity with a cancellation result.
//
// Repeat for subsequent test case(s).

// Test pause only
let act = core.poll_activity_task().await.unwrap();
core.record_activity_heartbeat(ActivityHeartbeat {
task_token: act.task_token.clone(),
details: vec![vec![1_u8, 2, 3].into()],
});
sleep(Duration::from_millis(10)).await;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You shouldn't need this I think. If it is necessary, see if we can find a way to avoid the sleep.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same for other locations

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I basically adapted this from heartbeats_report_cancels_only_once, my guess was that the sleep was needed to wait for some heartbeat flush timeout to elapse. (if this is the case) I suppose we only need to flush this once for all heartbeats, instead of per heartbeat, now that I think about this more.

Does the "general testing pattern" comment in the test line up accurately?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aaaah, right. Yeah... I suppose that's a bit unavoidable here. The comment makes sense.

let act = core.poll_activity_task().await.unwrap();
assert_matches!(
&act,
ActivityTask {
task_token,
variant: Some(activity_task::Variant::Cancel(Cancel { reason, details })),
} if
task_token == &vec![1] &&
*reason == ActivityCancelReason::Paused as i32 &&
details.as_ref().is_some_and(|d| d.is_paused) &&
details.as_ref().is_some_and(|d| !d.is_cancelled)
);
core.complete_activity_task(ActivityTaskCompletion {
task_token: act.task_token,
result: Some(ActivityExecutionResult::cancel_from_details(None)),
})
.await
.unwrap();

// Test cancel only
let act = core.poll_activity_task().await.unwrap();
core.record_activity_heartbeat(ActivityHeartbeat {
task_token: act.task_token.clone(),
details: vec![vec![1_u8, 2, 3].into()],
});
sleep(Duration::from_millis(10)).await;
let act = core.poll_activity_task().await.unwrap();
assert_matches!(
&act,
ActivityTask {
task_token,
variant: Some(activity_task::Variant::Cancel(Cancel { reason, details })),
} if
task_token == &vec![2] &&
*reason == ActivityCancelReason::Cancelled as i32 &&
details.as_ref().is_some_and(|d| !d.is_paused) &&
details.as_ref().is_some_and(|d| d.is_cancelled)
);
core.complete_activity_task(ActivityTaskCompletion {
task_token: act.task_token,
result: Some(ActivityExecutionResult::cancel_from_details(None)),
})
.await
.unwrap();

// Test both pause and cancel (should prioritize cancel)
let act = core.poll_activity_task().await.unwrap();
core.record_activity_heartbeat(ActivityHeartbeat {
task_token: act.task_token.clone(),
details: vec![vec![1_u8, 2, 3].into()],
});
sleep(Duration::from_millis(10)).await;
let act = core.poll_activity_task().await.unwrap();
assert_matches!(
&act,
ActivityTask {
task_token,
variant: Some(activity_task::Variant::Cancel(Cancel { reason, details })),
} if
task_token == &vec![3] &&
*reason == ActivityCancelReason::Cancelled as i32 &&
details.as_ref().is_some_and(|d| d.is_paused) &&
details.as_ref().is_some_and(|d| d.is_cancelled)
);
core.complete_activity_task(ActivityTaskCompletion {
task_token: act.task_token,
result: Some(ActivityExecutionResult::cancel_from_details(None)),
})
.await
.unwrap();

core.drain_activity_poller_and_shutdown().await;
}
36 changes: 25 additions & 11 deletions core/src/worker/activities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use temporal_sdk_core_protos::{
coresdk::{
ActivityHeartbeat, ActivitySlotInfo,
activity_result::{self as ar, activity_execution_result as aer},
activity_task::{ActivityCancelReason, ActivityTask},
activity_task::{ActivityCancelReason, ActivityCancellationDetails, ActivityTask},
},
temporal::api::{
failure::v1::{ApplicationFailureInfo, CanceledFailureInfo, Failure, failure::FailureInfo},
Expand All @@ -65,16 +65,19 @@ type OutstandingActMap = Arc<DashMap<TaskToken, RemoteInFlightActInfo>>;
struct PendingActivityCancel {
task_token: TaskToken,
reason: ActivityCancelReason,
/// Set true if we should assume the server has already forgotten about this activity
consider_not_found: bool,
details: ActivityCancellationDetails,
}

impl PendingActivityCancel {
fn new(task_token: TaskToken, reason: ActivityCancelReason) -> Self {
fn new(
task_token: TaskToken,
reason: ActivityCancelReason,
details: ActivityCancellationDetails,
) -> Self {
Self {
task_token,
reason,
consider_not_found: false,
details,
}
}
}
Expand Down Expand Up @@ -508,13 +511,14 @@ where
} else {
details.issued_cancel_to_lang = Some(next_pc.reason);
if next_pc.reason == ActivityCancelReason::NotFound
|| next_pc.consider_not_found
|| next_pc.details.is_not_found
{
details.known_not_found = true;
}
Some(Ok(ActivityTask::cancel_from_ids(
next_pc.task_token.0,
next_pc.reason,
next_pc.details,
)))
}
} else {
Expand Down Expand Up @@ -566,6 +570,9 @@ where
let _ = cancels_tx.send(PendingActivityCancel::new(
tt,
ActivityCancelReason::WorkerShutdown,
ActivityTask::primary_reason_to_cancellation_details(
ActivityCancelReason::WorkerShutdown,
),
));
} else {
// Fire off task to keep track of local timeouts. We do this so that
Expand Down Expand Up @@ -611,11 +618,15 @@ where
"Timing out activity due to elapsed local \
{timeout_type} timer"
);
let _ = cancel_tx.send(PendingActivityCancel {
task_token: tt,
reason: ActivityCancelReason::TimedOut,
consider_not_found: true,
});
let _ = cancel_tx.send(PendingActivityCancel::new(
tt,
ActivityCancelReason::TimedOut,
ActivityCancellationDetails {
is_not_found: true,
is_timed_out: true,
..Default::default()
},
));
}));
outstanding_info.timeout_resetter = resetter;
}
Expand All @@ -639,6 +650,9 @@ where
let _ = self.cancels_tx.send(PendingActivityCancel::new(
mapref.key().clone(),
ActivityCancelReason::WorkerShutdown,
ActivityTask::primary_reason_to_cancellation_details(
ActivityCancelReason::WorkerShutdown,
),
));
}
}
Expand Down
19 changes: 15 additions & 4 deletions core/src/worker/activities/activity_heartbeat_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ use std::{
time::{Duration, Instant},
};
use temporal_sdk_core_protos::{
coresdk::{ActivityHeartbeat, IntoPayloadsExt, activity_task::ActivityCancelReason},
coresdk::{
ActivityHeartbeat, IntoPayloadsExt,
activity_task::{ActivityCancelReason, ActivityCancellationDetails, ActivityTask},
},
temporal::api::{
common::v1::Payload, workflowservice::v1::RecordActivityTaskHeartbeatResponse,
},
Expand Down Expand Up @@ -142,12 +145,19 @@ impl ActivityHeartbeatManager {
.record_activity_heartbeat(tt.clone(), details.into_payloads())
.await
{
Ok(RecordActivityTaskHeartbeatResponse { cancel_requested, activity_paused: _ }) => {
if cancel_requested {
Ok(RecordActivityTaskHeartbeatResponse { cancel_requested, activity_paused }) => {
if cancel_requested || activity_paused {
// Prioritize Cancel over Pause
let reason = if cancel_requested { ActivityCancelReason::Cancelled } else { ActivityCancelReason::Paused};
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cargo fmt may be needed

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fmt is checked by CI, buuuuut:

This makes me think that ActivityCancelReason needs to be a repeated field in the Cancel proto, since it is possible to have cancel/paused happen at the same time. Assuming that's actually a thing server can produce, we'll need to reflect that.

Copy link
Copy Markdown
Contributor Author

@THardy98 THardy98 Apr 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah just verified, both fields can be set to true:
https://github.com/temporalio/temporal/blob/main/service/history/api/recordactivitytaskheartbeat/api.go#L85-L130

I don't mind changing it to repeated, but I'm unsure what the benefit is. I feel like it just makes lang SDKs pick which reason is relevant when we could be doing that in core, giving us consistent behaviour by default.

Copy link
Copy Markdown
Contributor

@cretz cretz Apr 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fmt is checked by CI:

Interesting that https://github.com/temporalio/sdk-core/actions/runs/14725400139/job/41326870507?pr=909 passes and let reason = if cancel_requested { ActivityCancelReason::Cancelled } else { ActivityCancelReason::Paused}; is accepted considering the inconsistency on the trailing space before end brace.

As for repeated, yeah, usually I would say show them all and let lang figure out how to handle, but I can also see that Core needs to pick an "interruption reason" winner. I may bring this up internally.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I don't think it's about "picking", per se, we may need to expose both all the way to the user

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, after discussion, it is the case that multiple booleans on this can be true (again there is a activity_reset boolean coming later). So let's go ahead and prepare for a collection of reasons here. And maybe even a "primary reason" so that each SDK doesn't have to pick the winning reason in cases where only a single one is supported.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also after discussion, it should be known that "pause" can flip between true and false, so we may want to memoize its first true and not let it be set back to false, or maybe we are ok with the flipping.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This formatting does indeed seem weird, but don't know why CI isn't complaining.

cancels_tx
.send(PendingActivityCancel::new(
tt.clone(),
ActivityCancelReason::Cancelled,
reason,
ActivityCancellationDetails {
is_cancelled: cancel_requested,
is_paused: activity_paused,
..Default::default()
}
))
.expect(
"Receive half of heartbeat cancels not blocked",
Expand All @@ -164,6 +174,7 @@ impl ActivityHeartbeatManager {
.send(PendingActivityCancel::new(
tt.clone(),
ActivityCancelReason::NotFound,
ActivityTask::primary_reason_to_cancellation_details(ActivityCancelReason::NotFound)
))
.expect("Receive half of heartbeat cancels not blocked");
}
Expand Down
28 changes: 15 additions & 13 deletions core/src/worker/activities/local_activities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use temporal_sdk_core_protos::{
coresdk::{
LocalActivitySlotInfo,
activity_result::{Cancellation, Failure as ActFail, Success},
activity_task::{ActivityCancelReason, ActivityTask, Cancel, Start, activity_task},
activity_task::{ActivityCancelReason, ActivityTask, Start, activity_task},
},
temporal::api::{
common::v1::WorkflowExecution,
Expand Down Expand Up @@ -629,12 +629,13 @@ impl LocalActivityManager {
};
// We want to generate a cancel task if the reason for failure was a timeout.
let task = if is_timeout {
Some(ActivityTask {
task_token: task_token.clone().0,
variant: Some(activity_task::Variant::Cancel(Cancel {
reason: ActivityCancelReason::TimedOut as i32,
})),
})
Some(ActivityTask::cancel_from_ids(
task_token.clone().0,
ActivityCancelReason::TimedOut,
ActivityTask::primary_reason_to_cancellation_details(
ActivityCancelReason::TimedOut,
),
))
} else {
None
};
Expand Down Expand Up @@ -786,12 +787,13 @@ impl LocalActivityManager {
}

self.cancels_req_tx
.send(CancelOrTimeout::Cancel(ActivityTask {
task_token: lai.task_token.0.clone(),
variant: Some(activity_task::Variant::Cancel(Cancel {
reason: ActivityCancelReason::Cancelled as i32,
})),
}))
.send(CancelOrTimeout::Cancel(ActivityTask::cancel_from_ids(
lai.task_token.0.clone(),
ActivityCancelReason::Cancelled,
ActivityTask::primary_reason_to_cancellation_details(
ActivityCancelReason::Cancelled,
),
)))
.expect("Receive half of LA cancel channel cannot be dropped");
None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,18 @@ message Start {

// Attempt to cancel a running activity
message Cancel {
Copy link
Copy Markdown
Contributor

@cretz cretz Apr 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question, say I heartbeat and get paused as true, so we send a cancel, then I heartbeat again and paused is false, then I heartbeat again and paused is true again. Do we send multiple cancellations or does Core only send the first cancellation reason it sees? This can also happen if say, paused becomes true, then cancel becomes true or then worker shutdown occurs.

My concern if Core does only apply the first cancellation reason, is that we can't get updates to these is_ details that may change. My concern if Core does not only apply the first but instead sends a cancellation for each, is that langs may be unprepared for multiple cancellations (not sure we ever tested swallowed cancellation followed by worker shutdown that may trigger a second).

This question is a bit blocking for me because I'm trying to understand how we model updating activity state as it comes back from the server/Core. I wonder if we should separate this cancel that only happens once from "activity state updated" we may send more frequently (and how we can make sure langs account for it).

Copy link
Copy Markdown
Contributor Author

@THardy98 THardy98 Apr 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's the former, as in, only apply first cancellation.
But I should take another look.

Suppose the is_ details do change from a heartbeat response, which is currently limited to cancel/pause flags. From what I can tell, worst-case scenario, we would have been able to run an activity that we have now cancelled. In which case, wouldn't we retry?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO it does not make sense for core to send multiple cancels. I don't think it should or will ever do this right now... (I would have to double-check the shutdown case).

It adds a bunch of complication, and I'm not sure what benefit you'd get even if you did it. We can always, if it turns out to be needed for some reason, add some kind of currentActivityCancellationState() function to the Core API. But, I hope not.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO it does not make sense for core to send multiple cancels

Completely agree on sending multiple cancels. But langs now need to make it clear that "is_paused" does not mean "is paused" (which it very well may be) but rather "pausing was reason for cancellation request".

// Primary cancellation reason
ActivityCancelReason reason = 1;
// Activity cancellation details, surfaces all cancellation reasons.
ActivityCancellationDetails details = 2;
}

message ActivityCancellationDetails {
bool is_not_found = 1;
bool is_cancelled = 2;
bool is_paused = 3;
bool is_timed_out = 4;
bool is_worker_shutdown = 5;
}

enum ActivityCancelReason {
Expand All @@ -79,6 +90,8 @@ enum ActivityCancelReason {
TIMED_OUT = 2;
// Core is shutting down and the graceful timeout has elapsed
WORKER_SHUTDOWN = 3;
// Activity was paused
PAUSED = 4;
}


Loading
Loading