Skip to content

Commit 9319e28

Browse files
Refactors to avoid calling the activity CHASM component directly
1 parent 81bb67e commit 9319e28

6 files changed

Lines changed: 112 additions & 31 deletions

File tree

chasm/lib/activity/activity.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,9 @@ type ActivityStore interface {
7171
// (true for START_TO_CLOSE and HEARTBEAT timeouts; false for SCHEDULE_TO_START/CLOSE).
7272
OnActivityTimedOut(ctx chasm.MutableContext, act *Activity, timeoutFailure *failurepb.Failure, needsStartedEvent bool) error
7373
// OnActivityCanceled is called when an activity reaches the Canceled terminal state.
74-
OnActivityCanceled(ctx chasm.MutableContext, act *Activity) error
74+
// needsStartedEvent indicates whether an ActivityTaskStarted event must also be written
75+
// (true when the activity was started before being canceled; false for not-yet-started activities).
76+
OnActivityCanceled(ctx chasm.MutableContext, act *Activity, needsStartedEvent bool) error
7577
// OnActivityTerminated is called when an activity reaches the Terminated terminal state.
7678
OnActivityTerminated(ctx chasm.MutableContext, act *Activity) error
7779
}
@@ -328,7 +330,7 @@ func (a *Activity) OnActivityTimedOut(ctx chasm.MutableContext, _ *Activity, _ *
328330
}
329331

330332
// OnActivityCanceled implements ActivityStore for standalone activities.
331-
func (a *Activity) OnActivityCanceled(ctx chasm.MutableContext, _ *Activity) error {
333+
func (a *Activity) OnActivityCanceled(ctx chasm.MutableContext, _ *Activity, _ bool) error {
332334
return callback.ScheduleStandbyCallbacks(ctx, a.Callbacks)
333335
}
334336

chasm/lib/activity/statemachine.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,11 @@ var TransitionFailed = chasm.NewTransition(
243243
return err
244244
}
245245

246+
outcome := a.Outcome.Get(ctx)
247+
outcome.Variant = &activitypb.ActivityOutcome_Failed_{
248+
Failed: &activitypb.ActivityOutcome_Failed{Failure: req.GetFailure()},
249+
}
250+
246251
a.emitOnFailedMetrics(ctx, event.metricsHandler)
247252

248253
return a.StoreOrSelf(ctx).OnActivityFailed(ctx, a)
@@ -339,7 +344,8 @@ var TransitionCanceled = chasm.NewTransition(
339344

340345
a.emitOnCanceledMetrics(ctx, event.handler, event.fromStatus)
341346

342-
return a.StoreOrSelf(ctx).OnActivityCanceled(ctx, a)
347+
needsStartedEvent := event.fromStatus != activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED
348+
return a.StoreOrSelf(ctx).OnActivityCanceled(ctx, a, needsStartedEvent)
343349
},
344350
)
345351

chasm/lib/activity/statemachine_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -562,7 +562,7 @@ func TestTransitionFailed(t *testing.T) {
562562
require.NotNil(t, heartbeatState.GetRecordedTime())
563563
protorequire.ProtoEqual(t, failure, attemptState.GetLastFailureDetails().GetFailure())
564564
require.NotNil(t, attemptState.GetLastFailureDetails().GetTime())
565-
require.Nil(t, outcome.GetFailed())
565+
protorequire.ProtoEqual(t, failure, outcome.GetFailed().GetFailure())
566566
}
567567

568568
func TestTransitionTerminated(t *testing.T) {

chasm/lib/workflow/workflow.go

Lines changed: 90 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,13 @@ import (
99
historypb "go.temporal.io/api/history/v1"
1010
"go.temporal.io/api/serviceerror"
1111
workflowpb "go.temporal.io/api/workflow/v1"
12+
"go.temporal.io/server/api/historyservice/v1"
1213
"go.temporal.io/server/chasm"
1314
"go.temporal.io/server/chasm/lib/activity"
1415
"go.temporal.io/server/chasm/lib/callback"
1516
callbackspb "go.temporal.io/server/chasm/lib/callback/gen/callbackpb/v1"
1617
"go.temporal.io/server/chasm/lib/nexusoperation"
18+
"go.temporal.io/server/common/metrics"
1719
"go.temporal.io/server/service/history/historybuilder"
1820
"google.golang.org/protobuf/types/known/emptypb"
1921
"google.golang.org/protobuf/types/known/timestamppb"
@@ -176,11 +178,46 @@ func (w *Workflow) OnActivityTimedOut(ctx chasm.MutableContext, act *activity.Ac
176178
}
177179

178180
// OnActivityCanceled implements ActivityStore for workflow-embedded activities.
179-
// No ActivityTaskCanceled history event is written on the forward path in this prototype.
180-
func (w *Workflow) OnActivityCanceled(ctx chasm.MutableContext, act *activity.Activity) error {
181-
activityID := act.GetActivityId()
182-
delete(w.Activities, activityID)
183-
return w.ScheduleWorkflowTask()
181+
// When needsStartedEvent is true (activity was started before being canceled), writes
182+
// ActivityTaskStarted + ActivityTaskCanceled history events; Apply() handles cleanup.
183+
// When needsStartedEvent is false (activity was never started), the caller is responsible
184+
// for writing the ActivityTaskCanceled event (via AddActivityTaskCanceledEventCHASM).
185+
func (w *Workflow) OnActivityCanceled(ctx chasm.MutableContext, act *activity.Activity, needsStartedEvent bool) error {
186+
if !needsStartedEvent {
187+
// Not-started case: ActivityTaskCanceled is written by AddActivityTaskCanceledEventCHASM.
188+
// Just clean up the activity and schedule a WFT.
189+
activityID := act.GetActivityId()
190+
delete(w.Activities, activityID)
191+
return w.ScheduleWorkflowTask()
192+
}
193+
194+
scheduledEventID := act.GetScheduledEventId()
195+
attempt := act.LastAttempt.Get(ctx)
196+
startedEvent, err := addAndApplyHistoryEvent[ActivityTaskStartedEventDefinition](w, ctx, func(e *historypb.HistoryEvent) {
197+
e.Attributes = &historypb.HistoryEvent_ActivityTaskStartedEventAttributes{
198+
ActivityTaskStartedEventAttributes: &historypb.ActivityTaskStartedEventAttributes{
199+
ScheduledEventId: scheduledEventID,
200+
Attempt: attempt.GetCount(),
201+
RequestId: attempt.GetStartRequestId(),
202+
Identity: attempt.GetLastWorkerIdentity(),
203+
},
204+
}
205+
})
206+
if err != nil {
207+
return err
208+
}
209+
cancelDetails := act.Outcome.Get(ctx).GetFailed().GetFailure().GetCanceledFailureInfo().GetDetails()
210+
_, err = addAndApplyHistoryEvent[ActivityTaskCanceledEventDefinition](w, ctx, func(e *historypb.HistoryEvent) {
211+
e.Attributes = &historypb.HistoryEvent_ActivityTaskCanceledEventAttributes{
212+
ActivityTaskCanceledEventAttributes: &historypb.ActivityTaskCanceledEventAttributes{
213+
ScheduledEventId: scheduledEventID,
214+
StartedEventId: startedEvent.GetEventId(),
215+
Details: cancelDetails,
216+
Identity: attempt.GetLastWorkerIdentity(),
217+
},
218+
}
219+
})
220+
return err
184221
}
185222

186223
// OnActivityTerminated implements ActivityStore for workflow-embedded activities.
@@ -243,6 +280,54 @@ func (w *Workflow) AddCompletionCallbacks(
243280
return nil
244281
}
245282

283+
// RegisterScheduledActivity applies TransitionScheduled to act and adds it to the workflow's
284+
// Activities map. This is the single entry point for scheduling embedded activities so that
285+
// the transition trigger stays inside the workflow package rather than in the caller.
286+
func (w *Workflow) RegisterScheduledActivity(
287+
ctx chasm.MutableContext,
288+
activityID string,
289+
act *activity.Activity,
290+
) error {
291+
if err := activity.TransitionScheduled.Apply(act, ctx, nil); err != nil {
292+
return err
293+
}
294+
w.AddEmbeddedActivity(ctx, activityID, act)
295+
return nil
296+
}
297+
298+
// RequestCancelEmbeddedActivity finds the embedded activity with scheduledEventID, applies the
299+
// cancel transition, and returns (wasNotStarted, found, error). found=false means no CHASM
300+
// activity has that scheduledEventID; the caller should fall back to the legacy path.
301+
func (w *Workflow) RequestCancelEmbeddedActivity(
302+
ctx chasm.MutableContext,
303+
scheduledEventID int64,
304+
identity string,
305+
) (wasNotStarted bool, found bool, err error) {
306+
act := w.FindActivityByScheduledEventID(ctx, scheduledEventID)
307+
if act == nil {
308+
return false, false, nil
309+
}
310+
wasNotStarted, err = act.RequestCancelFromWorkflowTask(ctx, identity)
311+
return wasNotStarted, true, err
312+
}
313+
314+
// CompleteEmbeddedActivityByID finds the activity by activityID, applies TransitionCompleted,
315+
// and returns (true, nil) on success. Returns (false, nil) if no CHASM activity with that ID
316+
// exists; the caller should fall back to the legacy path.
317+
func (w *Workflow) CompleteEmbeddedActivityByID(
318+
ctx chasm.MutableContext,
319+
activityID string,
320+
req *historyservice.RespondActivityTaskCompletedRequest,
321+
metricsHandler metrics.Handler,
322+
) (bool, error) {
323+
actField, ok := w.Activities[activityID]
324+
if !ok {
325+
return false, nil
326+
}
327+
act := actField.Get(ctx)
328+
return true, activity.TransitionCompleted.Apply(act, ctx, activity.NewCompleteEvent(req, metricsHandler))
329+
}
330+
246331
// AddEmbeddedActivity adds a CHASM activity sub-component to the workflow, keyed by SDK-provided activity ID.
247332
func (w *Workflow) AddEmbeddedActivity(
248333
ctx chasm.MutableContext,

service/history/workflow/mutable_state_impl.go

Lines changed: 9 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4175,16 +4175,12 @@ func (ms *MutableStateImpl) AddActivityTaskScheduledEventCHASM(
41754175
}),
41764176
}
41774177

4178-
// 4. Call TransitionScheduled first (generates ActivityDispatchTask and timeout pure tasks),
4179-
// following the same pattern as nexus operations where the transition is applied before
4180-
// the component is added to the parent's map field.
4181-
if err := chasmactivity.TransitionScheduled.Apply(act, chasmCtx, nil); err != nil {
4178+
// 4. Register: applies TransitionScheduled (generates ActivityDispatchTask and timeout pure
4179+
// tasks) then adds to the workflow's Activities map, keyed by activity ID.
4180+
if err := wf.RegisterScheduledActivity(chasmCtx, command.GetActivityId(), act); err != nil {
41824181
return 0, err
41834182
}
41844183

4185-
// 5. Add to the workflow's Activities map — this registers it in the CHASM tree, keyed by activity ID.
4186-
wf.AddEmbeddedActivity(chasmCtx, command.GetActivityId(), act)
4187-
41884184
// Accumulate the size of this CHASM activity so GetApproximatePersistedSize includes it
41894185
// before CloseTransaction serializes it into approximateSize.
41904186
ms.chasmPendingSize += command.GetInput().Size() + command.GetHeader().Size() + act.Size()
@@ -4652,8 +4648,11 @@ func (ms *MutableStateImpl) AddActivityTaskCancelRequestedEventCHASM(
46524648
return nil, false, err
46534649
}
46544650

4655-
act := wf.FindActivityByScheduledEventID(chasmCtx, scheduledEventID)
4656-
if act == nil {
4651+
wasNotStarted, found, err := wf.RequestCancelEmbeddedActivity(chasmCtx, scheduledEventID, identity)
4652+
if err != nil {
4653+
return nil, false, err
4654+
}
4655+
if !found {
46574656
// Return ErrCHASMActivityNotFound so the caller can fall back to the legacy
46584657
// AddActivityTaskCancelRequestedEvent path (backwards-compat guard for executions
46594658
// that scheduled the activity before EnableCHASMActivityPrototype was enabled).
@@ -4662,11 +4661,6 @@ func (ms *MutableStateImpl) AddActivityTaskCancelRequestedEventCHASM(
46624661

46634662
actCancelReqEvent := ms.hBuilder.AddActivityTaskCancelRequestedEvent(workflowTaskCompletedEventID, scheduledEventID)
46644663

4665-
wasNotStarted, err := act.RequestCancelFromWorkflowTask(chasmCtx, identity)
4666-
if err != nil {
4667-
return nil, false, err
4668-
}
4669-
46704664
// Caller is responsible for writing ActivityTaskCanceled when wasNotStarted is true,
46714665
// following the same pattern as the legacy AddActivityTaskCancelRequestedEvent path.
46724666
return actCancelReqEvent, wasNotStarted, nil
@@ -4716,21 +4710,15 @@ func (ms *MutableStateImpl) RespondCHASMActivityCompletedByID(activityID string,
47164710
if err != nil {
47174711
return false, err
47184712
}
4719-
actField, ok := wf.Activities[activityID]
4720-
if !ok {
4721-
return false, nil
4722-
}
4723-
act := actField.Get(chasmCtx)
47244713
metricsHandler := ms.metricsHandler.WithTags(metrics.OperationTag(metrics.HistoryRespondActivityTaskCompletedScope))
4725-
// Build a minimal RespondActivityTaskCompletedRequest for the transition.
47264714
req := &historyservice.RespondActivityTaskCompletedRequest{
47274715
NamespaceId: ms.GetNamespaceEntry().ID().String(),
47284716
CompleteRequest: &workflowservice.RespondActivityTaskCompletedRequest{
47294717
Result: result,
47304718
Identity: identity,
47314719
},
47324720
}
4733-
return true, chasmactivity.TransitionCompleted.Apply(act, chasmCtx, chasmactivity.NewCompleteEvent(req, metricsHandler))
4721+
return wf.CompleteEmbeddedActivityByID(chasmCtx, activityID, req, metricsHandler)
47344722
}
47354723

47364724
func (ms *MutableStateImpl) AddWorkerCommandsTasks(commands []*workerpb.WorkerCommand, controlQueue string) error {

tests/chasm_activity_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ func (s *ChasmActivitySuite) TestChasmActivity_PendingActivityDescribe() {
364364

365365
// assertContainsEventType is a helper that asserts the given event type appears at least once
366366
// in the history events slice.
367-
func (s *ChasmActivitySuite) assertContainsEventType(events []*historypb.HistoryEvent, eventType enumspb.EventType, msgAndArgs ...interface{}) {
367+
func (s *ChasmActivitySuite) assertContainsEventType(events []*historypb.HistoryEvent, eventType enumspb.EventType, msgAndArgs ...any) {
368368
s.T().Helper()
369369
for _, e := range events {
370370
if e.EventType == eventType {

0 commit comments

Comments
 (0)