Skip to content

CRE Operational Events in Engine #17057

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Apr 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions core/platform/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,15 @@ const (
KeyWorkflowID = "workflowID"
KeyWorkflowExecutionID = "workflowExecutionID"
KeyWorkflowName = "workflowName"
KeyWorkflowVersion = "workflowVersion"
KeyWorkflowOwner = "workflowOwner"
KeyStepID = "stepID"
KeyStepRef = "stepRef"
KeyDonID = "DonID"
KeyDonF = "F"
KeyDonN = "N"
KeyDonQ = "Q"
KeyP2PID = "p2pID"
)

func LabelKeysSorted() iter.Seq[string] {
Expand Down
183 changes: 176 additions & 7 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,15 @@ import (
"encoding/hex"
"errors"
"fmt"
"strconv"
"sync"
"time"

"github.com/jonboulle/clockwork"
"google.golang.org/protobuf/proto"

"github.com/smartcontractkit/chainlink-common/pkg/aggregation"
"github.com/smartcontractkit/chainlink-common/pkg/beholder"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/custmsg"
"github.com/smartcontractkit/chainlink-common/pkg/metrics"
Expand All @@ -20,10 +24,10 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/workflows"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/exec"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk"

"github.com/smartcontractkit/chainlink/v2/core/capabilities/transmission"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/platform"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/pb"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/ratelimiter"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/store"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncerlimiter"
Expand Down Expand Up @@ -502,9 +506,14 @@ func generateExecutionID(workflowID, eventID string) (string, error) {
}

// startExecution kicks off a new workflow execution when a trigger event is received.
func (e *Engine) startExecution(ctx context.Context, executionID string, event *values.Map) error {
func (e *Engine) startExecution(ctx context.Context, executionID string, triggerID string, event *values.Map) error {
e.meterReports.Add(executionID, NewMeteringReport())

err := emitExecutionStartedEvent(ctx, e.cma, triggerID)
if err != nil {
e.logger.Errorf("failed to emit execution started event: %+v", err)
}

lggr := e.logger.With("event", event, platform.KeyWorkflowExecutionID, executionID)
lggr.Debug("executing on a trigger event")
workflowExecution, err := e.executionsStore.Add(ctx, map[string]*store.WorkflowExecutionStep{
Expand Down Expand Up @@ -676,6 +685,10 @@ func (e *Engine) finishExecution(ctx context.Context, cma custmsg.MessageEmitter
}
logCustMsg(ctx, cma, fmt.Sprintf("execution duration: %d (seconds)", executionDuration), l)
l.Infof("execution duration: %d (seconds)", executionDuration)
err = emitExecutionFinishedEvent(ctx, cma, status)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

n00b question, how is this emit different than logCustMsg in L677? they both go to beholder?

The doc comment says "emitProtoMessage marshals a proto.Message and emits it via beholder", so this emits to beholder, and cma.Emit(ctx, msg) in logCustMsg seems to go to beholder as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They both go through the OTEL pipeline. Eventually once CHiP client integration into Beholder is complete, they will both go through the OTEL and CHiP ingresses.

The difference comes from how the protos are handled. The BaseMessage field has a map[string]string of arbitrary KV pairs supplied at runtime, so logCustMsg worked with a struct that would collect those labels (similar to a logger), and then be able to set those on the proto. And, since each custom message emitted used one proto, we could have a typed MessageEmitter interface to handle it.

Here, the approach is from a different direction - we have multiple typed protos that we want to be able to emit a set of known labels for in a repeatable way.

If we wanted to expand that interface to take all protos, we would have to update it to take an any type, which seemed messy to me. @EasterTheBunny gave this a try here: smartcontractkit/chainlink-common#1075

if err != nil {
e.logger.Errorf("failed to emit execution finished event: %+v", err)
}
e.onExecutionFinished(executionID)
return nil
}
Expand Down Expand Up @@ -731,7 +744,7 @@ func (e *Engine) worker(ctx context.Context) {
}

cma := e.cma.With(platform.KeyWorkflowExecutionID, executionID)
err = e.startExecution(ctx, executionID, resp.Event.Outputs)
err = e.startExecution(ctx, executionID, te.ID, resp.Event.Outputs)
if err != nil {
e.logger.With(platform.KeyWorkflowExecutionID, executionID).Errorf("failed to start execution: %v", err)
logCustMsg(ctx, cma, fmt.Sprintf("failed to start execution: %s", err), e.logger)
Expand Down Expand Up @@ -997,13 +1010,32 @@ func (e *Engine) executeStep(ctx context.Context, lggr logger.Logger, msg stepRe
defer cancel()

e.metrics.with(platform.KeyCapabilityID, curStep.ID).incrementCapabilityInvocationCounter(ctx)
output, err := curStep.capability.Execute(stepCtx, tr)
err = emitCapabilityStartedEvent(ctx, e.cma, curStep.ID, msg.stepRef)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be helpful to also emit the payload sent to the capability (inputsMap). Or is there some concern that keeps us from logging it?

Copy link
Contributor Author

@patrickhuie19 patrickhuie19 Apr 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could do it with the values.Map type now, but I'm not sure that approach is future proof for the no-dag SDK. How would a front end parse that data? Each capability response will be typed, so to allow a front end to parse that, we would have to register a new proto to the data platform. The inputs/outputs within a capability are also sensitive to data governance issues that high level metadata like status are not.

if err != nil {
e.logger.Errorf("failed to emit capability event: %v", err)
}
output, capErr := curStep.capability.Execute(stepCtx, tr)
status := store.StatusCompleted

if capErr != nil {
status = store.StatusErrored
if capabilities.ErrStopExecution.Is(capErr) {
status = store.StatusCompletedEarlyExit
}
}

defer func() {
if err := emitCapabilityFinishedEvent(ctx, e.cma, curStep.ID, msg.stepRef, status); err != nil {
e.logger.Errorf("failed to emit capability event: %v", err)
}
}()

if capErr != nil {
e.metrics.with(platform.KeyStepRef, msg.stepRef, platform.KeyCapabilityID, curStep.ID).incrementCapabilityFailureCounter(ctx)
return inputsMap, capabilities.CapabilityResponse{}, err
return inputsMap, capabilities.CapabilityResponse{}, capErr
}

return inputsMap, output, err
return inputsMap, output, nil
}

func (e *Engine) deregisterTrigger(ctx context.Context, t *triggerCapability, triggerIdx int) error {
Expand Down Expand Up @@ -1355,7 +1387,22 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) {
return nil, fmt.Errorf("could not initialize monitoring resources: %w", err)
}

cma := custmsg.NewLabeler().With(platform.KeyWorkflowID, cfg.WorkflowID, platform.KeyWorkflowOwner, cfg.WorkflowOwner, platform.KeyWorkflowName, cfg.WorkflowName.String())
nodeState, err := cfg.Registry.LocalNode(ctx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just noticed this line. In init() we retrieve this value with retries in case the Registry is not ready. I don't think we have a guarantee that it will be ready here... cc @cedric-cordenier

if err != nil {
return nil, fmt.Errorf("could not get local node state: %w", err)
}
cma := custmsg.NewLabeler().With(platform.KeyWorkflowID, cfg.WorkflowID,
platform.KeyWorkflowOwner, cfg.WorkflowOwner,
platform.KeyWorkflowName, cfg.WorkflowName.String(),
platform.KeyDonID, strconv.Itoa(int(nodeState.WorkflowDON.ID)),
platform.KeyDonF, strconv.Itoa(int(nodeState.WorkflowDON.F)),
platform.KeyDonN, strconv.Itoa(len(nodeState.WorkflowDON.Members)),
platform.KeyDonQ, strconv.Itoa(aggregation.ByzantineQuorum(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: today we use F + 1. Updating that to ByzantineQuorum here: #17109

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Depends on what kind of quorum you mean. We are using ByzQ for consensus/OCR and F+1 for aggregating remote capability responses.

len(nodeState.WorkflowDON.Members),
int(nodeState.WorkflowDON.F),
)),
platform.KeyP2PID, nodeState.PeerID.String(),
)
workflow, err := Parse(cfg.Workflow)
if err != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to parse workflow: %s", err), cfg.Lggr)
Expand Down Expand Up @@ -1440,3 +1487,125 @@ func logCustMsg(ctx context.Context, cma custmsg.MessageEmitter, msg string, log
log.Errorf("failed to send custom message with msg: %s, err: %v", msg, err)
}
}

// buildWorkflowMetadata populates a WorkflowMetadata from kvs (map[string]string).
func buildWorkflowMetadata(kvs map[string]string) *pb.WorkflowMetadata {
m := &pb.WorkflowMetadata{}

m.WorkflowName = kvs[platform.KeyWorkflowName]
m.Version = kvs[platform.KeyWorkflowVersion]
m.WorkflowID = kvs[platform.KeyWorkflowID]
m.WorkflowExecutionID = kvs[platform.KeyWorkflowExecutionID]

if donIDStr, ok := kvs[platform.KeyDonID]; ok {
if id, err := strconv.ParseInt(donIDStr, 10, 32); err == nil {
m.DonF = int32(id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

m.DonID = int32(id)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}
}

m.P2PID = kvs[platform.KeyP2PID]

if donFStr, ok := kvs[platform.KeyDonF]; ok {
if id, err := strconv.ParseInt(donFStr, 10, 32); err == nil {
m.DonF = int32(id)
}
}
if donNStr, ok := kvs[platform.KeyDonN]; ok {
if id, err := strconv.ParseInt(donNStr, 10, 32); err == nil {
m.DonN = int32(id)
}
}
if donQStr, ok := kvs[platform.KeyDonQ]; ok {
if id, err := strconv.ParseInt(donQStr, 10, 32); err == nil {
m.DonQ = int32(id)
}
}

return m
}

// emitProtoMessage marshals a proto.Message and emits it via beholder.
func emitProtoMessage(ctx context.Context, msg proto.Message) error {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pkcll could you review this section?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

b, err := proto.Marshal(msg)
if err != nil {
return err
}

// Determine the schema and entity based on the message type
var schema, entity string
switch msg.(type) {
case *pb.WorkflowExecutionStarted:
schema = SchemaWorkflowStarted
entity = EventWorkflowExecutionStarted
case *pb.WorkflowExecutionFinished:
schema = SchemaWorkflowFinished
entity = EventWorkflowExecutionFinished
case *pb.CapabilityExecutionStarted:
schema = SchemaCapabilityStarted
entity = EventCapabilityExecutionStarted
case *pb.CapabilityExecutionFinished:
schema = SchemaCapabilityFinished
entity = EventCapabilityExecutionFinished
default:
return fmt.Errorf("unknown message type: %T", msg)
}

// entity must be prefixed with the proto package name
entity = fmt.Sprintf("%s.%s", EventsProtoPkg, entity)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


return beholder.GetEmitter().Emit(ctx, b,
"beholder_data_schema", schema, // required
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are these magical values? if there is a typo like schema -> scheam does all sort of stuff downstream break?

Does the beholder package define these values so that we can import and use them?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are these magical values? if there is a typo like schema -> scheam does all sort of stuff downstream break?

Yes, they are

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, what's the reason for copying and pasting rather than referencing them in a beholder defined api?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The values are magical, in that consumers will break, but they aren't stable to the point where I think any of us thought about putting them behind an API. It's a good idea, and I can add a ticket to do that, but I'd propose we not block on that here.

"beholder_domain", "platform", // required
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should it be cre domain ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's keep platform for now and then migrate everything later

"beholder_entity", entity) // required
}

// emitExecutionStartedEvent emits a WorkflowExecutionStarted event for the
// execution kicked off by the trigger identified by triggerID. Workflow
// metadata is built from the labels carried by cma.
//
// The timestamp is formatted as RFC 3339 with nanosecond precision.
// time.Time.String is meant for debugging only and, for a value obtained from
// time.Now, appends the monotonic clock reading ("m=+..."), which makes it
// unsuitable as a serialized timestamp.
func emitExecutionStartedEvent(ctx context.Context, cma custmsg.MessageEmitter, triggerID string) error {
	event := &pb.WorkflowExecutionStarted{
		M:         buildWorkflowMetadata(cma.Labels()),
		Timestamp: time.Now().Format(time.RFC3339Nano),
		TriggerID: triggerID,
	}

	return emitProtoMessage(ctx, event)
}

// emitExecutionFinishedEvent emits a WorkflowExecutionFinished event carrying
// the final execution status. Workflow metadata is built from the labels
// carried by cma, and the timestamp is RFC 3339 with nanosecond precision.
func emitExecutionFinishedEvent(ctx context.Context, cma custmsg.MessageEmitter, status string) error {
	event := &pb.WorkflowExecutionFinished{
		M:         buildWorkflowMetadata(cma.Labels()),
		Timestamp: time.Now().Format(time.RFC3339Nano),
		Status:    status,
	}

	return emitProtoMessage(ctx, event)
}

// emitCapabilityStartedEvent emits a CapabilityExecutionStarted event for the
// capability capabilityID invoked by the step stepRef. Workflow metadata is
// built from the labels carried by cma, and the timestamp is RFC 3339 with
// nanosecond precision.
func emitCapabilityStartedEvent(ctx context.Context, cma custmsg.MessageEmitter, capabilityID, stepRef string) error {
	event := &pb.CapabilityExecutionStarted{
		M:            buildWorkflowMetadata(cma.Labels()),
		Timestamp:    time.Now().Format(time.RFC3339Nano),
		CapabilityID: capabilityID,
		StepRef:      stepRef,
	}

	return emitProtoMessage(ctx, event)
}

// emitCapabilityFinishedEvent emits a CapabilityExecutionFinished event for
// the capability capabilityID invoked by the step stepRef, together with the
// resulting status. Workflow metadata is built from the labels carried by
// cma, and the timestamp is RFC 3339 with nanosecond precision.
func emitCapabilityFinishedEvent(ctx context.Context, cma custmsg.MessageEmitter, capabilityID, stepRef, status string) error {
	event := &pb.CapabilityExecutionFinished{
		M:            buildWorkflowMetadata(cma.Labels()),
		Timestamp:    time.Now().Format(time.RFC3339Nano),
		CapabilityID: capabilityID,
		StepRef:      stepRef,
		Status:       status,
	}

	return emitProtoMessage(ctx, event)
}
12 changes: 9 additions & 3 deletions core/services/workflows/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,8 @@ func newTestEngine(t *testing.T, reg *coreCap.Registry, sdkSpec sdk.WorkflowSpec
detail := report.Description()
bClient := beholder.GetClient()
// kvAttrs is what the test client matches on, view pkg/utils/test in common for more detail
kvAttrs := []any{"beholder_data_schema", detail.Schema, "beholder_domain", detail.Domain, "beholder_entity", detail.Entity,
kvAttrs := []any{"beholder_data_schema", detail.Schema, "beholder_domain", detail.Domain,
"beholder_entity", fmt.Sprintf("%s.%s", MeteringProtoPkg, detail.Entity),
platform.KeyWorkflowName, name, platform.KeyWorkflowID, ID, platform.KeyWorkflowExecutionID, execID}

data, mErr := proto.Marshal(report.Message())
Expand Down Expand Up @@ -398,7 +399,12 @@ func TestEngineWithHardcodedWorkflow(t *testing.T) {
require.NoError(t, err)

assert.Equal(t, store.StatusCompleted, state.Status)
assert.Equal(t, 1, beholderTester.Len(t, "beholder_entity", MeteringReportEntity))

assert.Equal(t, 1, beholderTester.Len(t, "beholder_entity", fmt.Sprintf("%s.%s", MeteringProtoPkg, MeteringReportEntity)))
assert.Equal(t, 1, beholderTester.Len(t, "beholder_entity", fmt.Sprintf("%s.%s", EventsProtoPkg, EventWorkflowExecutionStarted)))
assert.Equal(t, 1, beholderTester.Len(t, "beholder_entity", fmt.Sprintf("%s.%s", EventsProtoPkg, EventWorkflowExecutionFinished)))
assert.Equal(t, 3, beholderTester.Len(t, "beholder_entity", fmt.Sprintf("%s.%s", EventsProtoPkg, EventCapabilityExecutionStarted)))
assert.Equal(t, 3, beholderTester.Len(t, "beholder_entity", fmt.Sprintf("%s.%s", EventsProtoPkg, EventCapabilityExecutionFinished)))
}

const (
Expand Down Expand Up @@ -2274,7 +2280,7 @@ func TestEngine_ConcurrentExecutions(t *testing.T) {
eid2 := getExecutionID(t, eng, testHooks)

assert.Equal(t, store.StatusCompleted, state.Status)
assert.Equal(t, 2, beholderTester.Len(t, "beholder_entity", MeteringReportEntity))
assert.Equal(t, 2, beholderTester.Len(t, "beholder_entity", fmt.Sprintf("%s.%s", MeteringProtoPkg, MeteringReportEntity)))
assert.Equal(t, 1, beholderTester.Len(t, platform.KeyWorkflowExecutionID, eid))
assert.Equal(t, 1, beholderTester.Len(t, platform.KeyWorkflowExecutionID, eid2))
}
22 changes: 22 additions & 0 deletions core/services/workflows/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package workflows

// EventsProtoPkg is the proto package name that is prepended to an event's
// entity name (as "<pkg>.<entity>") when the event is emitted.
const EventsProtoPkg = "pb"

// Entity names of the operational events.
const (
	// EventWorkflowExecutionStarted is the entity name of the event emitted when a workflow execution starts.
	EventWorkflowExecutionStarted string = "WorkflowExecutionStarted"
	// EventWorkflowExecutionFinished is the entity name of the event emitted when a workflow execution finishes.
	EventWorkflowExecutionFinished string = "WorkflowExecutionFinished"
	// EventCapabilityExecutionStarted is the entity name of the event emitted when a capability execution starts.
	EventCapabilityExecutionStarted string = "CapabilityExecutionStarted"
	// EventCapabilityExecutionFinished is the entity name of the event emitted when a capability execution finishes.
	EventCapabilityExecutionFinished string = "CapabilityExecutionFinished"
)

// Data schema identifiers of the operational events.
const (
	// SchemaWorkflowStarted is the schema used for workflow started events.
	SchemaWorkflowStarted string = "/cre-events-workflow-started/v1"
	// SchemaWorkflowFinished is the schema used for workflow finished events.
	SchemaWorkflowFinished string = "/cre-events-workflow-finished/v1"
	// SchemaCapabilityStarted is the schema used for capability started events.
	SchemaCapabilityStarted string = "/cre-events-capability-started/v1"
	// SchemaCapabilityFinished is the schema used for capability finished events.
	SchemaCapabilityFinished string = "/cre-events-capability-finished/v1"
)
1 change: 1 addition & 0 deletions core/services/workflows/metering.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const (
MeteringReportSchema string = "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb/capabilities.proto"
MeteringReportDomain string = "platform"
MeteringReportEntity string = "MeteringReport"
MeteringProtoPkg string = "pb"
)

type MeteringReportStepRef string
Expand Down
Loading
Loading