-
Notifications
You must be signed in to change notification settings - Fork 1.8k
CRE Operational Events in Engine #17057
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
d69c50d
to
0afe074
Compare
platform.KeyDonID, strconv.Itoa(int(nodeState.WorkflowDON.ID)), | ||
platform.KeyDonF, strconv.Itoa(int(nodeState.WorkflowDON.F)), | ||
platform.KeyDonN, strconv.Itoa(len(nodeState.WorkflowDON.Members)), | ||
platform.KeyDonQ, strconv.Itoa(aggregation.ByzantineQuorum( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: today we use F + 1. Updating that to ByzantineQuroum here: #17109
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Depends what kinds of quorum you mean. We are using ByzQ for consensus/OCR and F+1 for aggregating remote capability responses.
int32 donF = 6; | ||
int32 donN = 7; | ||
int32 donQ = 8; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry just for my knowledge what is those fields used for ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
F is the manually selected maximum number of faulty/dishonest nodes in the DON. N is the manually selected number of nodes in the DON (For prod, N >= 3F + 1). Q is the quorum size we've calculated in the engine, which is the number of identical requests/responses in the trigger and Don2Don layer we need before considering a trigger or capability request/response valid.
} | ||
|
||
return beholder.GetEmitter().Emit(ctx, b, | ||
"beholder_data_schema", schema, // required |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are these magical values? if there is a typo like schema -> scheam does all sort of stuff downstream break?
does the behold package define the values and import and use them?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are these magical values? if there is a typo like schema -> scheam does all sort of stuff downstream break?
Yes, they are
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, what's the reason for copying and pasting rather than referencing them in a beholder
defined api?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The values are magical, in that consumers will break, but they aren't stable to the point where I think any of us thought about putting them behind an API. It's a good idea, and I can add a ticket to do that, but I'd propose we not block on that here.
core/services/workflows/engine.go
Outdated
schema = "/cre-events-workflow-started/v1" | ||
entity = "WorkflowExecutionStarted" | ||
case *pb.WorkflowExecutionFinished: | ||
schema = "/cre-events-workflow-finished/v1" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the string val here doesn't match the OperationalEventsSchema var val. why?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After I decomposed the events proto so that beholder could process it, I didn't update the events.go helper. I'll update
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
|
||
return beholder.GetEmitter().Emit(ctx, b, | ||
"beholder_data_schema", schema, // required | ||
"beholder_domain", "platform", // required |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should it be cre
domain ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's keep platform
for now and then migrate everything later
|
||
option go_package = "github.com/smartcontractkit/chainlink/core/services/workflows/pb/"; | ||
|
||
package pb; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe it makes sense to follow this pattern for naming proto packages {domain}.{version}
e.g cre.v1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think its verbose at the go pkg level and in the naming schema in Atlas. I understand this means that all events in the platform
(eventually cre
) domain have to be unique, and I'm happy with that tradeoff.
core/services/workflows/engine.go
Outdated
switch msg.(type) { | ||
case *pb.WorkflowExecutionStarted: | ||
schema = "/cre-events-workflow-started/v1" | ||
entity = "WorkflowExecutionStarted" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Entity should be {pb_package_name}.{message_mane}
: pb.WorkflowExecutionStarted
Example
chainlink/core/chains/evm/txm/metrics.go
Lines 130 to 132 in 83dc5cd
"beholder_domain", "svr", | |
"beholder_entity", "svr.v1.TxMessage", | |
"beholder_data_schema", "/beholder-tx-message/versions/2", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe it makes sense to give a more meaningful name to the proto package e.g
platform
or cre
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated so that entity is prefixed by the proto pkg name: 7d49206
@@ -676,6 +685,10 @@ func (e *Engine) finishExecution(ctx context.Context, cma custmsg.MessageEmitter | |||
} | |||
logCustMsg(ctx, cma, fmt.Sprintf("execution duration: %d (seconds)", executionDuration), l) | |||
l.Infof("execution duration: %d (seconds)", executionDuration) | |||
err = emitExecutionFinishedEvent(ctx, cma, status) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
n00b question, how is this emit different than logCustMsg in L677? they both go to beholder?
Per emitProtoMessage marshals a proto.Message and emits it via beholder.
this emits to beholder, and cma.Emit(ctx, msg)
in logCustMsg seems to beholder as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
They both go through the OTEL pipeline. Eventually once CHiP client integration into Beholder is complete, they will both go through the OTEL and CHiP ingresses.
The difference comes from how the protos are handled. The BaseMessage field has a map[string]string of arbitrary KVs pairs supplied at runtime, so the logCustMsg
worked with a struct that would collect those labels (similar to a logger), and then be able to set those on the proto. And, since each custom message emitted used one proto, we could have a typed MessageEmitter interface to handle it.
Here, the approach is from a different direction - we have multiple typed protos that we want to be able to emit a set of known labels for in a repeatable way.
If we wanted to expand that interface to take all protos, we would have to update it to take an any
type, which seemed messy to me. @EasterTheBunny gave this a try here: smartcontractkit/chainlink-common#1075
@@ -997,13 +1010,32 @@ func (e *Engine) executeStep(ctx context.Context, lggr logger.Logger, msg stepRe | |||
defer cancel() | |||
|
|||
e.metrics.with(platform.KeyCapabilityID, curStep.ID).incrementCapabilityInvocationCounter(ctx) | |||
output, err := curStep.capability.Execute(stepCtx, tr) | |||
err = emitCapabilityStartedEvent(ctx, e.cma, curStep.ID, msg.stepRef) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is helpful to emit the payload to the cap, inputsMap? Maybe some concern on this so we are not logging?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could do it with the values.Map type now, but I'm not sure that approach is future proof for the no-dag SDK. How would a front end parse that data? Each capability response will be typed, so to allow a front end to parse that, we would have to register a new proto to the data platform. The inputs/outputs within a capability are also sensitive to data governance issues that high level metadata like status are not.
Flakeguard SummaryRan new or updated tests between View Flaky Detector Details | Compare Changes Found Flaky Tests ❌1 Results
ArtifactsFor detailed logs of the failed tests, please refer to the artifact failed-test-results-with-logs.json. |
@@ -1550,6 +1550,9 @@ func emitProtoMessage(ctx context.Context, msg proto.Message) error { | |||
return fmt.Errorf("unknown message type: %T", msg) | |||
} | |||
|
|||
// entity must be prefixed with the proto package name | |||
entity = fmt.Sprintf("%s.%s", EventsProtoPkg, entity) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
|
|
||
if donIDStr, ok := kvs[platform.KeyDonID]; ok { | ||
if id, err := strconv.ParseInt(donIDStr, 10, 32); err == nil { | ||
m.DonF = int32(id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
m.DonID = int32(id)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@@ -1355,7 +1387,22 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) { | |||
return nil, fmt.Errorf("could not initialize monitoring resources: %w", err) | |||
} | |||
|
|||
cma := custmsg.NewLabeler().With(platform.KeyWorkflowID, cfg.WorkflowID, platform.KeyWorkflowOwner, cfg.WorkflowOwner, platform.KeyWorkflowName, cfg.WorkflowName.String()) | |||
nodeState, err := cfg.Registry.LocalNode(ctx) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just noticed this line. In init() we retrieve this value with retries in case the Registry is not ready. I don't think we have a guarantee that it will be ready here... cc @cedric-cordenier
CRE-375
Now that our workflow events have matured, we want to migrate from using BaseMessage with runtime defined KV pairs, to pre-defined KVs in events.proto.
This uses Beholder as CHiP ingress will be included with Beholder when first shipped (Dual Source)
Future Considerations
platform
instead of migrating tocre
- we will migrate later on.