Skip to content

Commit 1f5dc17

Browse files
authored
IWF-512: Add start timestamp field to all events (#541)
1 parent 8852a80 commit 1f5dc17

File tree

4 files changed

+101
-77
lines changed

4 files changed

+101
-77
lines changed

service/api/service.go

+7-4
Original file line numberDiff line numberDiff line change
@@ -633,12 +633,15 @@ func (s *serviceImpl) ApiV1WorkflowRpcPost(
633633
return nil, s.handleError(err, WorkflowRpcApiPath, req.GetWorkflowId())
634634
}
635635

636+
stateApiExecuteStartTime := time.Now().UnixMilli()
637+
636638
defer func() {
637639
event.Handle(iwfidl.IwfEvent{
638-
EventType: iwfidl.RPC_EXECUTION_EVENT,
639-
RpcName: &req.RpcName,
640-
WorkflowType: rpcPrep.IwfWorkflowType,
641-
WorkflowId: req.GetWorkflowId(),
640+
EventType: iwfidl.RPC_EXECUTION_EVENT,
641+
RpcName: &req.RpcName,
642+
WorkflowType: rpcPrep.IwfWorkflowType,
643+
WorkflowId: req.GetWorkflowId(),
644+
StartTimestampInMs: ptr.Any(stateApiExecuteStartTime),
642645
// search attributes are not available at this time
643646
})
644647
}()

service/interpreter/activityImpl.go

+36-28
Original file line numberDiff line numberDiff line change
@@ -57,13 +57,15 @@ func StateApiWaitUntil(
5757
printDebugMsg(logger, err, iwfWorkerBaseUrl)
5858
if checkHttpError(err, httpResp) {
5959
event.Handle(iwfidl.IwfEvent{
60-
EventType: iwfidl.STATE_WAIT_UNTIL_ATTEMPT_FAIL_EVENT,
61-
WorkflowType: input.Request.WorkflowType,
62-
WorkflowId: activityInfo.WorkflowExecution.ID,
63-
WorkflowRunId: activityInfo.WorkflowExecution.RunID,
64-
StateId: ptr.Any(input.Request.WorkflowStateId),
65-
StateExecutionId: ptr.Any(input.Request.Context.GetStateExecutionId()),
66-
SearchAttributes: searchAttributes,
60+
EventType: iwfidl.STATE_WAIT_UNTIL_ATTEMPT_FAIL_EVENT,
61+
WorkflowType: input.Request.WorkflowType,
62+
WorkflowId: activityInfo.WorkflowExecution.ID,
63+
WorkflowRunId: activityInfo.WorkflowExecution.RunID,
64+
StateId: ptr.Any(input.Request.WorkflowStateId),
65+
StateExecutionId: ptr.Any(input.Request.Context.GetStateExecutionId()),
66+
StartTimestampInMs: ptr.Any(stateApiWaitUntilStartTime),
67+
EndTimestampInMs: ptr.Any(time.Now().UnixMilli()),
68+
SearchAttributes: searchAttributes,
6769
})
6870
return nil, composeHttpError(
6971
activityInfo.IsLocalActivity,
@@ -72,13 +74,15 @@ func StateApiWaitUntil(
7274

7375
if err := checkCommandRequestFromWaitUntilResponse(resp); err != nil {
7476
event.Handle(iwfidl.IwfEvent{
75-
EventType: iwfidl.STATE_WAIT_UNTIL_ATTEMPT_FAIL_EVENT,
76-
WorkflowType: input.Request.WorkflowType,
77-
WorkflowId: activityInfo.WorkflowExecution.ID,
78-
WorkflowRunId: activityInfo.WorkflowExecution.RunID,
79-
StateId: ptr.Any(input.Request.WorkflowStateId),
80-
StateExecutionId: ptr.Any(input.Request.Context.GetStateExecutionId()),
81-
SearchAttributes: searchAttributes,
77+
EventType: iwfidl.STATE_WAIT_UNTIL_ATTEMPT_FAIL_EVENT,
78+
WorkflowType: input.Request.WorkflowType,
79+
WorkflowId: activityInfo.WorkflowExecution.ID,
80+
WorkflowRunId: activityInfo.WorkflowExecution.RunID,
81+
StateId: ptr.Any(input.Request.WorkflowStateId),
82+
StateExecutionId: ptr.Any(input.Request.Context.GetStateExecutionId()),
83+
StartTimestampInMs: ptr.Any(stateApiWaitUntilStartTime),
84+
EndTimestampInMs: ptr.Any(time.Now().UnixMilli()),
85+
SearchAttributes: searchAttributes,
8286
})
8387
return nil, composeStartApiRespError(provider, err, resp)
8488
}
@@ -147,13 +151,15 @@ func StateApiExecute(
147151
printDebugMsg(logger, err, iwfWorkerBaseUrl)
148152
if checkHttpError(err, httpResp) {
149153
event.Handle(iwfidl.IwfEvent{
150-
EventType: iwfidl.STATE_EXECUTE_ATTEMPT_FAIL_EVENT,
151-
WorkflowType: input.Request.WorkflowType,
152-
WorkflowId: activityInfo.WorkflowExecution.ID,
153-
WorkflowRunId: activityInfo.WorkflowExecution.RunID,
154-
StateId: ptr.Any(input.Request.WorkflowStateId),
155-
StateExecutionId: input.Request.Context.StateExecutionId,
156-
SearchAttributes: searchAttributes,
154+
EventType: iwfidl.STATE_EXECUTE_ATTEMPT_FAIL_EVENT,
155+
WorkflowType: input.Request.WorkflowType,
156+
WorkflowId: activityInfo.WorkflowExecution.ID,
157+
WorkflowRunId: activityInfo.WorkflowExecution.RunID,
158+
StateId: ptr.Any(input.Request.WorkflowStateId),
159+
StateExecutionId: input.Request.Context.StateExecutionId,
160+
StartTimestampInMs: ptr.Any(stateApiExecuteStartTime),
161+
EndTimestampInMs: ptr.Any(time.Now().UnixMilli()),
162+
SearchAttributes: searchAttributes,
157163
})
158164
return nil, composeHttpError(
159165
activityInfo.IsLocalActivity,
@@ -162,13 +168,15 @@ func StateApiExecute(
162168

163169
if err = checkStateDecisionFromResponse(resp); err != nil {
164170
event.Handle(iwfidl.IwfEvent{
165-
EventType: iwfidl.STATE_EXECUTE_ATTEMPT_FAIL_EVENT,
166-
WorkflowType: input.Request.WorkflowType,
167-
WorkflowId: activityInfo.WorkflowExecution.ID,
168-
WorkflowRunId: activityInfo.WorkflowExecution.RunID,
169-
StateId: ptr.Any(input.Request.WorkflowStateId),
170-
StateExecutionId: input.Request.Context.StateExecutionId,
171-
SearchAttributes: searchAttributes,
171+
EventType: iwfidl.STATE_EXECUTE_ATTEMPT_FAIL_EVENT,
172+
WorkflowType: input.Request.WorkflowType,
173+
WorkflowId: activityInfo.WorkflowExecution.ID,
174+
WorkflowRunId: activityInfo.WorkflowExecution.RunID,
175+
StateId: ptr.Any(input.Request.WorkflowStateId),
176+
StateExecutionId: input.Request.Context.StateExecutionId,
177+
StartTimestampInMs: ptr.Any(stateApiExecuteStartTime),
178+
EndTimestampInMs: ptr.Any(time.Now().UnixMilli()),
179+
SearchAttributes: searchAttributes,
172180
})
173181
return nil, composeExecuteApiRespError(provider, err, resp)
174182
}

service/interpreter/workflowImpl.go

+49-40
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,13 @@ func InterpreterImpl(
4545
})
4646
} else if provider.IsApplicationError(retErr) {
4747
event.Handle(iwfidl.IwfEvent{
48-
EventType: iwfidl.WORKFLOW_FAIL_EVENT,
49-
WorkflowType: input.IwfWorkflowType,
50-
WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID,
51-
WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID,
52-
SearchAttributes: sas,
48+
EventType: iwfidl.WORKFLOW_FAIL_EVENT,
49+
WorkflowType: input.IwfWorkflowType,
50+
WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID,
51+
WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID,
52+
SearchAttributes: sas,
53+
StartTimestampInMs: ptr.Any(provider.GetWorkflowInfo(ctx).WorkflowStartTime.UnixMilli()),
54+
EndTimestampInMs: ptr.Any(provider.Now(ctx).UnixMilli()),
5355
})
5456
}
5557
}
@@ -165,11 +167,12 @@ func InterpreterImpl(
165167
if !input.IsResumeFromContinueAsNew {
166168
if !provider.IsReplaying(ctx) {
167169
event.Handle(iwfidl.IwfEvent{
168-
EventType: iwfidl.WORKFLOW_START_EVENT,
169-
WorkflowType: input.IwfWorkflowType,
170-
WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID,
171-
WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID,
172-
SearchAttributes: persistenceManager.GetAllSearchAttributes(),
170+
EventType: iwfidl.WORKFLOW_START_EVENT,
171+
WorkflowType: input.IwfWorkflowType,
172+
WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID,
173+
WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID,
174+
SearchAttributes: persistenceManager.GetAllSearchAttributes(),
175+
StartTimestampInMs: ptr.Any(provider.GetWorkflowInfo(ctx).WorkflowStartTime.UnixMilli()),
173176
})
174177
}
175178
// it's possible that a workflow is started without any starting state
@@ -596,18 +599,19 @@ func processStateExecution(
596599
saLoadingPolicy := compatibility.GetWaitUntilApiSearchAttributesLoadingPolicy(state.StateOptions)
597600
doLoadingPolicy := compatibility.GetWaitUntilApiDataObjectsLoadingPolicy(state.StateOptions)
598601

602+
stateWaitUntilApiStartTime := provider.Now(ctx).UnixMilli()
599603
if !provider.IsReplaying(ctx) {
600604
event.Handle(iwfidl.IwfEvent{
601-
EventType: iwfidl.STATE_WAIT_UNTIL_EE_START_EVENT,
602-
WorkflowType: basicInfo.IwfWorkflowType,
603-
WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID,
604-
WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID,
605-
StateId: ptr.Any(state.StateId),
606-
StateExecutionId: ptr.Any(stateExeId),
607-
SearchAttributes: persistenceManager.GetAllSearchAttributes(),
605+
EventType: iwfidl.STATE_WAIT_UNTIL_EE_START_EVENT,
606+
WorkflowType: basicInfo.IwfWorkflowType,
607+
WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID,
608+
WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID,
609+
StateId: ptr.Any(state.StateId),
610+
StateExecutionId: ptr.Any(stateExeId),
611+
StartTimestampInMs: ptr.Any(stateWaitUntilApiStartTime),
612+
SearchAttributes: persistenceManager.GetAllSearchAttributes(),
608613
})
609614
}
610-
stateWaitUntilApiStartTime := provider.Now(ctx).UnixMilli()
611615
errStartApi = provider.ExecuteActivity(&startResponse, configer.ShouldOptimizeActivity(), ctx,
612616
waitUntilApi, provider.GetBackendType(), service.StateStartActivityInput{
613617
IwfWorkerUrl: basicInfo.IwfWorkerUrl,
@@ -636,13 +640,15 @@ func processStateExecution(
636640
})
637641
} else {
638642
event.Handle(iwfidl.IwfEvent{
639-
EventType: iwfidl.STATE_WAIT_UNTIL_EE_FAIL_EVENT,
640-
WorkflowType: basicInfo.IwfWorkflowType,
641-
WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID,
642-
WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID,
643-
StateId: ptr.Any(state.StateId),
644-
StateExecutionId: ptr.Any(stateExeId),
645-
SearchAttributes: persistenceManager.GetAllSearchAttributes(),
643+
EventType: iwfidl.STATE_WAIT_UNTIL_EE_FAIL_EVENT,
644+
WorkflowType: basicInfo.IwfWorkflowType,
645+
WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID,
646+
WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID,
647+
StateId: ptr.Any(state.StateId),
648+
StateExecutionId: ptr.Any(stateExeId),
649+
StartTimestampInMs: ptr.Any(stateWaitUntilApiStartTime),
650+
EndTimestampInMs: ptr.Any(provider.Now(ctx).UnixMilli()),
651+
SearchAttributes: persistenceManager.GetAllSearchAttributes(),
646652
})
647653
}
648654
}
@@ -872,18 +878,19 @@ func invokeStateExecute(
872878
ctx = provider.WithActivityOptions(ctx, activityOptions)
873879
var decideResponse *iwfidl.WorkflowStateDecideResponse
874880

881+
stateExecuteApiStartTime := provider.Now(ctx).UnixMilli()
875882
if !provider.IsReplaying(ctx) {
876883
event.Handle(iwfidl.IwfEvent{
877-
EventType: iwfidl.STATE_EXECUTE_EE_START_EVENT,
878-
WorkflowType: basicInfo.IwfWorkflowType,
879-
WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID,
880-
WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID,
881-
StateId: ptr.Any(state.StateId),
882-
StateExecutionId: ptr.Any(stateExeId),
883-
SearchAttributes: persistenceManager.GetAllSearchAttributes(),
884+
EventType: iwfidl.STATE_EXECUTE_EE_START_EVENT,
885+
WorkflowType: basicInfo.IwfWorkflowType,
886+
WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID,
887+
WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID,
888+
StateId: ptr.Any(state.StateId),
889+
StateExecutionId: ptr.Any(stateExeId),
890+
StartTimestampInMs: ptr.Any(stateExecuteApiStartTime),
891+
SearchAttributes: persistenceManager.GetAllSearchAttributes(),
884892
})
885893
}
886-
stateExecuteApiStartTime := provider.Now(ctx).UnixMilli()
887894
err = provider.ExecuteActivity(&decideResponse, configer.ShouldOptimizeActivity(), ctx,
888895
executeApi, provider.GetBackendType(), service.StateDecideActivityInput{
889896
IwfWorkerUrl: basicInfo.IwfWorkerUrl,
@@ -913,13 +920,15 @@ func invokeStateExecute(
913920
})
914921
} else {
915922
event.Handle(iwfidl.IwfEvent{
916-
EventType: iwfidl.STATE_EXECUTE_EE_FAIL_EVENT,
917-
WorkflowType: basicInfo.IwfWorkflowType,
918-
WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID,
919-
WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID,
920-
StateId: ptr.Any(state.StateId),
921-
StateExecutionId: ptr.Any(stateExeId),
922-
SearchAttributes: persistenceManager.GetAllSearchAttributes(),
923+
EventType: iwfidl.STATE_EXECUTE_EE_FAIL_EVENT,
924+
WorkflowType: basicInfo.IwfWorkflowType,
925+
WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID,
926+
WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID,
927+
StateId: ptr.Any(state.StateId),
928+
StartTimestampInMs: ptr.Any(stateExecuteApiStartTime),
929+
EndTimestampInMs: ptr.Any(provider.Now(ctx).UnixMilli()),
930+
StateExecutionId: ptr.Any(stateExeId),
931+
SearchAttributes: persistenceManager.GetAllSearchAttributes(),
923932
})
924933
}
925934
}

service/interpreter/workflowUpdater.go

+9-5
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"github.com/indeedeng/iwf/gen/iwfidl"
55
"github.com/indeedeng/iwf/service"
66
"github.com/indeedeng/iwf/service/common/event"
7+
"github.com/indeedeng/iwf/service/common/ptr"
78
"github.com/indeedeng/iwf/service/interpreter/config"
89
"github.com/indeedeng/iwf/service/interpreter/cont"
910
"github.com/indeedeng/iwf/service/interpreter/interfaces"
@@ -61,14 +62,17 @@ func (u *WorkflowUpdater) handler(
6162

6263
info := u.provider.GetWorkflowInfo(ctx)
6364

65+
rpcExecutionStartTime := u.provider.Now(ctx).UnixMilli()
66+
6467
defer func() {
6568
if !u.provider.IsReplaying(ctx) {
6669
event.Handle(iwfidl.IwfEvent{
67-
EventType: iwfidl.RPC_EXECUTION_EVENT,
68-
RpcName: &input.RpcName,
69-
WorkflowType: u.basicInfo.IwfWorkflowType,
70-
WorkflowId: info.WorkflowExecution.ID,
71-
SearchAttributes: u.persistenceManager.GetAllSearchAttributes(),
70+
EventType: iwfidl.RPC_EXECUTION_EVENT,
71+
RpcName: &input.RpcName,
72+
WorkflowType: u.basicInfo.IwfWorkflowType,
73+
WorkflowId: info.WorkflowExecution.ID,
74+
StartTimestampInMs: ptr.Any(rpcExecutionStartTime),
75+
SearchAttributes: u.persistenceManager.GetAllSearchAttributes(),
7276
})
7377
}
7478
}()

0 commit comments

Comments
 (0)