Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/actions/golangci-lint/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ runs:
# golangci-lint runs with absolute path mode: --path-mode=abs
REPORT_PATH: ${{ github.workspace }}/${{ steps.set-working-directory.outputs.golangci-lint-working-directory }}golangci-lint-report.xml
with:
version: v2.2.1
version: v2.5.0
only-new-issues: true
args: --output.checkstyle.path=${{ env.REPORT_PATH }}
working-directory: ${{ steps.set-working-directory.outputs.golangci-lint-working-directory }}
Expand Down
2 changes: 1 addition & 1 deletion .tool-versions
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ nodejs 20.13.1
pnpm 10.6.5
postgres 15.1
helm 3.18.4
golangci-lint 2.2.1
golangci-lint 2.5.0
protoc 29.3
python 3.10.5
2 changes: 1 addition & 1 deletion GNUmakefile
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ config-docs: ## Generate core node configuration documentation
.PHONY: golangci-lint
golangci-lint: ## Run golangci-lint for all issues.
[ -d "./golangci-lint" ] || mkdir ./golangci-lint && \
docker run --rm -v $(shell pwd):/app -w /app golangci/golangci-lint:v2.2.1 golangci-lint run --max-issues-per-linter 0 --max-same-issues 0 | tee ./golangci-lint/$(shell date +%Y-%m-%d_%H:%M:%S).txt
docker run --rm -v $(shell pwd):/app -w /app golangci/golangci-lint:v2.5.0 golangci-lint run --max-issues-per-linter 0 --max-same-issues 0 | tee ./golangci-lint/$(shell date +%Y-%m-%d_%H:%M:%S).txt

.PHONY: modgraph
modgraph:
Expand Down
67 changes: 45 additions & 22 deletions core/capabilities/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type launcher struct {
workflowDonNotifier donNotifier
don2donSharedPeer p2ptypes.SharedPeer
p2pStreamConfig p2ptypes.StreamConfig
metrics *launcherMetrics
}

// For V2 capabilities, shims are created once and their config is updated dynamically.
Expand Down Expand Up @@ -87,7 +88,7 @@ func NewLauncher(
dispatcher remotetypes.Dispatcher,
registry *Registry,
workflowDonNotifier donNotifier,
) *launcher {
) (*launcher, error) {
p2pStreamConfig := defaultStreamConfig
if streamConfig != nil {
p2pStreamConfig.IncomingMessageBufferSize = streamConfig.IncomingMessageBufferSize()
Expand All @@ -102,6 +103,10 @@ func NewLauncher(
Capacity: streamConfig.BytesRateLimiterCapacity(),
}
}
metrics, err := newLauncherMetrics()
if err != nil {
return nil, fmt.Errorf("failed to create launcher metrics: %w", err)
}
return &launcher{
lggr: logger.Named(lggr, "CapabilitiesLauncher"),
peerWrapper: peerWrapper,
Expand All @@ -118,7 +123,8 @@ func NewLauncher(
workflowDonNotifier: workflowDonNotifier,
don2donSharedPeer: don2donSharedPeer,
p2pStreamConfig: p2pStreamConfig,
}
metrics: metrics,
}, nil
}

// Maintain only necessary Don2Don connections:
Expand Down Expand Up @@ -395,6 +401,7 @@ func (w *launcher) OnNewRegistry(ctx context.Context, localRegistry *registrysyn
return fmt.Errorf("failed to update peer connections: %w", err)
}
}
w.metrics.incrementCompletedUpdates(ctx)
return nil
}

Expand Down Expand Up @@ -425,25 +432,33 @@ func donFamiliesOverlap(donA []string, donB []string) bool {
// it is best effort to ensure that valid capabilities are added even if some fail
func (w *launcher) addRemoteCapabilities(ctx context.Context, myDON registrysyncer.DON, remoteDON registrysyncer.DON, localRegistry *registrysyncer.LocalRegistry) {
for cid, c := range remoteDON.CapabilityConfigurations {
err := w.addRemoteCapability(ctx, cid, c, myDON, remoteDON, localRegistry)
capabilityConfig, err := c.Unmarshal()
if err != nil {
w.lggr.Errorw("could not unmarshal capability config", "myDON", myDON, "remoteDON", remoteDON, "capabilityID", cid, "error", err)
w.metrics.recordRemoteCapabilityAdded(ctx, cid, remoteDON.Name, resultFailure)
continue
}
if capabilityConfig.LocalOnly {
w.lggr.Debugw("skipping local-only capability", "myDON", myDON, "remoteDON", remoteDON, "capabilityID", cid)
w.metrics.recordRemoteCapabilityAdded(ctx, cid, remoteDON.Name, resultSkipped)
continue
}
err = w.addRemoteCapability(ctx, cid, capabilityConfig, myDON, remoteDON, localRegistry)
if err != nil {
// TODO CRE-1021 metrics for failures
w.lggr.Errorw("failed to add remote capability ", "myDON", myDON, "remoteDON", remoteDON, "capabilityID", cid, "err", err)
w.metrics.recordRemoteCapabilityAdded(ctx, cid, remoteDON.Name, resultFailure)
continue
}
w.metrics.recordRemoteCapabilityAdded(ctx, cid, remoteDON.Name, resultSuccess)
}
}

func (w *launcher) addRemoteCapability(ctx context.Context, cid string, c registrysyncer.CapabilityConfiguration, myDON registrysyncer.DON, remoteDON registrysyncer.DON, localRegistry *registrysyncer.LocalRegistry) error {
func (w *launcher) addRemoteCapability(ctx context.Context, cid string, capabilityConfig capabilities.CapabilityConfiguration, myDON registrysyncer.DON, remoteDON registrysyncer.DON, localRegistry *registrysyncer.LocalRegistry) error {
capability, ok := localRegistry.IDsToCapabilities[cid]
if !ok {
return fmt.Errorf("could not find capability matching id %s", cid)
}

capabilityConfig, err := c.Unmarshal()
if err != nil {
return fmt.Errorf("could not unmarshal capability config for id %s: %w", cid, err)
}

methodConfig := capabilityConfig.CapabilityMethodConfig
if methodConfig != nil { // v2 capability - handle via CombinedClient
errAdd := w.addRemoteCapabilityV2(ctx, capability.ID, methodConfig, myDON, remoteDON)
Expand Down Expand Up @@ -647,28 +662,36 @@ func (w *launcher) serveCapabilities(ctx context.Context, myPeerID p2ptypes.Peer
}

for cid, c := range don.CapabilityConfigurations {
err := w.serveCapability(ctx, cid, c, myPeerID, don, localRegistry, idsToDONs)
capabilityConfig, err := c.Unmarshal()
if err != nil {
w.lggr.Errorw("could not unmarshal capability config", "localDON", don, "capabilityID", cid, "error", err)
w.metrics.recordLocalCapabilityExposed(ctx, cid, resultFailure)
continue
}
if capabilityConfig.LocalOnly {
w.lggr.Debugw("skipping local-only capability", "localDON", don, "capabilityID", cid)
w.metrics.recordLocalCapabilityExposed(ctx, cid, resultSkipped)
continue
}
err = w.serveCapability(ctx, cid, capabilityConfig, myPeerID, don, localRegistry, idsToDONs)
if err != nil {
// TODO CRE-1021 metrics for failures
w.lggr.Errorw("failed to serve capability", "myPeerID", myPeerID, "don", don, "capabilityID", cid, "err", err)
w.lggr.Errorw("failed to serve capability", "myPeerID", myPeerID, "localDON", don, "capabilityID", cid, "err", err)
w.metrics.recordLocalCapabilityExposed(ctx, cid, resultFailure)
continue
}
w.metrics.recordLocalCapabilityExposed(ctx, cid, resultSuccess)
}
}

// serveCapability exposes a single capability.
// trigger capabilities are exposed via a TriggerPublisher local execution
// all other capabilities are exposed via an Executable for remote execution
func (w *launcher) serveCapability(ctx context.Context, cid string, c registrysyncer.CapabilityConfiguration, myPeerID p2ptypes.PeerID, don registrysyncer.DON, localRegistry *registrysyncer.LocalRegistry, idsToDONs map[uint32]capabilities.DON) error {
func (w *launcher) serveCapability(ctx context.Context, cid string, capabilityConfig capabilities.CapabilityConfiguration, myPeerID p2ptypes.PeerID, don registrysyncer.DON, localRegistry *registrysyncer.LocalRegistry, idsToDONs map[uint32]capabilities.DON) error {
capability, ok := localRegistry.IDsToCapabilities[cid]
if !ok {
return fmt.Errorf("could not find capability matching id %s", cid)
}

capabilityConfig, err := c.Unmarshal()
if err != nil {
return fmt.Errorf("could not unmarshal capability config for id %s: %w", cid, err)
}

methodConfig := capabilityConfig.CapabilityMethodConfig
if methodConfig != nil { // v2 capability
errExpose := w.exposeCapabilityV2(ctx, cid, methodConfig, myPeerID, don, idsToDONs)
Expand Down Expand Up @@ -702,7 +725,7 @@ func (w *launcher) serveCapability(ctx context.Context, cid string, c registrysy
return publisher, nil
}

if err = w.addReceiver(ctx, capability, don, newTriggerPublisher); err != nil {
if err := w.addReceiver(ctx, capability, don, newTriggerPublisher); err != nil {
return fmt.Errorf("failed to add server-side receiver for a trigger capability '%s' - it won't be exposed remotely: %w", cid, err)
}
case capabilities.CapabilityTypeAction:
Expand Down Expand Up @@ -747,7 +770,7 @@ func (w *launcher) serveCapability(ctx context.Context, cid string, c registrysy
return server, nil
}

if err = w.addReceiver(ctx, capability, don, newActionServer); err != nil {
if err := w.addReceiver(ctx, capability, don, newActionServer); err != nil {
return fmt.Errorf("failed to add action server-side receiver '%s' - it won't be exposed remotely: %w", cid, err)
}
case capabilities.CapabilityTypeConsensus:
Expand Down Expand Up @@ -795,7 +818,7 @@ func (w *launcher) serveCapability(ctx context.Context, cid string, c registrysy
return server, nil
}

if err = w.addReceiver(ctx, capability, don, newTargetServer); err != nil {
if err := w.addReceiver(ctx, capability, don, newTargetServer); err != nil {
return fmt.Errorf("failed to add server-side receiver for a target capability '%s' - it won't be exposed remotely: %w", cid, err)
}
default:
Expand Down
Loading
Loading