Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ document as your quick reference when submitting pull requests.
- `tests/` – integration, heavy, and manual tests
- `arch_docs/` – architectural design documents
- Contributor guide: `README.md`
- `target/` - This contains compiled files. You never need to look in here.

## Repo Specific Utilities

Expand Down
8 changes: 2 additions & 6 deletions core/src/core_tests/workflow_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2528,7 +2528,7 @@ async fn post_terminal_commands_are_retained_when_not_replaying() {
]);
_do_post_terminal_commands_test(
commands_sent_by_lang,
[ResponseType::ToTaskNum(1), ResponseType::AllHistory],
[ResponseType::ToTaskNum(1)],
expected_command_types_emitted,
t,
)
Expand Down Expand Up @@ -2602,14 +2602,14 @@ async fn _do_post_terminal_commands_test(

let act = core.poll_workflow_activation().await.unwrap();

core.initiate_shutdown();
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmds(
act.run_id,
commands_sent_by_lang,
))
.await
.unwrap();

core.initiate_shutdown();
let act = core.poll_workflow_activation().await;
assert_matches!(act.unwrap_err(), PollError::ShutDown);
core.shutdown().await;
Expand Down Expand Up @@ -2685,10 +2685,6 @@ async fn history_length_with_fail_and_timeout(
#[values(true, false)] use_cache: bool,
#[values(1, 2, 3)] history_responses_case: u8,
) {
if !use_cache && history_responses_case == 3 {
/* disabled for now because this keeps flaking*/
return;
}
let wfid = "fake_wf_id";
let mut t = TestHistoryBuilder::default();
t.add_by_type(EventType::WorkflowExecutionStarted);
Expand Down
20 changes: 1 addition & 19 deletions core/src/ephemeral_server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ fn get_free_port(bind_ip: &str) -> io::Result<u16> {
let addr = listen.local_addr()?;

// On Linux and some BSD variants, ephemeral ports are randomized, and may
// consequently repeat within a short time frame after the listenning end
// consequently repeat within a short time frame after the listening end
// has been closed. To avoid this, we make a connection to the port, then
// close that connection from the server's side (this is very important),
// which puts the connection in TIME_WAIT state for some time (by default,
Expand Down Expand Up @@ -622,30 +622,12 @@ fn remove_file_past_ttl(ttl: &Option<Duration>, dest: &PathBuf) -> Result<bool,
mod tests {
use super::{get_free_port, remove_file_past_ttl};
use std::{
collections::HashSet,
env::temp_dir,
fs::File,
net::{TcpListener, TcpStream},
time::{Duration, SystemTime},
};

#[test]
fn get_free_port_no_double() {
Comment on lines -632 to -633
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is unrealistic compared to the one below and provides no additional coverage. It could flake on Windows.

let host = "127.0.0.1";
let mut port_set = HashSet::new();

for _ in 0..2000 {
let port = get_free_port(host).unwrap();
assert!(
!port_set.contains(&port),
"Port {port} has been assigned more than once"
);

// Add port to the set
port_set.insert(port);
}
}

#[test]
fn get_free_port_can_bind_immediately() {
let host = "127.0.0.1";
Expand Down
17 changes: 13 additions & 4 deletions core/src/test_help/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1068,10 +1068,19 @@ impl WorkerExt for Worker {
);
},
async {
assert_matches!(
self.poll_workflow_activation().await.unwrap_err(),
PollError::ShutDown
);
loop {
match self.poll_workflow_activation().await {
Err(PollError::ShutDown) => break,
Ok(a) if a.is_only_eviction() => {
self.complete_workflow_activation(WorkflowActivationCompletion::empty(
a.run_id,
))
.await
.unwrap();
}
o => panic!("Unexpected activation while draining: {o:?}"),
}
}
}
);
self.finalize_shutdown().await;
Expand Down
14 changes: 8 additions & 6 deletions core/src/worker/workflow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ use std::{
collections::VecDeque,
fmt::Debug,
future::Future,
mem,
ops::DerefMut,
rc::Rc,
result,
Expand Down Expand Up @@ -1100,8 +1099,7 @@ struct BufferedTasks {
/// current one has been processed).
query_only_tasks: VecDeque<PermittedWFT>,
/// These are query-only tasks for the *buffered* wft, if any. They will all be discarded if
/// a buffered wft is replaced before being handled. They move to `query_only_tasks` once the
/// buffered task is taken.
/// a buffered wft is replaced before being handled.
query_only_tasks_for_buffered: VecDeque<PermittedWFT>,
}

Expand Down Expand Up @@ -1136,9 +1134,13 @@ impl BufferedTasks {
if let Some(q) = self.query_only_tasks.pop_front() {
return Some(q);
}
if let Some(t) = self.wft.take() {
self.query_only_tasks = mem::take(&mut self.query_only_tasks_for_buffered);
return Some(t);
if self.wft.is_some() {
if let Some(q) = self.query_only_tasks_for_buffered.pop_front() {
return Some(q);
}
if let Some(t) = self.wft.take() {
return Some(t);
}
}
None
}
Expand Down
74 changes: 48 additions & 26 deletions sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ use temporal_sdk_core_protos::{
};
use tokio::{
sync::{
Notify,
mpsc::{UnboundedSender, unbounded_channel},
oneshot,
},
Expand Down Expand Up @@ -151,6 +152,7 @@ struct WorkflowHalf {
workflows: RefCell<HashMap<String, WorkflowData>>,
/// Maps workflow type to the function for executing workflow runs with that ID
workflow_fns: RefCell<HashMap<String, WorkflowFunction>>,
workflow_removed_from_map: Notify,
}
struct WorkflowData {
/// Channel used to send the workflow activations
Expand Down Expand Up @@ -180,6 +182,7 @@ impl Worker {
workflow_half: WorkflowHalf {
workflows: Default::default(),
workflow_fns: Default::default(),
workflow_removed_from_map: Default::default(),
},
activity_half: ActivityHalf {
activity_fns: Default::default(),
Expand Down Expand Up @@ -260,6 +263,7 @@ impl Worker {
join_handle.await??;
debug!(run_id=%run_id, "Removing workflow from cache");
wf_half.workflows.borrow_mut().remove(&run_id);
wf_half.workflow_removed_from_map.notify_one();
Ok(())
}
},
Expand Down Expand Up @@ -293,12 +297,15 @@ impl Worker {
if let Some(ref i) = common.worker_interceptor {
i.on_workflow_activation(&activation).await?;
}
if let Some(wf_fut) = wf_half.workflow_activation_handler(
common,
shutdown_token.clone(),
activation,
&completions_tx,
)? && wf_future_tx.send(wf_fut).is_err()
if let Some(wf_fut) = wf_half
.workflow_activation_handler(
common,
shutdown_token.clone(),
activation,
&completions_tx,
)
.await?
&& wf_future_tx.send(wf_fut).is_err()
{
panic!("Receive half of completion processor channel cannot be dropped");
}
Expand Down Expand Up @@ -384,7 +391,7 @@ impl Worker {

impl WorkflowHalf {
#[allow(clippy::type_complexity)]
fn workflow_activation_handler(
async fn workflow_activation_handler(
&self,
common: &CommonWorker,
shutdown_token: CancellationToken,
Expand All @@ -408,26 +415,29 @@ impl WorkflowHalf {
_ => None,
}) {
let workflow_type = &sw.workflow_type;
let wf_fns_borrow = self.workflow_fns.borrow();
let Some(wf_function) = wf_fns_borrow.get(workflow_type) else {
warn!("Workflow type {workflow_type} not found");
let (wff, activations) = {
let wf_fns_borrow = self.workflow_fns.borrow();

let Some(wf_function) = wf_fns_borrow.get(workflow_type) else {
warn!("Workflow type {workflow_type} not found");

completions_tx
.send(WorkflowActivationCompletion::fail(
run_id,
format!("Workflow type {workflow_type} not found").into(),
Some(WorkflowTaskFailedCause::WorkflowWorkerUnhandledFailure),
))
.expect("Completion channel intact");
return Ok(None);
};

completions_tx
.send(WorkflowActivationCompletion::fail(
run_id,
format!("Workflow type {workflow_type} not found").into(),
Some(WorkflowTaskFailedCause::WorkflowWorkerUnhandledFailure),
))
.expect("Completion channel intact");
return Ok(None);
wf_function.start_workflow(
common.worker.get_config().namespace.clone(),
common.task_queue.clone(),
std::mem::take(sw),
completions_tx.clone(),
)
};

let (wff, activations) = wf_function.start_workflow(
common.worker.get_config().namespace.clone(),
common.task_queue.clone(),
std::mem::take(sw),
completions_tx.clone(),
);
let jh = tokio::spawn(async move {
tokio::select! {
r = wff.fuse() => r,
Expand All @@ -442,6 +452,17 @@ impl WorkflowHalf {
join_handle: jh,
run_id: run_id.clone(),
});
loop {
// It's possible that we've got a new initialize workflow action before the last
// future for this run finished evicting, as a result of how futures might be
// interleaved. In that case, just wait until it's not in the map, which should be
// a matter of only a few `poll` calls.
if self.workflows.borrow_mut().contains_key(&run_id) {
self.workflow_removed_from_map.notified().await;
} else {
break;
}
}
self.workflows.borrow_mut().insert(
run_id.clone(),
WorkflowData {
Expand Down Expand Up @@ -474,7 +495,8 @@ impl WorkflowHalf {
return Ok(None);
}

// In all other cases, we want to panic as the runtime could be in an inconsistent state at this point.
// In all other cases, we want to error as the runtime could be in an inconsistent state
// at this point.
bail!(
"Got activation {:?} for unknown workflow {}",
activation,
Expand Down
34 changes: 10 additions & 24 deletions sdk/src/workflow_future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ impl WorkflowFuture {
&mut self,
variant: Option<Variant>,
outgoing_cmds: &mut Vec<WorkflowCommand>,
) -> Result<bool, Error> {
) -> Result<(), Error> {
if let Some(v) = variant {
match v {
Variant::InitializeWorkflow(_) => {
Expand Down Expand Up @@ -326,17 +326,14 @@ impl WorkflowFuture {
))?
}
Variant::RemoveFromCache(_) => {
// TODO: Need to abort any user-spawned tasks, etc. See also cancel WF.
// How best to do this in executor agnostic way? Is that possible?
// -- tokio JoinSet does this in a nice way.
return Ok(true);
unreachable!("Cache removal should happen higher up");
}
}
} else {
bail!("Empty activation job variant");
}

Ok(false)
Ok(())
}
}

Expand Down Expand Up @@ -370,7 +367,6 @@ impl Future for WorkflowFuture {
.map(Into::into);
}

let mut die_of_eviction_when_done = false;
let mut activation_cmds = vec![];
// Lame hack to avoid hitting "unregistered" update handlers in a situation where
// the history has no commands until an update is accepted. Will go away w/ SDK redesign
Expand All @@ -391,19 +387,6 @@ impl Future for WorkflowFuture {
}
}

for WorkflowActivationJob { variant } in activation.jobs {
match self.handle_job(variant, &mut activation_cmds) {
Ok(true) => {
die_of_eviction_when_done = true;
}
Err(e) => {
self.fail_wft(run_id, e);
continue 'activations;
}
_ => (),
}
}

if is_only_eviction {
// No need to do anything with the workflow code in this case
self.outgoing_completions
Expand All @@ -412,6 +395,13 @@ impl Future for WorkflowFuture {
return Ok(WfExitValue::Evicted).into();
}

for WorkflowActivationJob { variant } in activation.jobs {
if let Err(e) = self.handle_job(variant, &mut activation_cmds) {
self.fail_wft(run_id, e);
continue 'activations;
}
}

// Drive update functions
self.update_futures = std::mem::take(&mut self.update_futures)
.into_iter()
Expand Down Expand Up @@ -450,10 +440,6 @@ impl Future for WorkflowFuture {

self.send_completion(run_id, activation_cmds);

if die_of_eviction_when_done {
return Ok(WfExitValue::Evicted).into();
}

// We don't actually return here, since we could be queried after finishing executing,
// and it allows us to rely on evictions for death and cache management
}
Expand Down
Loading