Skip to content

Commit d489f42

Browse files
CRE Operational Events in Engine (#17057)
* WIP: first pass on proto definitions * 2nd pass of proto definitions * adding stepRef to capability events * adding generate command, and events2.proto for comparison with oneof approach * adding operational event coverage to engine * adding labels to cma * cleaning up comments * oops; adding generated proto file * lint * lint; fixing err * Run `make generate`. * addressing comments * separated out protos * determine entity + schema based on proto message type * refactoring * using events.go vars * updating entity name to include proto pkg prefix * fixing test --------- Co-authored-by: pavel-raykov <[email protected]>
1 parent d8ec227 commit d489f42

16 files changed

+1087
-10
lines changed

core/platform/monitoring.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,15 @@ const (
1212
KeyWorkflowID = "workflowID"
1313
KeyWorkflowExecutionID = "workflowExecutionID"
1414
KeyWorkflowName = "workflowName"
15+
KeyWorkflowVersion = "workflowVersion"
1516
KeyWorkflowOwner = "workflowOwner"
1617
KeyStepID = "stepID"
1718
KeyStepRef = "stepRef"
19+
KeyDonID = "DonID"
20+
KeyDonF = "F"
21+
KeyDonN = "N"
22+
KeyDonQ = "Q"
23+
KeyP2PID = "p2pID"
1824
)
1925

2026
func LabelKeysSorted() iter.Seq[string] {

core/services/workflows/engine.go

Lines changed: 176 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,15 @@ import (
66
"encoding/hex"
77
"errors"
88
"fmt"
9+
"strconv"
910
"sync"
1011
"time"
1112

1213
"github.com/jonboulle/clockwork"
14+
"google.golang.org/protobuf/proto"
1315

16+
"github.com/smartcontractkit/chainlink-common/pkg/aggregation"
17+
"github.com/smartcontractkit/chainlink-common/pkg/beholder"
1418
"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
1519
"github.com/smartcontractkit/chainlink-common/pkg/custmsg"
1620
"github.com/smartcontractkit/chainlink-common/pkg/metrics"
@@ -20,10 +24,10 @@ import (
2024
"github.com/smartcontractkit/chainlink-common/pkg/workflows"
2125
"github.com/smartcontractkit/chainlink-common/pkg/workflows/exec"
2226
"github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk"
23-
2427
"github.com/smartcontractkit/chainlink/v2/core/capabilities/transmission"
2528
"github.com/smartcontractkit/chainlink/v2/core/logger"
2629
"github.com/smartcontractkit/chainlink/v2/core/platform"
30+
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/pb"
2731
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/ratelimiter"
2832
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/store"
2933
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncerlimiter"
@@ -502,9 +506,14 @@ func generateExecutionID(workflowID, eventID string) (string, error) {
502506
}
503507

504508
// startExecution kicks off a new workflow execution when a trigger event is received.
505-
func (e *Engine) startExecution(ctx context.Context, executionID string, event *values.Map) error {
509+
func (e *Engine) startExecution(ctx context.Context, executionID string, triggerID string, event *values.Map) error {
506510
e.meterReports.Add(executionID, NewMeteringReport())
507511

512+
err := emitExecutionStartedEvent(ctx, e.cma, triggerID)
513+
if err != nil {
514+
e.logger.Errorf("failed to emit execution started event: %+v", err)
515+
}
516+
508517
lggr := e.logger.With("event", event, platform.KeyWorkflowExecutionID, executionID)
509518
lggr.Debug("executing on a trigger event")
510519
workflowExecution, err := e.executionsStore.Add(ctx, map[string]*store.WorkflowExecutionStep{
@@ -676,6 +685,10 @@ func (e *Engine) finishExecution(ctx context.Context, cma custmsg.MessageEmitter
676685
}
677686
logCustMsg(ctx, cma, fmt.Sprintf("execution duration: %d (seconds)", executionDuration), l)
678687
l.Infof("execution duration: %d (seconds)", executionDuration)
688+
err = emitExecutionFinishedEvent(ctx, cma, status)
689+
if err != nil {
690+
e.logger.Errorf("failed to emit execution finished event: %+v", err)
691+
}
679692
e.onExecutionFinished(executionID)
680693
return nil
681694
}
@@ -731,7 +744,7 @@ func (e *Engine) worker(ctx context.Context) {
731744
}
732745

733746
cma := e.cma.With(platform.KeyWorkflowExecutionID, executionID)
734-
err = e.startExecution(ctx, executionID, resp.Event.Outputs)
747+
err = e.startExecution(ctx, executionID, te.ID, resp.Event.Outputs)
735748
if err != nil {
736749
e.logger.With(platform.KeyWorkflowExecutionID, executionID).Errorf("failed to start execution: %v", err)
737750
logCustMsg(ctx, cma, fmt.Sprintf("failed to start execution: %s", err), e.logger)
@@ -997,13 +1010,32 @@ func (e *Engine) executeStep(ctx context.Context, lggr logger.Logger, msg stepRe
9971010
defer cancel()
9981011

9991012
e.metrics.with(platform.KeyCapabilityID, curStep.ID).incrementCapabilityInvocationCounter(ctx)
1000-
output, err := curStep.capability.Execute(stepCtx, tr)
1013+
err = emitCapabilityStartedEvent(ctx, e.cma, curStep.ID, msg.stepRef)
10011014
if err != nil {
1015+
e.logger.Errorf("failed to emit capability event: %v", err)
1016+
}
1017+
output, capErr := curStep.capability.Execute(stepCtx, tr)
1018+
status := store.StatusCompleted
1019+
1020+
if capErr != nil {
1021+
status = store.StatusErrored
1022+
if capabilities.ErrStopExecution.Is(capErr) {
1023+
status = store.StatusCompletedEarlyExit
1024+
}
1025+
}
1026+
1027+
defer func() {
1028+
if err := emitCapabilityFinishedEvent(ctx, e.cma, curStep.ID, msg.stepRef, status); err != nil {
1029+
e.logger.Errorf("failed to emit capability event: %v", err)
1030+
}
1031+
}()
1032+
1033+
if capErr != nil {
10021034
e.metrics.with(platform.KeyStepRef, msg.stepRef, platform.KeyCapabilityID, curStep.ID).incrementCapabilityFailureCounter(ctx)
1003-
return inputsMap, capabilities.CapabilityResponse{}, err
1035+
return inputsMap, capabilities.CapabilityResponse{}, capErr
10041036
}
10051037

1006-
return inputsMap, output, err
1038+
return inputsMap, output, nil
10071039
}
10081040

10091041
func (e *Engine) deregisterTrigger(ctx context.Context, t *triggerCapability, triggerIdx int) error {
@@ -1355,7 +1387,22 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) {
13551387
return nil, fmt.Errorf("could not initialize monitoring resources: %w", err)
13561388
}
13571389

1358-
cma := custmsg.NewLabeler().With(platform.KeyWorkflowID, cfg.WorkflowID, platform.KeyWorkflowOwner, cfg.WorkflowOwner, platform.KeyWorkflowName, cfg.WorkflowName.String())
1390+
nodeState, err := cfg.Registry.LocalNode(ctx)
1391+
if err != nil {
1392+
return nil, fmt.Errorf("could not get local node state: %w", err)
1393+
}
1394+
cma := custmsg.NewLabeler().With(platform.KeyWorkflowID, cfg.WorkflowID,
1395+
platform.KeyWorkflowOwner, cfg.WorkflowOwner,
1396+
platform.KeyWorkflowName, cfg.WorkflowName.String(),
1397+
platform.KeyDonID, strconv.Itoa(int(nodeState.WorkflowDON.ID)),
1398+
platform.KeyDonF, strconv.Itoa(int(nodeState.WorkflowDON.F)),
1399+
platform.KeyDonN, strconv.Itoa(len(nodeState.WorkflowDON.Members)),
1400+
platform.KeyDonQ, strconv.Itoa(aggregation.ByzantineQuorum(
1401+
len(nodeState.WorkflowDON.Members),
1402+
int(nodeState.WorkflowDON.F),
1403+
)),
1404+
platform.KeyP2PID, nodeState.PeerID.String(),
1405+
)
13591406
workflow, err := Parse(cfg.Workflow)
13601407
if err != nil {
13611408
logCustMsg(ctx, cma, fmt.Sprintf("failed to parse workflow: %s", err), cfg.Lggr)
@@ -1440,3 +1487,125 @@ func logCustMsg(ctx context.Context, cma custmsg.MessageEmitter, msg string, log
14401487
log.Errorf("failed to send custom message with msg: %s, err: %v", msg, err)
14411488
}
14421489
}
1490+
1491+
// buildWorkflowMetadata populates a WorkflowMetadata from kvs (map[string]string).
1492+
func buildWorkflowMetadata(kvs map[string]string) *pb.WorkflowMetadata {
1493+
m := &pb.WorkflowMetadata{}
1494+
1495+
m.WorkflowName = kvs[platform.KeyWorkflowName]
1496+
m.Version = kvs[platform.KeyWorkflowVersion]
1497+
m.WorkflowID = kvs[platform.KeyWorkflowID]
1498+
m.WorkflowExecutionID = kvs[platform.KeyWorkflowExecutionID]
1499+
1500+
if donIDStr, ok := kvs[platform.KeyDonID]; ok {
1501+
if id, err := strconv.ParseInt(donIDStr, 10, 32); err == nil {
1502+
m.DonF = int32(id)
1503+
}
1504+
}
1505+
1506+
m.P2PID = kvs[platform.KeyP2PID]
1507+
1508+
if donFStr, ok := kvs[platform.KeyDonF]; ok {
1509+
if id, err := strconv.ParseInt(donFStr, 10, 32); err == nil {
1510+
m.DonF = int32(id)
1511+
}
1512+
}
1513+
if donNStr, ok := kvs[platform.KeyDonN]; ok {
1514+
if id, err := strconv.ParseInt(donNStr, 10, 32); err == nil {
1515+
m.DonN = int32(id)
1516+
}
1517+
}
1518+
if donQStr, ok := kvs[platform.KeyDonQ]; ok {
1519+
if id, err := strconv.ParseInt(donQStr, 10, 32); err == nil {
1520+
m.DonQ = int32(id)
1521+
}
1522+
}
1523+
1524+
return m
1525+
}
1526+
1527+
// emitProtoMessage marshals a proto.Message and emits it via beholder.
1528+
func emitProtoMessage(ctx context.Context, msg proto.Message) error {
1529+
b, err := proto.Marshal(msg)
1530+
if err != nil {
1531+
return err
1532+
}
1533+
1534+
// Determine the schema and entity based on the message type
1535+
var schema, entity string
1536+
switch msg.(type) {
1537+
case *pb.WorkflowExecutionStarted:
1538+
schema = SchemaWorkflowStarted
1539+
entity = EventWorkflowExecutionStarted
1540+
case *pb.WorkflowExecutionFinished:
1541+
schema = SchemaWorkflowFinished
1542+
entity = EventWorkflowExecutionFinished
1543+
case *pb.CapabilityExecutionStarted:
1544+
schema = SchemaCapabilityStarted
1545+
entity = EventCapabilityExecutionStarted
1546+
case *pb.CapabilityExecutionFinished:
1547+
schema = SchemaCapabilityFinished
1548+
entity = EventCapabilityExecutionFinished
1549+
default:
1550+
return fmt.Errorf("unknown message type: %T", msg)
1551+
}
1552+
1553+
// entity must be prefixed with the proto package name
1554+
entity = fmt.Sprintf("%s.%s", EventsProtoPkg, entity)
1555+
1556+
return beholder.GetEmitter().Emit(ctx, b,
1557+
"beholder_data_schema", schema, // required
1558+
"beholder_domain", "platform", // required
1559+
"beholder_entity", entity) // required
1560+
}
1561+
1562+
func emitExecutionStartedEvent(ctx context.Context, cma custmsg.MessageEmitter, triggerID string) error {
1563+
metadata := buildWorkflowMetadata(cma.Labels())
1564+
1565+
event := &pb.WorkflowExecutionStarted{
1566+
M: metadata,
1567+
Timestamp: time.Now().String(),
1568+
TriggerID: triggerID,
1569+
}
1570+
1571+
return emitProtoMessage(ctx, event)
1572+
}
1573+
1574+
func emitExecutionFinishedEvent(ctx context.Context, cma custmsg.MessageEmitter, status string) error {
1575+
metadata := buildWorkflowMetadata(cma.Labels())
1576+
1577+
event := &pb.WorkflowExecutionFinished{
1578+
M: metadata,
1579+
Timestamp: time.Now().String(),
1580+
Status: status,
1581+
}
1582+
1583+
return emitProtoMessage(ctx, event)
1584+
}
1585+
1586+
func emitCapabilityStartedEvent(ctx context.Context, cma custmsg.MessageEmitter, capabilityID, stepRef string) error {
1587+
metadata := buildWorkflowMetadata(cma.Labels())
1588+
1589+
event := &pb.CapabilityExecutionStarted{
1590+
M: metadata,
1591+
Timestamp: time.Now().String(),
1592+
CapabilityID: capabilityID,
1593+
StepRef: stepRef,
1594+
}
1595+
1596+
return emitProtoMessage(ctx, event)
1597+
}
1598+
1599+
func emitCapabilityFinishedEvent(ctx context.Context, cma custmsg.MessageEmitter, capabilityID, stepRef, status string) error {
1600+
metadata := buildWorkflowMetadata(cma.Labels())
1601+
1602+
event := &pb.CapabilityExecutionFinished{
1603+
M: metadata,
1604+
Timestamp: time.Now().String(),
1605+
CapabilityID: capabilityID,
1606+
StepRef: stepRef,
1607+
Status: status,
1608+
}
1609+
1610+
return emitProtoMessage(ctx, event)
1611+
}

core/services/workflows/engine_test.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,8 @@ func newTestEngine(t *testing.T, reg *coreCap.Registry, sdkSpec sdk.WorkflowSpec
259259
detail := report.Description()
260260
bClient := beholder.GetClient()
261261
// kvAttrs is what the test client matches on, view pkg/utils/test in common for more detail
262-
kvAttrs := []any{"beholder_data_schema", detail.Schema, "beholder_domain", detail.Domain, "beholder_entity", detail.Entity,
262+
kvAttrs := []any{"beholder_data_schema", detail.Schema, "beholder_domain", detail.Domain,
263+
"beholder_entity", fmt.Sprintf("%s.%s", MeteringProtoPkg, detail.Entity),
263264
platform.KeyWorkflowName, name, platform.KeyWorkflowID, ID, platform.KeyWorkflowExecutionID, execID}
264265

265266
data, mErr := proto.Marshal(report.Message())
@@ -398,7 +399,12 @@ func TestEngineWithHardcodedWorkflow(t *testing.T) {
398399
require.NoError(t, err)
399400

400401
assert.Equal(t, store.StatusCompleted, state.Status)
401-
assert.Equal(t, 1, beholderTester.Len(t, "beholder_entity", MeteringReportEntity))
402+
403+
assert.Equal(t, 1, beholderTester.Len(t, "beholder_entity", fmt.Sprintf("%s.%s", MeteringProtoPkg, MeteringReportEntity)))
404+
assert.Equal(t, 1, beholderTester.Len(t, "beholder_entity", fmt.Sprintf("%s.%s", EventsProtoPkg, EventWorkflowExecutionStarted)))
405+
assert.Equal(t, 1, beholderTester.Len(t, "beholder_entity", fmt.Sprintf("%s.%s", EventsProtoPkg, EventWorkflowExecutionFinished)))
406+
assert.Equal(t, 3, beholderTester.Len(t, "beholder_entity", fmt.Sprintf("%s.%s", EventsProtoPkg, EventCapabilityExecutionStarted)))
407+
assert.Equal(t, 3, beholderTester.Len(t, "beholder_entity", fmt.Sprintf("%s.%s", EventsProtoPkg, EventCapabilityExecutionFinished)))
402408
}
403409

404410
const (
@@ -2274,7 +2280,7 @@ func TestEngine_ConcurrentExecutions(t *testing.T) {
22742280
eid2 := getExecutionID(t, eng, testHooks)
22752281

22762282
assert.Equal(t, store.StatusCompleted, state.Status)
2277-
assert.Equal(t, 2, beholderTester.Len(t, "beholder_entity", MeteringReportEntity))
2283+
assert.Equal(t, 2, beholderTester.Len(t, "beholder_entity", fmt.Sprintf("%s.%s", MeteringProtoPkg, MeteringReportEntity)))
22782284
assert.Equal(t, 1, beholderTester.Len(t, platform.KeyWorkflowExecutionID, eid))
22792285
assert.Equal(t, 1, beholderTester.Len(t, platform.KeyWorkflowExecutionID, eid2))
22802286
}

core/services/workflows/events.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package workflows
2+
3+
const (
4+
EventsProtoPkg = "pb"
5+
// EventWorkflowExecutionStarted represents a workflow execution started event
6+
EventWorkflowExecutionStarted string = "WorkflowExecutionStarted"
7+
// EventWorkflowExecutionFinished represents a workflow execution finished event
8+
EventWorkflowExecutionFinished string = "WorkflowExecutionFinished"
9+
// EventCapabilityExecutionStarted represents a capability execution started event
10+
EventCapabilityExecutionStarted string = "CapabilityExecutionStarted"
11+
// EventCapabilityExecutionFinished represents a capability execution finished event
12+
EventCapabilityExecutionFinished string = "CapabilityExecutionFinished"
13+
14+
// SchemaWorkflowStarted represents the schema for workflow started events
15+
SchemaWorkflowStarted string = "/cre-events-workflow-started/v1"
16+
// SchemaWorkflowFinished represents the schema for workflow finished events
17+
SchemaWorkflowFinished string = "/cre-events-workflow-finished/v1"
18+
// SchemaCapabilityStarted represents the schema for capability started events
19+
SchemaCapabilityStarted string = "/cre-events-capability-started/v1"
20+
// SchemaCapabilityFinished represents the schema for capability finished events
21+
SchemaCapabilityFinished string = "/cre-events-capability-finished/v1"
22+
)

core/services/workflows/metering.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ const (
1515
MeteringReportSchema string = "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb/capabilities.proto"
1616
MeteringReportDomain string = "platform"
1717
MeteringReportEntity string = "MeteringReport"
18+
MeteringProtoPkg string = "pb"
1819
)
1920

2021
type MeteringReportStepRef string

0 commit comments

Comments
 (0)