Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/src/worker/activities/local_activities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,7 @@ impl LocalActivityManager {
Some(NextPendingLAAction::Autocomplete(
self.complete(&task_token, resolution.result),
))

} else {
// This timeout is for a no-longer-tracked activity, so, whatever
None
Expand Down
91 changes: 90 additions & 1 deletion tests/integ_tests/workflow_tests/local_activities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use temporal_sdk::{
interceptors::{FailOnNondeterminismInterceptor, WorkerInterceptor},
};
use temporal_sdk_core::replay::HistoryForReplay;
use temporal_sdk_core_protos::temporal::api::failure::v1::failure::FailureInfo::TimeoutFailureInfo;
use temporal_sdk_core_protos::temporal::api::history::v1::history_event;
use temporal_sdk_core_protos::{
TestHistoryBuilder,
coresdk::{
Expand All @@ -26,14 +28,16 @@ use temporal_sdk_core_protos::{
},
temporal::api::{
common::v1::RetryPolicy,
enums::v1::{TimeoutType, UpdateWorkflowExecutionLifecycleStage},
enums::v1::{EventType, TimeoutType, UpdateWorkflowExecutionLifecycleStage},
update::v1::WaitPolicy,
},
};
use temporal_sdk_core_test_utils::{
CoreWfStarter, WorkflowHandleExt, history_from_proto_binary, init_core_replay_preloaded,
replay_sdk_worker, workflows::la_problem_workflow,
};
use tokio::join;
use tokio::sync::Semaphore;
use tokio_util::sync::CancellationToken;

pub(crate) async fn one_local_activity_wf(ctx: WfContext) -> WorkflowResult<()> {
Expand Down Expand Up @@ -145,6 +149,91 @@ pub(crate) async fn local_act_fanout_wf(ctx: WfContext) -> WorkflowResult<()> {
Ok(().into())
}

#[tokio::test]
async fn local_activity_timeout_marker() {
let wf_name = "local_activity_timeout_marker";
let mut starter = CoreWfStarter::new(wf_name);
// starter.worker_config.graceful_shutdown_period(Duration::from_millis(1));
let mut worker = starter.worker().await;
static ACTS_STARTED: Semaphore = Semaphore::const_new(0);

worker.register_wf(wf_name.to_owned(), |ctx: WfContext| async move {
let local_activity = ctx.local_activity(LocalActivityOptions {
schedule_to_close_timeout: Some(Duration::from_millis(10)),
start_to_close_timeout: Some(Duration::from_millis(500)),
activity_type: "stop_activity".to_string(),
input: "hi!".as_json_payload().expect("serializes fine"),
..Default::default()
});
local_activity.await;
Ok(().into())
});

worker.register_activity(
"stop_activity",
|_ctx: ActContext, _: String| async move {
tokio::time::sleep(Duration::from_millis(100)).await;
ACTS_STARTED.add_permits(1);
Result::<(), _>::Err(anyhow!("Oh no I failed!").into())
},
);

let run_id = worker
.submit_wf(
wf_name.to_owned(),
wf_name.to_owned(),
vec![],
WorkflowOptions {
execution_timeout: Some(Duration::from_secs(1)),
..Default::default()
},
)
.await
.unwrap();

let handle = worker.inner_mut().shutdown_handle();
let shutdowner = async {
let _ = ACTS_STARTED.acquire().await;
handle();
tokio::time::sleep(Duration::from_millis(1500)).await;
};
let runner = async {
worker.run_until_done().await.unwrap();
};
join!(shutdowner, runner);

let client = starter.get_client().await;
let history = client
.get_workflow_execution_history(wf_name.to_owned(), Some(run_id), vec![])
.await
.unwrap()
.history
.unwrap();
println!("{:#?}", history.events);
let marker = history
.events
.iter()
.find(|he| he.event_type() == EventType::MarkerRecorded)
.expect("expected marker recorded event");

println!("marker: {:?}", marker);

if let Some(history_event::Attributes::MarkerRecordedEventAttributes(attr)) =
marker.clone().attributes
{
let failure = attr.failure.unwrap().failure_info.unwrap();
match failure {
TimeoutFailureInfo(failure) => {
assert_eq!(failure.timeout_type(), TimeoutType::ScheduleToClose);
}
_ => panic!("Expected a timeout failure"),
}
} else {
unreachable!("We already assert MarkerRecorded event type");
}
panic!("fail so i can see prints on pass");
}

#[tokio::test]
async fn local_act_fanout() {
let wf_name = "local_act_fanout";
Expand Down
Loading