Skip to content

Commit fc96d48

Browse files
fix
1 parent 32854e7 commit fc96d48

4 files changed

Lines changed: 15 additions & 20 deletions

File tree

chasm/lib/activity/activity.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ func (a *Activity) createAddActivityTaskRequest(ctx chasm.Context, namespaceID s
217217
WorkflowId: execKey.BusinessID,
218218
RunId: execKey.RunID,
219219
}
220-
req.ScheduledEventId = a.ActivityState.GetScheduledEventId()
220+
req.ScheduledEventId = a.GetScheduledEventId()
221221
}
222222

223223
return req, nil
@@ -256,7 +256,7 @@ func (a *Activity) GenerateRecordActivityTaskStartedResponse(
256256

257257
// For workflow-embedded activities, ActivityId is the SDK-provided activity ID string.
258258
// For standalone activities, fall back to the execution key's BusinessID.
259-
activityID := a.ActivityState.GetActivityId()
259+
activityID := a.GetActivityId()
260260
if activityID == "" {
261261
activityID = key.BusinessID
262262
}
@@ -918,7 +918,7 @@ func (a *Activity) BuildPendingActivityInfo(ctx chasm.Context) *workflowpb.Pendi
918918

919919
// For workflow-embedded activities, ActivityId is the SDK-provided activity ID string.
920920
// For standalone activities, this will be empty (standalone activities don't use BuildPendingActivityInfo).
921-
activityID := a.ActivityState.GetActivityId()
921+
activityID := a.GetActivityId()
922922

923923
return &workflowpb.PendingActivityInfo{
924924
ActivityId: activityID,

chasm/lib/workflow/activity_events.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ package workflow
33
import (
44
enumspb "go.temporal.io/api/enums/v1"
55
historypb "go.temporal.io/api/history/v1"
6-
workflowservice "go.temporal.io/api/workflowservice/v1"
7-
historyservice "go.temporal.io/server/api/historyservice/v1"
6+
"go.temporal.io/api/workflowservice/v1"
7+
"go.temporal.io/server/api/historyservice/v1"
88
"go.temporal.io/server/chasm"
99
chasmactivity "go.temporal.io/server/chasm/lib/activity"
1010
activitypb "go.temporal.io/server/chasm/lib/activity/gen/activitypb/v1"
@@ -117,7 +117,7 @@ func (d ActivityTaskCompletedEventDefinition) Apply(ctx chasm.MutableContext, wf
117117
return nil
118118
}
119119

120-
delete(wf.Activities, act.ActivityState.GetActivityId())
120+
delete(wf.Activities, act.GetActivityId())
121121
return wf.ScheduleWorkflowTask()
122122
}
123123

@@ -144,7 +144,7 @@ func (d ActivityTaskFailedEventDefinition) Apply(ctx chasm.MutableContext, wf *W
144144
return nil
145145
}
146146

147-
delete(wf.Activities, act.ActivityState.GetActivityId())
147+
delete(wf.Activities, act.GetActivityId())
148148
return wf.ScheduleWorkflowTask()
149149
}
150150

@@ -171,7 +171,7 @@ func (d ActivityTaskTimedOutEventDefinition) Apply(ctx chasm.MutableContext, wf
171171
return nil
172172
}
173173

174-
delete(wf.Activities, act.ActivityState.GetActivityId())
174+
delete(wf.Activities, act.GetActivityId())
175175
return wf.ScheduleWorkflowTask()
176176
}
177177

@@ -230,7 +230,7 @@ func (d ActivityTaskCanceledEventDefinition) Apply(ctx chasm.MutableContext, wf
230230
return nil
231231
}
232232

233-
delete(wf.Activities, act.ActivityState.GetActivityId())
233+
delete(wf.Activities, act.GetActivityId())
234234
return wf.ScheduleWorkflowTask()
235235
}
236236

chasm/lib/workflow/workflow.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ func (w *Workflow) Terminate(
7676
// OnActivityCompleted implements ActivityStore for workflow-embedded activities.
7777
// Writes ActivityTaskStarted + ActivityTaskCompleted history events; Apply() handles cleanup.
7878
func (w *Workflow) OnActivityCompleted(ctx chasm.MutableContext, act *activity.Activity) error {
79-
scheduledEventID := act.ActivityState.GetScheduledEventId()
79+
scheduledEventID := act.GetScheduledEventId()
8080
attempt := act.LastAttempt.Get(ctx)
8181
startedEvent, err := addAndApplyHistoryEvent[ActivityTaskStartedEventDefinition](w, ctx, func(e *historypb.HistoryEvent) {
8282
e.Attributes = &historypb.HistoryEvent_ActivityTaskStartedEventAttributes{
@@ -107,7 +107,7 @@ func (w *Workflow) OnActivityCompleted(ctx chasm.MutableContext, act *activity.A
107107
// OnActivityFailed implements ActivityStore for workflow-embedded activities.
108108
// Writes ActivityTaskStarted + ActivityTaskFailed history events; Apply() handles cleanup.
109109
func (w *Workflow) OnActivityFailed(ctx chasm.MutableContext, act *activity.Activity) error {
110-
scheduledEventID := act.ActivityState.GetScheduledEventId()
110+
scheduledEventID := act.GetScheduledEventId()
111111
attempt := act.LastAttempt.Get(ctx)
112112
startedEvent, err := addAndApplyHistoryEvent[ActivityTaskStartedEventDefinition](w, ctx, func(e *historypb.HistoryEvent) {
113113
e.Attributes = &historypb.HistoryEvent_ActivityTaskStartedEventAttributes{
@@ -139,7 +139,7 @@ func (w *Workflow) OnActivityFailed(ctx chasm.MutableContext, act *activity.Acti
139139
// OnActivityTimedOut implements ActivityStore for workflow-embedded activities.
140140
// Writes ActivityTaskTimedOut (and optionally ActivityTaskStarted) history events; Apply() handles cleanup.
141141
func (w *Workflow) OnActivityTimedOut(ctx chasm.MutableContext, act *activity.Activity, timeoutFailure *failurepb.Failure, needsStartedEvent bool) error {
142-
scheduledEventID := act.ActivityState.GetScheduledEventId()
142+
scheduledEventID := act.GetScheduledEventId()
143143
if timeoutFailure == nil {
144144
return nil
145145
}
@@ -178,14 +178,14 @@ func (w *Workflow) OnActivityTimedOut(ctx chasm.MutableContext, act *activity.Ac
178178
// OnActivityCanceled implements ActivityStore for workflow-embedded activities.
179179
// No ActivityTaskCanceled history event is written on the forward path in this prototype.
180180
func (w *Workflow) OnActivityCanceled(ctx chasm.MutableContext, act *activity.Activity) error {
181-
activityID := act.ActivityState.GetActivityId()
181+
activityID := act.GetActivityId()
182182
delete(w.Activities, activityID)
183183
return w.ScheduleWorkflowTask()
184184
}
185185

186186
// OnActivityTerminated implements ActivityStore for workflow-embedded activities.
187187
func (w *Workflow) OnActivityTerminated(ctx chasm.MutableContext, act *activity.Activity) error {
188-
activityID := act.ActivityState.GetActivityId()
188+
activityID := act.GetActivityId()
189189
delete(w.Activities, activityID)
190190
return w.ScheduleWorkflowTask()
191191
}
@@ -311,7 +311,7 @@ func (w *Workflow) BuildPendingActivityInfos(ctx chasm.Context) ([]*workflowpb.P
311311
func (w *Workflow) FindActivityByScheduledEventID(ctx chasm.Context, scheduledEventID int64) *activity.Activity {
312312
for _, field := range w.Activities {
313313
act := field.Get(ctx)
314-
if act.ActivityState.GetScheduledEventId() == scheduledEventID {
314+
if act.GetScheduledEventId() == scheduledEventID {
315315
return act
316316
}
317317
}

service/history/workflow/mutable_state_impl.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3339,11 +3339,6 @@ func (ms *MutableStateImpl) ScheduleWorkflowTask() error {
33393339
return ScheduleWorkflowTask(ms)
33403340
}
33413341

3342-
// WriteActivityTaskStartedHistoryEvent implements chasm.NodeBackend. It writes an
3343-
// ActivityTaskStarted history event for a workflow-embedded CHASM activity.
3344-
// The event is buffered — the real event ID is assigned during FlushBufferToCurrentBatch.
3345-
// Both started and completed events must be written in the same transaction so that
3346-
// the history builder can wire the started event ID into the completed event automatically.
33473342
// AddWorkflowTaskScheduledEventAsHeartbeat is to record the first WorkflowTaskScheduledEvent during workflow task heartbeat.
33483343
func (ms *MutableStateImpl) AddWorkflowTaskScheduledEventAsHeartbeat(
33493344
bypassTaskGeneration bool,

0 commit comments

Comments
 (0)