Skip to content

Commit 56a1e14

Browse files
authored
Pass through root workflow info (#890)
1 parent 99513c8 commit 56a1e14

6 files changed

Lines changed: 66 additions & 41 deletions

File tree

sdk-core-protos/protos/local/temporal/sdk/core/workflow_activation/workflow_activation.proto

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,17 @@ message InitializeWorkflow {
184184
temporal.api.common.v1.SearchAttributes search_attributes = 22;
185185
// When the workflow execution started event was first written
186186
google.protobuf.Timestamp start_time = 23;
187+
// Contains information about the root workflow execution. It is possible for the namespace to
188+
// be different than this workflow if using OSS and cross-namespace children, but this
189+
// information is not retained. Users should take care to track it by other means in such
190+
// situations.
191+
//
192+
// The root workflow execution is defined as follows:
193+
// 1. A workflow without parent workflow is its own root workflow.
194+
// 2. A workflow that has a parent workflow has the same root workflow as its parent workflow.
195+
//
196+
// See field in WorkflowExecutionStarted for more detail.
197+
temporal.api.common.v1.WorkflowExecution root_workflow = 24;
187198
}
188199

189200
// Notify a workflow that a timer has fired

sdk-core-protos/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -725,6 +725,7 @@ pub mod coresdk {
725725
memo: attrs.memo,
726726
search_attributes: attrs.search_attributes,
727727
start_time: Some(start_time),
728+
root_workflow: attrs.root_workflow_execution,
728729
}
729730
}
730731
}

sdk/src/lib.rs

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -388,7 +388,7 @@ impl WorkflowHalf {
388388
&self,
389389
common: &CommonWorker,
390390
shutdown_token: CancellationToken,
391-
activation: WorkflowActivation,
391+
mut activation: WorkflowActivation,
392392
completions_tx: &UnboundedSender<WorkflowActivationCompletion>,
393393
) -> Result<
394394
Option<
@@ -403,8 +403,8 @@ impl WorkflowHalf {
403403

404404
// If the activation is to init a workflow, create a new workflow driver for it,
405405
// using the function associated with that workflow id
406-
if let Some(sw) = activation.jobs.iter().find_map(|j| match j.variant {
407-
Some(Variant::InitializeWorkflow(ref sw)) => Some(sw),
406+
if let Some(sw) = activation.jobs.iter_mut().find_map(|j| match j.variant {
407+
Some(Variant::InitializeWorkflow(ref mut sw)) => Some(sw),
408408
_ => None,
409409
}) {
410410
let workflow_type = &sw.workflow_type;
@@ -425,9 +425,7 @@ impl WorkflowHalf {
425425
let (wff, activations) = wf_function.start_workflow(
426426
common.worker.get_config().namespace.clone(),
427427
common.task_queue.clone(),
428-
workflow_type,
429-
// NOTE: Don't clone args if this gets ported to be a non-test rust worker
430-
sw.arguments.clone(),
428+
std::mem::take(sw),
431429
completions_tx.clone(),
432430
);
433431
let jh = tokio::spawn(async move {
@@ -459,11 +457,11 @@ impl WorkflowHalf {
459457
.send(activation)
460458
.expect("Workflow should exist if we're sending it an activation");
461459
} else {
462-
// When we failed to start a workflow, we never inserted it into the cache.
463-
// But core sends us a `RemoveFromCache` job when we mark the StartWorkflow workflow activation
464-
// as a failure, which we need to complete.
465-
// Other SDKs add the workflow to the cache even when the workflow type is unknown/not found.
466-
// To circumvent this, we simply mark any RemoveFromCache job for workflows that are not in the cache as complete.
460+
// When we failed to start a workflow, we never inserted it into the cache. But core
461+
// sends us a `RemoveFromCache` job when we mark the StartWorkflow workflow activation
462+
// as a failure, which we need to complete. Other SDKs add the workflow to the cache
463+
// even when the workflow type is unknown/not found. To circumvent this, we simply mark
464+
// any RemoveFromCache job for workflows that are not in the cache as complete.
467465
if activation.jobs.len() == 1
468466
&& matches!(
469467
activation.jobs.first().map(|j| &j.variant),

sdk/src/workflow_context.rs

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,10 @@ use temporal_sdk_core_protos::{
3333
child_workflow::ChildWorkflowResult,
3434
common::NamespacedWorkflowExecution,
3535
nexus::NexusOperationResult,
36-
workflow_activation::resolve_child_workflow_execution_start::Status as ChildWorkflowStartStatus,
36+
workflow_activation::{
37+
InitializeWorkflow,
38+
resolve_child_workflow_execution_start::Status as ChildWorkflowStartStatus,
39+
},
3740
workflow_commands::{
3841
CancelChildWorkflowExecution, ModifyWorkflowProperties,
3942
RequestCancelExternalWorkflowExecution, SetPatchMarker,
@@ -54,7 +57,7 @@ use tokio_stream::wrappers::UnboundedReceiverStream;
5457
pub struct WfContext {
5558
namespace: String,
5659
task_queue: String,
57-
args: Arc<Vec<Payload>>,
60+
inital_information: Arc<InitializeWorkflow>,
5861

5962
chan: Sender<RustWfCmd>,
6063
am_cancelled: watch::Receiver<Option<String>>,
@@ -71,7 +74,7 @@ impl WfContext {
7174
pub(super) fn new(
7275
namespace: String,
7376
task_queue: String,
74-
args: Vec<Payload>,
77+
init_workflow_job: InitializeWorkflow,
7578
am_cancelled: watch::Receiver<Option<String>>,
7679
) -> (Self, Receiver<RustWfCmd>) {
7780
// The receiving side is non-async
@@ -80,10 +83,17 @@ impl WfContext {
8083
Self {
8184
namespace,
8285
task_queue,
83-
args: Arc::new(args),
86+
shared: Arc::new(RwLock::new(WfContextSharedData {
87+
random_seed: init_workflow_job.randomness_seed,
88+
search_attributes: init_workflow_job
89+
.search_attributes
90+
.clone()
91+
.unwrap_or_default(),
92+
..Default::default()
93+
})),
94+
inital_information: Arc::new(init_workflow_job),
8495
chan,
8596
am_cancelled,
86-
shared: Arc::new(RwLock::new(Default::default())),
8797
seq_nums: Arc::new(RwLock::new(WfCtxProtectedDat {
8898
next_timer_sequence_number: 1,
8999
next_activity_sequence_number: 1,
@@ -109,7 +119,7 @@ impl WfContext {
109119

110120
/// Get the arguments provided to the workflow upon execution start
111121
pub fn get_args(&self) -> &[Payload] {
112-
self.args.as_slice()
122+
self.inital_information.arguments.as_slice()
113123
}
114124

115125
/// Return the current time according to the workflow (which is not wall-clock time).
@@ -138,6 +148,12 @@ impl WfContext {
138148
self.shared.read().random_seed
139149
}
140150

151+
/// Return various information that the workflow was initialized with. Will eventually become
152+
/// a proper non-proto workflow info struct.
153+
pub fn workflow_initial_info(&self) -> &InitializeWorkflow {
154+
&self.inital_information
155+
}
156+
141157
/// A future that resolves if/when the workflow is cancelled, with the user provided cause
142158
pub async fn cancelled(&self) -> String {
143159
if let Some(s) = self.am_cancelled.borrow().as_ref() {

sdk/src/workflow_future.rs

Lines changed: 10 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@ use std::{
1717
use temporal_sdk_core_protos::{
1818
coresdk::{
1919
workflow_activation::{
20-
FireTimer, NotifyHasPatch, ResolveActivity, ResolveChildWorkflowExecution,
21-
ResolveChildWorkflowExecutionStart, WorkflowActivation, WorkflowActivationJob,
22-
workflow_activation_job::Variant,
20+
FireTimer, InitializeWorkflow, NotifyHasPatch, ResolveActivity,
21+
ResolveChildWorkflowExecution, ResolveChildWorkflowExecutionStart, WorkflowActivation,
22+
WorkflowActivationJob, workflow_activation_job::Variant,
2323
},
2424
workflow_commands::{
2525
CancelChildWorkflowExecution, CancelSignalWorkflow, CancelTimer,
@@ -44,26 +44,25 @@ use tracing::Instrument;
4444
impl WorkflowFunction {
4545
/// Start a workflow function, returning a future that will resolve when the workflow does,
4646
/// and a channel that can be used to send it activations.
47-
#[doc(hidden)]
48-
pub fn start_workflow(
47+
pub(crate) fn start_workflow(
4948
&self,
5049
namespace: String,
5150
task_queue: String,
52-
workflow_type: &str,
53-
args: Vec<Payload>,
51+
init_workflow_job: InitializeWorkflow,
5452
outgoing_completions: UnboundedSender<WorkflowActivationCompletion>,
5553
) -> (
5654
impl Future<Output = WorkflowResult<Payload>> + use<>,
5755
UnboundedSender<WorkflowActivation>,
5856
) {
5957
let (cancel_tx, cancel_rx) = watch::channel(None);
60-
let (wf_context, cmd_receiver) = WfContext::new(namespace, task_queue, args, cancel_rx);
61-
let (tx, incoming_activations) = unbounded_channel();
6258
let span = info_span!(
6359
"RunWorkflow",
64-
"otel.name" = format!("RunWorkflow:{}", workflow_type),
60+
"otel.name" = format!("RunWorkflow:{}", &init_workflow_job.workflow_type),
6561
"otel.kind" = "server"
6662
);
63+
let (wf_context, cmd_receiver) =
64+
WfContext::new(namespace, task_queue, init_workflow_job, cancel_rx);
65+
let (tx, incoming_activations) = unbounded_channel();
6766
let inner_fut = (self.wf_func)(wf_context.clone()).instrument(span);
6867
(
6968
WorkflowFuture {
@@ -346,7 +345,7 @@ impl Future for WorkflowFuture {
346345
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
347346
'activations: loop {
348347
// WF must always receive an activation first before responding with commands
349-
let mut activation = match self.incoming_activations.poll_recv(cx) {
348+
let activation = match self.incoming_activations.poll_recv(cx) {
350349
Poll::Ready(a) => match a {
351350
Some(act) => act,
352351
None => {
@@ -374,18 +373,6 @@ impl Future for WorkflowFuture {
374373

375374
let mut die_of_eviction_when_done = false;
376375
let mut activation_cmds = vec![];
377-
// Assign initial state from start workflow job
378-
if let Some(start_info) = activation.jobs.iter_mut().find_map(|j| {
379-
if let Some(Variant::InitializeWorkflow(s)) = j.variant.as_mut() {
380-
Some(s)
381-
} else {
382-
None
383-
}
384-
}) {
385-
let mut wlock = self.wf_ctx.shared.write();
386-
wlock.random_seed = start_info.randomness_seed;
387-
wlock.search_attributes = start_info.search_attributes.take().unwrap_or_default();
388-
};
389376
// Lame hack to avoid hitting "unregistered" update handlers in a situation where
390377
// the history has no commands until an update is accepted. Will go away w/ SDK redesign
391378
if activation

tests/integ_tests/workflow_tests/child_workflows.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,19 @@ use tokio::sync::Barrier;
1616
static PARENT_WF_TYPE: &str = "parent_wf";
1717
static CHILD_WF_TYPE: &str = "child_wf";
1818

19-
async fn child_wf(_ctx: WfContext) -> WorkflowResult<()> {
19+
async fn child_wf(ctx: WfContext) -> WorkflowResult<()> {
20+
assert_eq!(
21+
ctx.workflow_initial_info()
22+
.parent_workflow_info
23+
.as_ref()
24+
.unwrap()
25+
.workflow_id,
26+
ctx.workflow_initial_info()
27+
.root_workflow
28+
.as_ref()
29+
.unwrap()
30+
.workflow_id
31+
);
2032
Ok(().into())
2133
}
2234

0 commit comments

Comments
 (0)