Skip to content

Commit 7cc180e

Browse files
Add context metadata provider interface for root components to implement (#9773)
## What changed? Add context metadata provider interface for root components to implement. ## Why? Allow Component instances a method to provide key value pairs to be propagated in ContextMetadata based off Component state. ## How did you test it? - [X] built - [X] run locally and tested manually - [X] covered by existing tests - [X] added new unit test(s) - [ ] added new functional test(s)
1 parent dd61397 commit 7cc180e

9 files changed

Lines changed: 577 additions & 25 deletions

File tree

chasm/component.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ type TerminateComponentResponse struct{}
4444
// TODO: (not yet true) Visibility record will no longer be updated after RootComponent is closed.
4545
type RootComponent interface {
4646
TerminableComponent
47+
48+
// ContextMetadata returns execution metadata to propagate to the request context.
49+
ContextMetadata(Context) map[string]string
4750
}
4851

4952
// Embed UnimplementedComponent to get forward compatibility

chasm/component_mock.go

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

chasm/lib/activity/activity.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,11 @@ func (a *Activity) LifecycleState(_ chasm.Context) chasm.LifecycleState {
106106
}
107107
}
108108

109+
func (a *Activity) ContextMetadata(_ chasm.Context) map[string]string {
110+
// TODO: Export standalone activity context metadata.
111+
return nil
112+
}
113+
109114
// NewStandaloneActivity creates a new activity component and adds associated tasks to start execution.
110115
func NewStandaloneActivity(
111116
ctx chasm.MutableContext,

chasm/lib/scheduler/scheduler.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,11 @@ func (s *Scheduler) LifecycleState(ctx chasm.Context) chasm.LifecycleState {
288288
return chasm.LifecycleStateRunning
289289
}
290290

291+
func (s *Scheduler) ContextMetadata(_ chasm.Context) map[string]string {
292+
// TODO: Export scheduler context metadata.
293+
return nil
294+
}
295+
291296
// Terminate implements the chasm.RootComponent interface.
292297
func (s *Scheduler) Terminate(
293298
_ chasm.MutableContext,

chasm/lib/tests/payload.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,12 @@ func (s *PayloadStore) Cancel(
121121
return CancelPayloadStoreResponse{}, nil
122122
}
123123

124+
func (s *PayloadStore) ContextMetadata(_ chasm.Context) map[string]string {
125+
return map[string]string{
126+
string(componentCtxKey): componentCtxVal,
127+
}
128+
}
129+
124130
func (s *PayloadStore) AddPayload(
125131
mutableContext chasm.MutableContext,
126132
request AddPayloadRequest,

chasm/lib/workflow/workflow.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,11 @@ func (w *Workflow) LifecycleState(
4646
return chasm.LifecycleStateRunning
4747
}
4848

49+
func (w *Workflow) ContextMetadata(_ chasm.Context) map[string]string {
50+
// TODO: Export workflow metadata from the CHASM workflow root instead of CloseTransaction().
51+
return nil
52+
}
53+
4954
// ProcessCloseCallbacks triggers "WorkflowClosed" callbacks using the CHASM implementation.
5055
// It iterates through all callbacks and schedules WorkflowClosed ones that are in STANDBY state.
5156
func (w *Workflow) ProcessCloseCallbacks(ctx chasm.MutableContext) error {

chasm/test_component_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ var (
8080

8181
_ VisibilitySearchAttributesProvider = (*TestComponent)(nil)
8282
_ VisibilityMemoProvider = (*TestComponent)(nil)
83+
_ RootComponent = (*TestComponent)(nil)
8384
)
8485

8586
func (tc *TestComponent) LifecycleState(_ Context) LifecycleState {
@@ -109,6 +110,11 @@ func (tc *TestComponent) Fail(_ MutableContext) {
109110
tc.ComponentData.Status = enumspb.WORKFLOW_EXECUTION_STATUS_FAILED
110111
}
111112

113+
func (tc *TestComponent) ContextMetadata(_ Context) map[string]string {
114+
// TODO: Export context metadata from this test root.
115+
return nil
116+
}
117+
112118
// SearchAttributes implements VisibilitySearchAttributesProvider interface.
113119
func (tc *TestComponent) SearchAttributes(_ Context) []SearchAttributeKeyValue {
114120
return []SearchAttributeKeyValue{

service/history/chasm_engine.go

Lines changed: 85 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
enumsspb "go.temporal.io/server/api/enums/v1"
1212
"go.temporal.io/server/chasm"
1313
"go.temporal.io/server/common"
14+
"go.temporal.io/server/common/contextutil"
1415
"go.temporal.io/server/common/convert"
1516
"go.temporal.io/server/common/definition"
1617
"go.temporal.io/server/common/headers"
@@ -108,6 +109,69 @@ func (e *ChasmEngine) NotifyExecution(key chasm.ExecutionKey) {
108109
e.notifier.Notify(key)
109110
}
110111

112+
func (e *ChasmEngine) setContextMetadata(
113+
ctx context.Context,
114+
chasmTree *chasm.Node,
115+
) chasm.Context {
116+
chasmContext := chasm.NewContext(ctx, chasmTree)
117+
118+
rootComponent, err := chasmTree.Component(chasmContext, chasm.ComponentRef{})
119+
if err != nil {
120+
executionKey := chasmContext.ExecutionKey()
121+
e.logger.Error(
122+
"Failed to resolve CHASM root component for context metadata",
123+
tag.WorkflowNamespaceID(executionKey.NamespaceID),
124+
tag.WorkflowID(executionKey.BusinessID),
125+
tag.WorkflowRunID(executionKey.RunID),
126+
tag.Error(err),
127+
)
128+
return chasmContext
129+
}
130+
131+
root, ok := rootComponent.(chasm.RootComponent)
132+
if !ok {
133+
softassert.Fail(
134+
e.logger,
135+
"root node must implement RootComponent interface",
136+
tag.NewStringTag("component_type", fmt.Sprintf("%T", rootComponent)),
137+
)
138+
return chasmContext
139+
}
140+
141+
for key, value := range root.ContextMetadata(chasmContext) {
142+
contextutil.ContextMetadataSet(ctx, key, value)
143+
}
144+
145+
return chasmContext
146+
}
147+
148+
func chasmTreeFromMutableState(
149+
logger log.Logger,
150+
mutableState historyi.MutableState,
151+
) (*chasm.Node, error) {
152+
chasmTree, ok := mutableState.ChasmTree().(*chasm.Node)
153+
if !ok {
154+
return nil, softassert.UnexpectedInternalErr(
155+
logger,
156+
"CHASM tree implementation not properly wired up",
157+
fmt.Errorf("encountered type: %T, expected type: %T", mutableState.ChasmTree(), &chasm.Node{}),
158+
)
159+
}
160+
return chasmTree, nil
161+
}
162+
163+
func (e *ChasmEngine) setContextMetadataFromMutableState(
164+
ctx context.Context,
165+
mutableState historyi.MutableState,
166+
) {
167+
chasmTree, err := chasmTreeFromMutableState(e.logger, mutableState)
168+
if err != nil {
169+
e.logger.Error("Failed to resolve CHASM tree for context metadata", tag.Error(err))
170+
return
171+
}
172+
e.setContextMetadata(ctx, chasmTree)
173+
}
174+
111175
func (e *ChasmEngine) StartExecution(
112176
ctx context.Context,
113177
executionRef chasm.ComponentRef,
@@ -177,6 +241,7 @@ func (e *ChasmEngine) startExecution(
177241
return chasm.StartExecutionResult{}, err
178242
}
179243
if !hasCurrentRun {
244+
e.setContextMetadataFromMutableState(ctx, newExecutionParams.mutableState)
180245
serializedRef, err := newExecutionParams.executionRef.Serialize(e.registry)
181246
if err != nil {
182247
// Created is true here because persistAsBrandNew succeeded, but we failed to serialize the ref.
@@ -354,13 +419,9 @@ func (e *ChasmEngine) applyUpdateWithLease(
354419
updateFn func(chasm.MutableContext, chasm.Component) error,
355420
) ([]byte, error) {
356421
mutableState := executionLease.GetMutableState()
357-
chasmTree, ok := mutableState.ChasmTree().(*chasm.Node)
358-
if !ok {
359-
return nil, serviceerror.NewInternalf(
360-
"CHASM tree implementation not properly wired up, encountered type: %T, expected type: %T",
361-
mutableState.ChasmTree(),
362-
&chasm.Node{},
363-
)
422+
chasmTree, err := chasmTreeFromMutableState(shardContext.GetLogger(), mutableState)
423+
if err != nil {
424+
return nil, err
364425
}
365426

366427
mutableContext := chasm.NewMutableContext(ctx, chasmTree)
@@ -375,6 +436,8 @@ func (e *ChasmEngine) applyUpdateWithLease(
375436

376437
// TODO: Support WithSpeculative() TransitionOption.
377438

439+
e.setContextMetadata(ctx, chasmTree)
440+
378441
if err := executionLease.GetContext().UpdateWorkflowExecutionAsActive(
379442
ctx,
380443
shardContext,
@@ -438,6 +501,8 @@ func (e *ChasmEngine) startAndUpdateExecution(
438501
return chasm.ExecutionKey{}, nil, false, currentRunInfo.CurrentWorkflowConditionFailedError
439502
}
440503

504+
e.setContextMetadataFromMutableState(ctx, newExecutionParams.mutableState)
505+
441506
serializedRef, err := newExecutionParams.executionRef.Serialize(e.registry)
442507

443508
return newExecutionParams.executionRef.ExecutionKey, serializedRef, true, err
@@ -502,6 +567,8 @@ func (e *ChasmEngine) deleteExecution(
502567
mutableState := executionLease.GetMutableState()
503568
we := mutableState.GetWorkflowKey()
504569

570+
e.setContextMetadataFromMutableState(ctx, mutableState)
571+
505572
log.With(shardContext.GetLogger(),
506573
tag.WorkflowNamespaceID(ref.NamespaceID),
507574
tag.WorkflowID(we.WorkflowID),
@@ -580,16 +647,12 @@ func (e *ChasmEngine) readComponent(
580647
executionLease.GetReleaseFn()(nil)
581648
}()
582649

583-
chasmTree, ok := executionLease.GetMutableState().ChasmTree().(*chasm.Node)
584-
if !ok {
585-
return serviceerror.NewInternalf(
586-
"CHASM tree implementation not properly wired up, encountered type: %T, expected type: %T",
587-
executionLease.GetMutableState().ChasmTree(),
588-
&chasm.Node{},
589-
)
650+
chasmTree, err := chasmTreeFromMutableState(e.logger, executionLease.GetMutableState())
651+
if err != nil {
652+
return err
590653
}
591654

592-
chasmContext := chasm.NewContext(ctx, chasmTree)
655+
chasmContext := e.setContextMetadata(ctx, chasmTree)
593656
component, err := chasmTree.Component(chasmContext, ref)
594657
if err != nil {
595658
return err
@@ -686,16 +749,13 @@ func (e *ChasmEngine) predicateSatisfied(
686749
ref chasm.ComponentRef,
687750
executionLease api.WorkflowLease,
688751
) ([]byte, error) {
689-
chasmTree, ok := executionLease.GetMutableState().ChasmTree().(*chasm.Node)
690-
if !ok {
691-
return nil, serviceerror.NewInternalf(
692-
"CHASM tree implementation not properly wired up, encountered type: %T, expected type: %T",
693-
executionLease.GetMutableState().ChasmTree(),
694-
&chasm.Node{},
695-
)
752+
chasmTree, err := chasmTreeFromMutableState(e.logger, executionLease.GetMutableState())
753+
if err != nil {
754+
return nil, err
696755
}
697756

698-
chasmContext := chasm.NewContext(ctx, chasmTree)
757+
chasmContext := e.setContextMetadata(ctx, chasmTree)
758+
699759
component, err := chasmTree.Component(chasmContext, ref)
700760
if err != nil {
701761
return nil, err
@@ -1052,6 +1112,8 @@ func (e *ChasmEngine) handleReusePolicy(
10521112
return chasm.StartExecutionResult{}, err
10531113
}
10541114

1115+
e.setContextMetadataFromMutableState(ctx, newExecutionParams.mutableState)
1116+
10551117
serializedRef, err := newExecutionParams.executionRef.Serialize(e.registry)
10561118
if err != nil {
10571119
return chasm.StartExecutionResult{ExecutionKey: newExecutionParams.executionRef.ExecutionKey, Created: true}, err

0 commit comments

Comments
 (0)