Skip to content

Commit a921890

Browse files
Add support for root workflow execution to workflow info (#1923)
1 parent 0517ec9 commit a921890

6 files changed

+53
-1
lines changed

internal/internal_task_handlers.go

+10
Original file line numberDiff line numberDiff line change
@@ -733,6 +733,15 @@ func (wth *workflowTaskHandlerImpl) createWorkflowContext(task *workflowservice.
733733
RunID: attributes.ParentWorkflowExecution.GetRunId(),
734734
}
735735
}
736+
737+
var rootWorkflowExecution *WorkflowExecution
738+
if attributes.RootWorkflowExecution != nil {
739+
rootWorkflowExecution = &WorkflowExecution{
740+
ID: attributes.RootWorkflowExecution.GetWorkflowId(),
741+
RunID: attributes.RootWorkflowExecution.GetRunId(),
742+
}
743+
}
744+
736745
workflowInfo := &WorkflowInfo{
737746
WorkflowExecution: WorkflowExecution{
738747
ID: workflowID,
@@ -754,6 +763,7 @@ func (wth *workflowTaskHandlerImpl) createWorkflowContext(task *workflowservice.
754763
ContinuedExecutionRunID: attributes.ContinuedExecutionRunId,
755764
ParentWorkflowNamespace: attributes.ParentWorkflowNamespace,
756765
ParentWorkflowExecution: parentWorkflowExecution,
766+
RootWorkflowExecution: rootWorkflowExecution,
757767
Memo: attributes.Memo,
758768
SearchAttributes: attributes.SearchAttributes,
759769
RetryPolicy: convertFromPBRetryPolicy(attributes.RetryPolicy),

internal/internal_task_handlers_test.go

+3
Original file line numberDiff line numberDiff line change
@@ -1189,6 +1189,7 @@ func (t *TaskHandlersTestSuite) TestGetWorkflowInfo() {
11891189
Input: lastCompletionResult,
11901190
TaskQueue: &taskqueuepb.TaskQueue{Name: testWorkflowTaskTaskqueue},
11911191
ParentWorkflowExecution: parentExecution,
1192+
RootWorkflowExecution: parentExecution,
11921193
CronSchedule: cronSchedule,
11931194
ContinuedExecutionRunId: continuedRunID,
11941195
ParentWorkflowNamespace: parentNamespace,
@@ -1221,6 +1222,8 @@ func (t *TaskHandlersTestSuite) TestGetWorkflowInfo() {
12211222
t.EqualValues(testWorkflowTaskTaskqueue, result.TaskQueueName)
12221223
t.EqualValues(parentID, result.ParentWorkflowExecution.ID)
12231224
t.EqualValues(parentRunID, result.ParentWorkflowExecution.RunID)
1225+
t.EqualValues(parentID, result.RootWorkflowExecution.ID)
1226+
t.EqualValues(parentRunID, result.RootWorkflowExecution.RunID)
12241227
t.EqualValues(cronSchedule, result.CronSchedule)
12251228
t.EqualValues(continuedRunID, result.ContinuedExecutionRunID)
12261229
t.EqualValues(parentNamespace, result.ParentWorkflowNamespace)

internal/internal_workflow_testsuite.go

+5
Original file line numberDiff line numberDiff line change
@@ -469,6 +469,11 @@ func (env *testWorkflowEnvironmentImpl) newTestWorkflowEnvironmentForChild(
469469
childEnv.workflowInfo.CronSchedule = cronSchedule
470470
childEnv.workflowInfo.ParentWorkflowNamespace = env.workflowInfo.Namespace
471471
childEnv.workflowInfo.ParentWorkflowExecution = &env.workflowInfo.WorkflowExecution
472+
if env.workflowInfo.RootWorkflowExecution == nil {
473+
childEnv.workflowInfo.RootWorkflowExecution = &env.workflowInfo.WorkflowExecution
474+
} else {
475+
childEnv.workflowInfo.ParentWorkflowExecution = env.workflowInfo.RootWorkflowExecution
476+
}
472477

473478
searchAttrs, err := serializeSearchAttributes(params.SearchAttributes, params.TypedSearchAttributes)
474479
if err != nil {

internal/workflow.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -1279,7 +1279,9 @@ type WorkflowInfo struct {
12791279
ContinuedExecutionRunID string
12801280
ParentWorkflowNamespace string
12811281
ParentWorkflowExecution *WorkflowExecution
1282-
Memo *commonpb.Memo // Value can be decoded using data converter (defaultDataConverter, or custom one if set).
1282+
// RootWorkflowExecution is the first workflow execution in the chain of workflows. If a workflow is itself a root workflow, then this field is nil.
1283+
RootWorkflowExecution *WorkflowExecution
1284+
Memo *commonpb.Memo // Value can be decoded using data converter (defaultDataConverter, or custom one if set).
12831285
// Deprecated: use [Workflow.GetTypedSearchAttributes] instead.
12841286
SearchAttributes *commonpb.SearchAttributes // Value can be decoded using defaultDataConverter.
12851287
RetryPolicy *RetryPolicy

test/integration_test.go

+12
Original file line numberDiff line numberDiff line change
@@ -5263,6 +5263,18 @@ func (ts *IntegrationTestSuite) TestHistoryLength() {
52635263
ts.Equal(expected, actual)
52645264
}
52655265

5266+
func (ts *IntegrationTestSuite) TestRootWorkflow() {
5267+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
5268+
defer cancel()
5269+
5270+
run, err := ts.client.ExecuteWorkflow(ctx, ts.startWorkflowOptions("test-root-workflow-length"),
5271+
ts.workflows.RootWorkflow)
5272+
ts.NoError(err)
5273+
var result string
5274+
ts.NoError(run.Get(ctx, &result))
5275+
ts.Equal("empty test-root-workflow-length", result)
5276+
}
5277+
52665278
func (ts *IntegrationTestSuite) TestMultiNamespaceClient() {
52675279
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
52685280
defer cancel()

test/workflow_test.go

+20
Original file line numberDiff line numberDiff line change
@@ -3160,6 +3160,25 @@ func (w *Workflows) HistoryLengths(ctx workflow.Context, activityCount int) (len
31603160
return
31613161
}
31623162

3163+
func (w *Workflows) RootWorkflow(ctx workflow.Context) (string, error) {
3164+
var result string
3165+
if workflow.GetInfo(ctx).RootWorkflowExecution == nil {
3166+
result += "empty"
3167+
} else {
3168+
result += workflow.GetInfo(ctx).RootWorkflowExecution.ID
3169+
}
3170+
if workflow.GetInfo(ctx).ParentWorkflowExecution == nil {
3171+
result += " "
3172+
var childResult string
3173+
err := workflow.ExecuteChildWorkflow(ctx, w.RootWorkflow).Get(ctx, &childResult)
3174+
if err != nil {
3175+
return "", err
3176+
}
3177+
result += childResult
3178+
}
3179+
return result, nil
3180+
}
3181+
31633182
func (w *Workflows) HeartbeatSpecificCount(ctx workflow.Context, interval time.Duration, count int) error {
31643183
ctx = workflow.WithActivityOptions(ctx, w.defaultActivityOptionsWithRetry())
31653184
var activities *Activities
@@ -3609,6 +3628,7 @@ func (w *Workflows) register(worker worker.Worker) {
36093628
worker.RegisterWorkflow(w.NonDeterminismReplay)
36103629
worker.RegisterWorkflow(w.MutableSideEffect)
36113630
worker.RegisterWorkflow(w.HistoryLengths)
3631+
worker.RegisterWorkflow(w.RootWorkflow)
36123632
worker.RegisterWorkflow(w.HeartbeatSpecificCount)
36133633
worker.RegisterWorkflow(w.UpsertMemo)
36143634
worker.RegisterWorkflow(w.UpsertTypedSearchAttributesWorkflow)

0 commit comments

Comments
 (0)