Merged
Changes from 3 commits
160 changes: 160 additions & 0 deletions core/src/core_tests/activity_tasks.rs
@@ -1241,3 +1241,163 @@ async fn pass_activity_summary_to_metadata() {
        .unwrap();
    worker.run_until_done().await.unwrap();
}

#[tokio::test]
async fn heartbeat_response_can_be_paused() {
    let mut mock_client = mock_workflow_client();
    // First heartbeat returns pause only
    mock_client
        .expect_record_activity_heartbeat()
        .times(1)
        .returning(|_, _| {
            Ok(RecordActivityTaskHeartbeatResponse {
                cancel_requested: false,
                activity_paused: true,
            })
        });
    // Second heartbeat returns cancel only
    mock_client
        .expect_record_activity_heartbeat()
        .times(1)
        .returning(|_, _| {
            Ok(RecordActivityTaskHeartbeatResponse {
                cancel_requested: true,
                activity_paused: false,
            })
        });
    // Third heartbeat returns both
    mock_client
        .expect_record_activity_heartbeat()
        .times(1)
        .returning(|_, _| {
            Ok(RecordActivityTaskHeartbeatResponse {
                cancel_requested: true,
                activity_paused: true,
            })
        });
    mock_client
        .expect_cancel_activity_task()
        .times(3)
        .returning(|_, _| Ok(RespondActivityTaskCanceledResponse::default()));

    let core = mock_worker(MocksHolder::from_client_with_activities(
        mock_client,
        [
            PollActivityTaskQueueResponse {
                task_token: vec![1],
                activity_id: "act1".to_string(),
                heartbeat_timeout: Some(prost_dur!(from_millis(1))),
                ..Default::default()
            }
            .into(),
            PollActivityTaskQueueResponse {
                task_token: vec![2],
                activity_id: "act2".to_string(),
                heartbeat_timeout: Some(prost_dur!(from_millis(1))),
                ..Default::default()
            }
            .into(),
            PollActivityTaskQueueResponse {
                task_token: vec![3],
                activity_id: "act3".to_string(),
                heartbeat_timeout: Some(prost_dur!(from_millis(1))),
                ..Default::default()
            }
            .into(),
        ],
    ));

    // The general testing pattern for each of these cases is:
    // 1. Poll for activity task
    // 2. Record activity heartbeat, get mocked heartbeat response
    // 3. Sleep for 10ms (waiting for heartbeat request to be flushed)
    //    (i.e. sleep enough for the heartbeat flush interval to have elapsed)
    // 4. Poll for activity task.
    //    We expect a cancellation activity task as they are prioritized (i.e. ordered before)
    //    regular activity tasks.
    // 5. Assert that the received activity task is indeed a cancellation, with the reason
    //    we expect.
    // 6. Complete the activity with a cancellation result.
    //
    // Repeat for subsequent test case(s).

    // Test pause only
    let act = core.poll_activity_task().await.unwrap();
    core.record_activity_heartbeat(ActivityHeartbeat {
        task_token: act.task_token.clone(),
        details: vec![vec![1_u8, 2, 3].into()],
    });
    sleep(Duration::from_millis(10)).await;

Member

You shouldn't need this I think. If it is necessary, see if we can find a way to avoid the sleep.

Member

Same for other locations

Contributor Author

I basically adapted this from heartbeats_report_cancels_only_once. My guess was that the sleep is needed to wait for some heartbeat flush timeout to elapse. If that is the case, then now that I think about it more, I suppose we only need to flush once for all heartbeats instead of once per heartbeat.

Does the "general testing pattern" comment in the test line up accurately?

Member

Aaaah, right. Yeah... I suppose that's a bit unavoidable here. The comment makes sense.
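
For reference, step 4 of the testing pattern relies on cancellations being handed out before regular activity tasks. A minimal sketch of that ordering, assuming a toy poller with two queues (Poller, Task, and the queue fields are hypothetical names used only for illustration, not Core's actual types):

```rust
use std::collections::VecDeque;

/// A task handed out by the toy poller. Hypothetical type, not Core's ActivityTask.
#[derive(Debug, PartialEq)]
enum Task {
    Start(u8),
    Cancel(u8),
}

/// Toy poller with one queue per kind of task (illustrative only).
#[derive(Default)]
struct Poller {
    cancels: VecDeque<u8>,
    starts: VecDeque<u8>,
}

impl Poller {
    /// Pending cancellations are always returned before new start tasks.
    fn poll(&mut self) -> Option<Task> {
        if let Some(token) = self.cancels.pop_front() {
            return Some(Task::Cancel(token));
        }
        self.starts.pop_front().map(Task::Start)
    }
}
```

With start tasks 1 and 2 queued, polling once yields Start(1); if a cancel for token 1 is then queued, the next poll yields Cancel(1) before Start(2), which is the ordering the test depends on.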

    let act = core.poll_activity_task().await.unwrap();
    assert_matches!(
        &act,
        ActivityTask {
            task_token,
            variant: Some(activity_task::Variant::Cancel(Cancel { reason })),
            ..
        } => {
            task_token == &vec![1] &&
                *reason == ActivityCancelReason::Paused as i32
        }
    );
    core.complete_activity_task(ActivityTaskCompletion {
        task_token: act.task_token,
        result: Some(ActivityExecutionResult::cancel_from_details(None)),
    })
    .await
    .unwrap();

    // Test cancel only
    let act = core.poll_activity_task().await.unwrap();
    core.record_activity_heartbeat(ActivityHeartbeat {
        task_token: act.task_token.clone(),
        details: vec![vec![1_u8, 2, 3].into()],
    });
    sleep(Duration::from_millis(10)).await;
    let act = core.poll_activity_task().await.unwrap();
    assert_matches!(
        &act,
        ActivityTask {
            task_token,
            variant: Some(activity_task::Variant::Cancel(Cancel { reason })),
            ..
        } => {
            task_token == &vec![2] &&
                *reason == ActivityCancelReason::Cancelled as i32
        }
    );
    core.complete_activity_task(ActivityTaskCompletion {
        task_token: act.task_token,
        result: Some(ActivityExecutionResult::cancel_from_details(None)),
    })
    .await
    .unwrap();

    // Test both pause and cancel (should prioritize cancel)
    let act = core.poll_activity_task().await.unwrap();
    core.record_activity_heartbeat(ActivityHeartbeat {
        task_token: act.task_token.clone(),
        details: vec![vec![1_u8, 2, 3].into()],
    });
    sleep(Duration::from_millis(10)).await;
    let act = core.poll_activity_task().await.unwrap();
    assert_matches!(
        &act,
        ActivityTask {
            task_token,
            variant: Some(activity_task::Variant::Cancel(Cancel { reason })),
            ..
        } => {
            task_token == &vec![3] &&
                *reason == ActivityCancelReason::Cancelled as i32
        }
    );
    core.complete_activity_task(ActivityTaskCompletion {
        task_token: act.task_token,
        result: Some(ActivityExecutionResult::cancel_from_details(None)),
    })
    .await
    .unwrap();

    core.drain_activity_poller_and_shutdown().await;
}
7 changes: 4 additions & 3 deletions core/src/worker/activities/activity_heartbeat_manager.rs
@@ -142,12 +142,13 @@ impl ActivityHeartbeatManager {
     .record_activity_heartbeat(tt.clone(), details.into_payloads())
     .await
 {
-    Ok(RecordActivityTaskHeartbeatResponse { cancel_requested, activity_paused: _ }) => {
-        if cancel_requested {
+    Ok(RecordActivityTaskHeartbeatResponse { cancel_requested, activity_paused }) => {
+        if cancel_requested || activity_paused {
+            let reason = if cancel_requested { ActivityCancelReason::Cancelled } else { ActivityCancelReason::Paused};

Contributor

cargo fmt may be needed

Member

Fmt is checked by CI, buuuuut:

This makes me think that ActivityCancelReason needs to be a repeated field in the Cancel proto, since it is possible to have cancel/paused happen at the same time. Assuming that's actually a thing server can produce, we'll need to reflect that.

Contributor Author
@THardy98, Apr 29, 2025

Yeah just verified, both fields can be set to true:
https://github.com/temporalio/temporal/blob/main/service/history/api/recordactivitytaskheartbeat/api.go#L85-L130

I don't mind changing it to repeated, but I'm unsure what the benefit is. I feel like it just makes lang SDKs pick which reason is relevant when we could be doing that in core, giving us consistent behaviour by default.
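
For reference, the precedence applied by the changed line (cancel wins when both flags are set) can be sketched as a standalone function. This is a minimal sketch, assuming a local stand-in enum; CancelReason and reason_from_heartbeat are hypothetical names, not part of Core:

```rust
/// Local stand-in for the two reasons under discussion (hypothetical type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CancelReason {
    Cancelled,
    Paused,
}

/// Maps the two heartbeat response flags to at most one reason.
/// A cancel request takes priority over a pause when both are set.
fn reason_from_heartbeat(cancel_requested: bool, activity_paused: bool) -> Option<CancelReason> {
    if cancel_requested {
        Some(CancelReason::Cancelled)
    } else if activity_paused {
        Some(CancelReason::Paused)
    } else {
        None
    }
}
```

Choosing in one place like this is what gives every lang SDK the same answer for the "both true" case, at the cost of hiding the second flag from them.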

Contributor
@cretz, Apr 29, 2025

Fmt is checked by CI:

Interesting that https://github.com/temporalio/sdk-core/actions/runs/14725400139/job/41326870507?pr=909 passes and that let reason = if cancel_requested { ActivityCancelReason::Cancelled } else { ActivityCancelReason::Paused}; is accepted, considering the missing space before the final closing brace.

As for repeated, yeah, usually I would say show them all and let lang figure out how to handle, but I can also see that Core needs to pick an "interruption reason" winner. I may bring this up internally.

Member

Yeah I don't think it's about "picking", per se, we may need to expose both all the way to the user

Contributor

Ok, after discussion, it is the case that multiple booleans on this can be true (again, there is an activity_reset boolean coming later). So let's go ahead and prepare for a collection of reasons here, and maybe even a "primary reason" so that each SDK doesn't have to pick the winning reason in cases where only a single one is supported.
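
One possible shape for a collection of reasons plus a "primary reason" is sketched below. It is only an illustration under stated assumptions: Reason, CancelDetails, and details_from_flags are hypothetical names, and the priority order (cancel before pause) is taken from the test in this PR rather than from any agreed design:

```rust
/// Hypothetical reason type; only the two flags discussed here are modelled.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Reason {
    Cancelled,
    Paused,
}

/// Hypothetical payload carrying every reason that applies plus one winner.
#[derive(Debug, PartialEq, Eq)]
struct CancelDetails {
    primary: Reason,
    all: Vec<Reason>,
}

/// Collects every flag that is set, in priority order, and uses the first
/// entry as the primary reason. Returns None when no flag is set.
fn details_from_flags(cancel_requested: bool, activity_paused: bool) -> Option<CancelDetails> {
    let mut all = Vec::new();
    if cancel_requested {
        all.push(Reason::Cancelled);
    }
    if activity_paused {
        all.push(Reason::Paused);
    }
    let primary = *all.first()?;
    Some(CancelDetails { primary, all })
}
```

An SDK that supports only a single reason would read primary, while one that can surface several would iterate over all.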

Contributor

Also after discussion, it should be known that "pause" can flip between true and false, so we may want to memoize its first true and not let it be set back to false, or maybe we are ok with the flipping.
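
The "memoize its first true" option could look roughly like the latch below. This is a sketch only, assuming a per-activity state object; PauseLatch and observe are hypothetical names and nothing like them exists in this PR:

```rust
/// Hypothetical per-activity latch: once a pause has been seen it stays set,
/// even if later heartbeat responses report activity_paused = false.
#[derive(Debug, Default)]
struct PauseLatch {
    seen_paused: bool,
}

impl PauseLatch {
    /// Records the flag from one heartbeat response and returns the latched value.
    fn observe(&mut self, activity_paused: bool) -> bool {
        self.seen_paused |= activity_paused;
        self.seen_paused
    }
}
```

The alternative mentioned in the comment, allowing the flag to flip, would simply forward each response's value without keeping any state.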

Member

This formatting does indeed seem weird, but don't know why CI isn't complaining.

             cancels_tx
                 .send(PendingActivityCancel::new(
                     tt.clone(),
-                    ActivityCancelReason::Cancelled,
+                    reason,
                 ))
                 .expect(
                     "Receive half of heartbeat cancels not blocked",
@@ -79,6 +79,8 @@ enum ActivityCancelReason {
TIMED_OUT = 2;
// Core is shutting down and the graceful timeout has elapsed
WORKER_SHUTDOWN = 3;
// Activity was paused
PAUSED = 4;
}
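
For reference, the test compares the reason field against ActivityCancelReason::Paused as i32 because the proto enum travels as an integer. A minimal sketch of that round trip, assuming a hand-written stand-in enum (CancelReason below is hypothetical and includes only the three values visible in this hunk; the generated type has more variants):

```rust
use std::convert::TryFrom;

/// Hand-written stand-in mirroring the three values shown in the hunk.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(i32)]
enum CancelReason {
    TimedOut = 2,
    WorkerShutdown = 3,
    Paused = 4,
}

impl TryFrom<i32> for CancelReason {
    /// The unrecognised raw value is handed back as the error.
    type Error = i32;

    fn try_from(raw: i32) -> Result<Self, Self::Error> {
        match raw {
            2 => Ok(Self::TimedOut),
            3 => Ok(Self::WorkerShutdown),
            4 => Ok(Self::Paused),
            other => Err(other),
        }
    }
}
```

Casting with as i32 goes from the enum to the wire value, and try_from goes back, rejecting integers that have no matching variant.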

