Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,17 @@ message InitializeWorkflow {
temporal.api.common.v1.SearchAttributes search_attributes = 22;
// When the workflow execution started event was first written
google.protobuf.Timestamp start_time = 23;
// Contains information about the root workflow execution. It is possible for the namespace to
// be different than this workflow if using OSS and cross-namespace children, but this
// information is not retained. Users should take care to track it by other means in such
// situations.
//
// The root workflow execution is defined as follows:
// 1. A workflow without parent workflow is its own root workflow.
// 2. A workflow that has a parent workflow has the same root workflow as its parent workflow.
//
// See field in WorkflowExecutionStarted for more detail.
temporal.api.common.v1.WorkflowExecution root_workflow = 24;
}

// Notify a workflow that a timer has fired
Expand Down
1 change: 1 addition & 0 deletions sdk-core-protos/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,7 @@ pub mod coresdk {
memo: attrs.memo,
search_attributes: attrs.search_attributes,
start_time: Some(start_time),
root_workflow: attrs.root_workflow_execution,
}
}
}
Expand Down
20 changes: 9 additions & 11 deletions sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ impl WorkflowHalf {
&self,
common: &CommonWorker,
shutdown_token: CancellationToken,
activation: WorkflowActivation,
mut activation: WorkflowActivation,
completions_tx: &UnboundedSender<WorkflowActivationCompletion>,
) -> Result<
Option<
Expand All @@ -403,8 +403,8 @@ impl WorkflowHalf {

// If the activation is to init a workflow, create a new workflow driver for it,
// using the function associated with that workflow id
if let Some(sw) = activation.jobs.iter().find_map(|j| match j.variant {
Some(Variant::InitializeWorkflow(ref sw)) => Some(sw),
if let Some(sw) = activation.jobs.iter_mut().find_map(|j| match j.variant {
Some(Variant::InitializeWorkflow(ref mut sw)) => Some(sw),
_ => None,
}) {
let workflow_type = &sw.workflow_type;
Expand All @@ -425,9 +425,7 @@ impl WorkflowHalf {
let (wff, activations) = wf_function.start_workflow(
common.worker.get_config().namespace.clone(),
common.task_queue.clone(),
workflow_type,
// NOTE: Don't clone args if this gets ported to be a non-test rust worker
sw.arguments.clone(),
std::mem::take(sw),
completions_tx.clone(),
);
let jh = tokio::spawn(async move {
Expand Down Expand Up @@ -459,11 +457,11 @@ impl WorkflowHalf {
.send(activation)
.expect("Workflow should exist if we're sending it an activation");
} else {
// When we failed to start a workflow, we never inserted it into the cache.
// But core sends us a `RemoveFromCache` job when we mark the StartWorkflow workflow activation
// as a failure, which we need to complete.
// Other SDKs add the workflow to the cache even when the workflow type is unknown/not found.
// To circumvent this, we simply mark any RemoveFromCache job for workflows that are not in the cache as complete.
// When we failed to start a workflow, we never inserted it into the cache. But core
// sends us a `RemoveFromCache` job when we mark the StartWorkflow workflow activation
// as a failure, which we need to complete. Other SDKs add the workflow to the cache
// even when the workflow type is unknown/not found. To circumvent this, we simply mark
// any RemoveFromCache job for workflows that are not in the cache as complete.
if activation.jobs.len() == 1
&& matches!(
activation.jobs.first().map(|j| &j.variant),
Expand Down
28 changes: 22 additions & 6 deletions sdk/src/workflow_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ use temporal_sdk_core_protos::{
child_workflow::ChildWorkflowResult,
common::NamespacedWorkflowExecution,
nexus::NexusOperationResult,
workflow_activation::resolve_child_workflow_execution_start::Status as ChildWorkflowStartStatus,
workflow_activation::{
InitializeWorkflow,
resolve_child_workflow_execution_start::Status as ChildWorkflowStartStatus,
},
workflow_commands::{
CancelChildWorkflowExecution, ModifyWorkflowProperties,
RequestCancelExternalWorkflowExecution, SetPatchMarker,
Expand All @@ -54,7 +57,7 @@ use tokio_stream::wrappers::UnboundedReceiverStream;
pub struct WfContext {
namespace: String,
task_queue: String,
args: Arc<Vec<Payload>>,
inital_information: Arc<InitializeWorkflow>,

chan: Sender<RustWfCmd>,
am_cancelled: watch::Receiver<Option<String>>,
Expand All @@ -71,7 +74,7 @@ impl WfContext {
pub(super) fn new(
namespace: String,
task_queue: String,
args: Vec<Payload>,
init_workflow_job: InitializeWorkflow,
am_cancelled: watch::Receiver<Option<String>>,
) -> (Self, Receiver<RustWfCmd>) {
// The receiving side is non-async
Expand All @@ -80,10 +83,17 @@ impl WfContext {
Self {
namespace,
task_queue,
args: Arc::new(args),
shared: Arc::new(RwLock::new(WfContextSharedData {
random_seed: init_workflow_job.randomness_seed,
search_attributes: init_workflow_job
.search_attributes
.clone()
.unwrap_or_default(),
..Default::default()
})),
inital_information: Arc::new(init_workflow_job),
chan,
am_cancelled,
shared: Arc::new(RwLock::new(Default::default())),
seq_nums: Arc::new(RwLock::new(WfCtxProtectedDat {
next_timer_sequence_number: 1,
next_activity_sequence_number: 1,
Expand All @@ -109,7 +119,7 @@ impl WfContext {

/// Get the arguments provided to the workflow upon execution start
pub fn get_args(&self) -> &[Payload] {
self.args.as_slice()
self.inital_information.arguments.as_slice()
}

/// Return the current time according to the workflow (which is not wall-clock time).
Expand Down Expand Up @@ -138,6 +148,12 @@ impl WfContext {
self.shared.read().random_seed
}

/// Return various information that the workflow was initialized with. Will eventually become
/// a proper non-proto workflow info struct.
pub fn workflow_initial_info(&self) -> &InitializeWorkflow {
&self.inital_information
}

/// A future that resolves if/when the workflow is cancelled, with the user provided cause
pub async fn cancelled(&self) -> String {
if let Some(s) = self.am_cancelled.borrow().as_ref() {
Expand Down
33 changes: 10 additions & 23 deletions sdk/src/workflow_future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ use std::{
use temporal_sdk_core_protos::{
coresdk::{
workflow_activation::{
FireTimer, NotifyHasPatch, ResolveActivity, ResolveChildWorkflowExecution,
ResolveChildWorkflowExecutionStart, WorkflowActivation, WorkflowActivationJob,
workflow_activation_job::Variant,
FireTimer, InitializeWorkflow, NotifyHasPatch, ResolveActivity,
ResolveChildWorkflowExecution, ResolveChildWorkflowExecutionStart, WorkflowActivation,
WorkflowActivationJob, workflow_activation_job::Variant,
},
workflow_commands::{
CancelChildWorkflowExecution, CancelSignalWorkflow, CancelTimer,
Expand All @@ -44,26 +44,25 @@ use tracing::Instrument;
impl WorkflowFunction {
/// Start a workflow function, returning a future that will resolve when the workflow does,
/// and a channel that can be used to send it activations.
#[doc(hidden)]
pub fn start_workflow(
pub(crate) fn start_workflow(
&self,
namespace: String,
task_queue: String,
workflow_type: &str,
args: Vec<Payload>,
init_workflow_job: InitializeWorkflow,
outgoing_completions: UnboundedSender<WorkflowActivationCompletion>,
) -> (
impl Future<Output = WorkflowResult<Payload>> + use<>,
UnboundedSender<WorkflowActivation>,
) {
let (cancel_tx, cancel_rx) = watch::channel(None);
let (wf_context, cmd_receiver) = WfContext::new(namespace, task_queue, args, cancel_rx);
let (tx, incoming_activations) = unbounded_channel();
let span = info_span!(
"RunWorkflow",
"otel.name" = format!("RunWorkflow:{}", workflow_type),
"otel.name" = format!("RunWorkflow:{}", &init_workflow_job.workflow_type),
"otel.kind" = "server"
);
let (wf_context, cmd_receiver) =
WfContext::new(namespace, task_queue, init_workflow_job, cancel_rx);
let (tx, incoming_activations) = unbounded_channel();
let inner_fut = (self.wf_func)(wf_context.clone()).instrument(span);
(
WorkflowFuture {
Expand Down Expand Up @@ -346,7 +345,7 @@ impl Future for WorkflowFuture {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
'activations: loop {
// WF must always receive an activation first before responding with commands
let mut activation = match self.incoming_activations.poll_recv(cx) {
let activation = match self.incoming_activations.poll_recv(cx) {
Poll::Ready(a) => match a {
Some(act) => act,
None => {
Expand Down Expand Up @@ -374,18 +373,6 @@ impl Future for WorkflowFuture {

let mut die_of_eviction_when_done = false;
let mut activation_cmds = vec![];
// Assign initial state from start workflow job
if let Some(start_info) = activation.jobs.iter_mut().find_map(|j| {
if let Some(Variant::InitializeWorkflow(s)) = j.variant.as_mut() {
Some(s)
} else {
None
}
}) {
let mut wlock = self.wf_ctx.shared.write();
wlock.random_seed = start_info.randomness_seed;
wlock.search_attributes = start_info.search_attributes.take().unwrap_or_default();
};
// Lame hack to avoid hitting "unregistered" update handlers in a situation where
// the history has no commands until an update is accepted. Will go away w/ SDK redesign
if activation
Expand Down
14 changes: 13 additions & 1 deletion tests/integ_tests/workflow_tests/child_workflows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,19 @@ use tokio::sync::Barrier;
static PARENT_WF_TYPE: &str = "parent_wf";
static CHILD_WF_TYPE: &str = "child_wf";

async fn child_wf(_ctx: WfContext) -> WorkflowResult<()> {
async fn child_wf(ctx: WfContext) -> WorkflowResult<()> {
assert_eq!(
ctx.workflow_initial_info()
.parent_workflow_info
.as_ref()
.unwrap()
.workflow_id,
ctx.workflow_initial_info()
.root_workflow
.as_ref()
.unwrap()
.workflow_id
);
Ok(().into())
}

Expand Down
Loading