@@ -157,13 +157,13 @@ func (e *Engine) Start(_ context.Context) error {
157
157
// validate if adding another workflow would exceed either the global or per owner engine count limit
158
158
ownerAllow , globalAllow := e .workflowLimits .Allow (e .workflow .owner )
159
159
if ! globalAllow {
160
- e .metrics .with ( platform . KeyWorkflowID , e . workflow . id , platform . KeyWorkflowOwner , e . workflow . owner ). incrementWorkflowLimitGlobalCounter (ctx )
160
+ e .metrics .incrementWorkflowLimitGlobalCounter (ctx )
161
161
logCustMsg (ctx , e .cma .With (platform .KeyWorkflowID , e .workflow .id , platform .KeyWorkflowOwner , e .workflow .owner ), types .ErrGlobalWorkflowCountLimitReached .Error (), e .logger )
162
162
return types .ErrGlobalWorkflowCountLimitReached
163
163
}
164
164
165
165
if ! ownerAllow {
166
- e .metrics .with ( platform . KeyWorkflowID , e . workflow . id , platform . KeyWorkflowOwner , e . workflow . owner ). incrementWorkflowLimitPerOwnerCounter (ctx )
166
+ e .metrics .incrementWorkflowLimitPerOwnerCounter (ctx )
167
167
logCustMsg (ctx , e .cma .With (platform .KeyWorkflowID , e .workflow .id , platform .KeyWorkflowOwner , e .workflow .owner ), types .ErrPerOwnerWorkflowCountLimitReached .Error (), e .logger )
168
168
return types .ErrPerOwnerWorkflowCountLimitReached
169
169
}
@@ -649,13 +649,13 @@ func (e *Engine) finishExecution(ctx context.Context, cma custmsg.MessageEmitter
649
649
if exists {
650
650
// send metering report to beholder
651
651
if err = events .EmitMeteringReport (ctx , cma , report .Message ()); err != nil {
652
- e .metrics .with ( platform . KeyWorkflowID , e . workflow . id , platform . KeyWorkflowExecutionID , executionID ). incrementWorkflowMissingMeteringReport (ctx )
652
+ e .metrics .incrementWorkflowMissingMeteringReport (ctx )
653
653
l .Warn (fmt .Sprintf ("metering report send to beholder error %s" , err ))
654
654
}
655
655
656
656
// send metering report to billing if billing client is not nil
657
657
if err = e .sendMeteringReportToBilling (ctx , report , e .workflow .id , executionID ); err != nil {
658
- e .metrics .with ( platform . KeyWorkflowID , e . workflow . id , platform . KeyWorkflowExecutionID , executionID ). incrementWorkflowMissingMeteringReport (ctx )
658
+ e .metrics .incrementWorkflowMissingMeteringReport (ctx )
659
659
l .Warn (fmt .Sprintf ("metering report send to billing error %s" , err ))
660
660
}
661
661
}
@@ -735,14 +735,14 @@ func (e *Engine) worker(ctx context.Context) {
735
735
e .onRateLimit (executionID )
736
736
e .logger .With (platform .KeyWorkflowID , e .workflow .id , platform .KeyWorkflowOwner , e .workflow .owner , platform .KeyWorkflowExecutionID , executionID ).Errorf ("failed to start execution: per sender rate limit exceeded" )
737
737
logCustMsg (ctx , e .cma .With (platform .KeyCapabilityID , te .ID ), "failed to start execution: per sender rate limit exceeded" , e .logger )
738
- e .metrics .with (platform .KeyWorkflowID , e . workflow . id , platform . KeyWorkflowExecutionID , executionID , platform . KeyTriggerID , te .ID , platform . KeyWorkflowOwner , e . workflow . owner ).incrementWorkflowExecutionRateLimitPerUserCounter (ctx )
738
+ e .metrics .with (platform .KeyTriggerID , te .ID ).incrementWorkflowExecutionRateLimitPerUserCounter (ctx )
739
739
continue
740
740
}
741
741
if ! globalAllowed {
742
742
e .onRateLimit (executionID )
743
743
e .logger .With (platform .KeyWorkflowID , e .workflow .id , platform .KeyWorkflowOwner , e .workflow .owner , platform .KeyWorkflowExecutionID , executionID ).Errorf ("failed to start execution: global rate limit exceeded" )
744
744
logCustMsg (ctx , e .cma .With (platform .KeyCapabilityID , te .ID ), "failed to start execution: global rate limit exceeded" , e .logger )
745
- e .metrics .with (platform .KeyWorkflowID , e . workflow . id , platform . KeyWorkflowExecutionID , executionID , platform . KeyTriggerID , te .ID , platform . KeyWorkflowOwner , e . workflow . owner ).incrementWorkflowExecutionRateLimitGlobalCounter (ctx )
745
+ e .metrics .with (platform .KeyTriggerID , te .ID ).incrementWorkflowExecutionRateLimitGlobalCounter (ctx )
746
746
continue
747
747
}
748
748
@@ -830,9 +830,9 @@ func (e *Engine) workerForStepRequest(ctx context.Context, msg stepRequest) {
830
830
if err := rpt .SetStep (MeteringReportStepRef (stepState .Ref ), meteringSteps ); err != nil {
831
831
l .Error (fmt .Sprintf ("failed to set metering report step for ref %s: %s" , stepState .Ref , err ))
832
832
}
833
- e .metrics .with (platform .KeyWorkflowID , e .workflow .id , platform . KeyWorkflowExecutionID , msg . state . ExecutionID ).incrementWorkflowMissingMeteringReport (ctx )
833
+ e .metrics .with (platform .KeyWorkflowID , e .workflow .id ).incrementWorkflowMissingMeteringReport (ctx )
834
834
} else {
835
- e .metrics .with (platform .KeyWorkflowID , e .workflow .id , platform . KeyWorkflowExecutionID , msg . state . ExecutionID ).incrementWorkflowMissingMeteringReport (ctx )
835
+ e .metrics .with (platform .KeyWorkflowID , e .workflow .id ).incrementWorkflowMissingMeteringReport (ctx )
836
836
// TODO: to be bumped to error if all capabilities must implement metering
837
837
l .Warnf ("no metering report found for %v" , msg .state .ExecutionID )
838
838
}
0 commit comments