Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions core/capabilities/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,7 @@ func (w *launcher) addRemoteCapability(ctx context.Context, cid string, capabili
return fmt.Errorf("failed to add target shim: %w", err)
}
default:
w.lggr.Warnf("unknown capability type, skipping configuration: %+v", capability)
w.lggr.Warnw("unknown capability type, skipping configuration", "capability", capability)
}

return nil
Expand Down Expand Up @@ -864,7 +864,7 @@ func (w *launcher) serveCapability(ctx context.Context, cid string, capabilityCo
return fmt.Errorf("failed to add server-side receiver for a target capability '%s' - it won't be exposed remotely: %w", cid, err)
}
default:
w.lggr.Warnf("unknown capability type, skipping configuration: %+v", capability)
w.lggr.Warnw("unknown capability type, skipping configuration", "capability", capability)
}
return nil
}
Expand Down Expand Up @@ -895,7 +895,7 @@ func (w *launcher) addReceiver(ctx context.Context, capability registrysyncer.Ca
if errors.Is(err, remote.ErrReceiverExists) {
// If a receiver already exists, let's log the error for debug purposes, but
// otherwise short-circuit here. We've handled this capability in a previous iteration.
w.lggr.Debugf("receiver already exists for cap ID %s and don ID %d: %s", capID, don.ID, err)
w.lggr.Debugw("receiver already exists for capability", "capabilityID", capID, "donID", don.ID, "error", err)
return nil
} else if err != nil {
return fmt.Errorf("failed to set receiver: %w", err)
Expand Down
10 changes: 5 additions & 5 deletions core/capabilities/remote/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type dispatcherMetrics struct {
sharedPeerMsgsRcvdCounter metric.Int64Counter
rateLimitedMsgsCounter metric.Int64Counter
invalidMsgsCounter metric.Int64Counter
unregisteredCapMsgsCounter metric.Int64Counter
unknownCapMsgsCounter metric.Int64Counter
receiveChannelUsageGauge metric.Float64Gauge
receiverDroppedMsgsCounter metric.Int64Counter
}
Expand Down Expand Up @@ -107,9 +107,9 @@ func (d *dispatcher) initMetrics() error {
if err != nil {
return fmt.Errorf("failed to register platform_don2don_dispatcher_invalid_msgs_total: %w", err)
}
d.metrics.unregisteredCapMsgsCounter, err = beholder.GetMeter().Int64Counter("platform_don2don_dispatcher_unregistered_capability_msgs_total")
d.metrics.unknownCapMsgsCounter, err = beholder.GetMeter().Int64Counter("platform_don2don_dispatcher_unknown_capability_msgs_total")
if err != nil {
return fmt.Errorf("failed to register platform_don2don_dispatcher_unregistered_capability_msgs_total: %w", err)
return fmt.Errorf("failed to register platform_don2don_dispatcher_unknown_capability_msgs_total: %w", err)
}
d.metrics.receiveChannelUsageGauge, err = beholder.GetMeter().Float64Gauge("platform_don2don_dispatcher_receive_channel_usage")
if err != nil {
Expand Down Expand Up @@ -310,12 +310,12 @@ func (d *dispatcher) handleMessage(ctx context.Context, msg *p2ptypes.Message) {
receiver, ok := d.receivers[k]
d.mu.RUnlock()
if !ok {
d.metrics.unregisteredCapMsgsCounter.Add(ctx, 1, metric.WithAttributes(
d.metrics.unknownCapMsgsCounter.Add(ctx, 1, metric.WithAttributes(
attribute.String("capabilityId", k.capID),
attribute.String("donId", strconv.FormatUint(uint64(k.donID), 10)),
attribute.String("method", k.methodName),
))
d.lggr.Debugw("received message for unregistered capability or method", "capabilityId", SanitizeLogString(k.capID), "donId", k.donID, "method", k.methodName)
d.lggr.Debugw("received message for unknown capability or method", "capabilityId", SanitizeLogString(k.capID), "donId", k.donID, "method", k.methodName)
d.tryRespondWithError(msg.Sender, body, types.Error_CAPABILITY_NOT_FOUND)
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func NewLocalExecutableCapability(lggr logger.Logger, capabilityID string, local

func (l *LocalExecutableCapability) Execute(ctx context.Context, req capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) {
if l.localNode.PeerID == nil || l.localNode.WorkflowDON.ID == 0 {
l.lggr.Debugf("empty DON info, executing immediately")
l.lggr.Debug("empty DON info, executing immediately")
return l.ExecutableCapability.Execute(ctx, req)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func NewLocalTargetCapability(lggr logger.Logger, capabilityID string, localDON

func (l *LocalTargetCapability) Execute(ctx context.Context, req capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) {
if l.localNode.PeerID == nil || l.localNode.WorkflowDON.ID == 0 {
l.lggr.Debugf("empty DON info, executing immediately")
l.lggr.Debug("empty DON info, executing immediately")
return l.TargetCapability.Execute(ctx, req)
}

Expand Down
34 changes: 17 additions & 17 deletions core/capabilities/vault/capability.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (s *Capability) Start(ctx context.Context) error {
closeHandler := func() {
ierr := s.handler.Close()
if ierr != nil {
s.lggr.Errorf("error closing vault DON request handler after failed registration: %v", ierr)
s.lggr.Errorw("error closing vault DON request handler after failed registration", "err", ierr)
}
}

Expand Down Expand Up @@ -168,50 +168,50 @@ func (s *Capability) Execute(ctx context.Context, request capabilities.Capabilit
}

func (s *Capability) CreateSecrets(ctx context.Context, request *vaultcommon.CreateSecretsRequest) (*vaulttypes.Response, error) {
s.lggr.Debugf("Received Request: %s", request.String())
s.lggr.Debugw("received create secrets request", "request", request.String())
resolvedIdentity, err := s.resolveRequestIdentity(ctx, request.OrgId, request.WorkflowOwner)
if err != nil {
return nil, err
}
request.OrgId = resolvedIdentity.OrgID
request.WorkflowOwner = resolvedIdentity.WorkflowOwner
if ownerErr := validateEncryptedSecretOwnersMatchResolvedIdentity(request.EncryptedSecrets, resolvedIdentity); ownerErr != nil {
s.lggr.Debugf("RequestId: [%s] failed identity owner checks: %s", request.RequestId, ownerErr.Error())
s.lggr.Debugw("failed identity owner checks", "requestID", request.RequestId, "err", ownerErr)
return nil, ownerErr
}
err = s.ValidateCreateSecretsRequest(ctx, s.publicKey.Get(), request, false)
if err != nil {
s.lggr.Debugf("RequestId: [%s] failed validation checks: %s", request.RequestId, err.Error())
s.lggr.Debugw("failed validation checks", "requestID", request.RequestId, "err", err)
return nil, err
}
return s.handleRequest(ctx, request.RequestId, request)
}

func (s *Capability) UpdateSecrets(ctx context.Context, request *vaultcommon.UpdateSecretsRequest) (*vaulttypes.Response, error) {
s.lggr.Debugf("Received Request: %s", request.String())
s.lggr.Debugw("received update secrets request", "request", request.String())
resolvedIdentity, err := s.resolveRequestIdentity(ctx, request.OrgId, request.WorkflowOwner)
if err != nil {
return nil, err
}
request.OrgId = resolvedIdentity.OrgID
request.WorkflowOwner = resolvedIdentity.WorkflowOwner
if ownerErr := validateEncryptedSecretOwnersMatchResolvedIdentity(request.EncryptedSecrets, resolvedIdentity); ownerErr != nil {
s.lggr.Debugf("RequestId: [%s] failed identity owner checks: %s", request.RequestId, ownerErr.Error())
s.lggr.Debugw("failed identity owner checks", "requestID", request.RequestId, "err", ownerErr)
return nil, ownerErr
}
err = s.ValidateUpdateSecretsRequest(ctx, s.publicKey.Get(), request, false)
if err != nil {
s.lggr.Debugf("RequestId: [%s] failed validation checks: %s", request.RequestId, err.Error())
s.lggr.Debugw("failed validation checks", "requestID", request.RequestId, "err", err)
return nil, err
}
return s.handleRequest(ctx, request.RequestId, request)
}

func (s *Capability) DeleteSecrets(ctx context.Context, request *vaultcommon.DeleteSecretsRequest) (*vaulttypes.Response, error) {
s.lggr.Debugf("Received Request: %s", request.String())
s.lggr.Debugw("received delete secrets request", "request", request.String())
err := s.ValidateDeleteSecretsRequest(request)
if err != nil {
s.lggr.Debugf("Request: [%s] failed validation checks: %s", request.String(), err.Error())
s.lggr.Debugw("failed validation checks", "requestID", request.RequestId, "request", request.String(), "err", err)
return nil, err
}
resolvedIdentity, err := s.resolveRequestIdentity(ctx, request.OrgId, request.WorkflowOwner)
Expand All @@ -221,16 +221,16 @@ func (s *Capability) DeleteSecrets(ctx context.Context, request *vaultcommon.Del
request.OrgId = resolvedIdentity.OrgID
request.WorkflowOwner = resolvedIdentity.WorkflowOwner
if err := validateSecretIdentifierOwnersMatchResolvedIdentity(request.Ids, resolvedIdentity); err != nil {
s.lggr.Debugf("Request: [%s] failed identity owner checks: %s", request.String(), err.Error())
s.lggr.Debugw("failed identity owner checks", "requestID", request.RequestId, "request", request.String(), "err", err)
return nil, err
}
return s.handleRequest(ctx, request.RequestId, request)
}

func (s *Capability) GetSecrets(ctx context.Context, requestID string, request *vaultcommon.GetSecretsRequest) (*vaulttypes.Response, error) {
s.lggr.Debugf("Received Request: %s", request.String())
s.lggr.Debugw("received get secrets request", "request", request.String())
if err := s.ValidateGetSecretsRequest(request); err != nil {
s.lggr.Debugf("Request: [%s] failed validation checks: %s", request.String(), err.Error())
s.lggr.Debugw("failed validation checks", "requestID", requestID, "request", request.String(), "err", err)
return nil, err
}

Expand All @@ -239,10 +239,10 @@ func (s *Capability) GetSecrets(ctx context.Context, requestID string, request *
}

func (s *Capability) ListSecretIdentifiers(ctx context.Context, request *vaultcommon.ListSecretIdentifiersRequest) (*vaulttypes.Response, error) {
s.lggr.Debugf("Received Request: %s", request.String())
s.lggr.Debugw("received list secret identifiers request", "request", request.String())
err := s.ValidateListSecretIdentifiersRequest(request)
if err != nil {
s.lggr.Debugf("Request: [%s] failed validation checks: %s", request.String(), err.Error())
s.lggr.Debugw("failed validation checks", "requestID", request.RequestId, "request", request.String(), "err", err)
return nil, err
}
resolvedIdentity, err := s.resolveRequestIdentity(ctx, request.OrgId, request.WorkflowOwner)
Expand All @@ -252,15 +252,15 @@ func (s *Capability) ListSecretIdentifiers(ctx context.Context, request *vaultco
request.OrgId = resolvedIdentity.OrgID
request.WorkflowOwner = resolvedIdentity.WorkflowOwner
if err := validateOwnerMatchesResolvedIdentity("owner", request.Owner, resolvedIdentity); err != nil {
s.lggr.Debugf("Request: [%s] failed identity owner checks: %s", request.String(), err.Error())
s.lggr.Debugw("failed identity owner checks", "requestID", request.RequestId, "request", request.String(), "err", err)
return nil, err
}
return s.handleRequest(ctx, request.RequestId, request)
}

func (s *Capability) GetPublicKey(ctx context.Context, request *vaultcommon.GetPublicKeyRequest) (*vaultcommon.GetPublicKeyResponse, error) {
l := logger.With(s.lggr, "method", "GetPublicKey")
l.Debugf("Received Request: GetPublicKeyRequest")
l.Debug("received get public key request")

pubKey := s.publicKey.Get()
if pubKey == nil {
Expand All @@ -270,7 +270,7 @@ func (s *Capability) GetPublicKey(ctx context.Context, request *vaultcommon.GetP

pkb, err := pubKey.Marshal()
if err != nil {
l.Debugf("could not marshal public key: %s", err.Error())
l.Debugw("could not marshal public key", "err", err)
return nil, fmt.Errorf("could not marshal public key: %w", err)
}

Expand Down
12 changes: 6 additions & 6 deletions core/capabilities/vault/gw_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func (h *GatewayHandler) HandleGatewayMessage(ctx context.Context, gatewayID str
}

if err = h.gatewayConnector.SendToGateway(ctx, gatewayID, response); err != nil {
h.lggr.Errorf("Failed to send message to gateway %s: %v", gatewayID, err)
h.lggr.Errorw("Failed to send message to gateway", "gatewayID", gatewayID, "error", err)
return err
}

Expand Down Expand Up @@ -334,7 +334,7 @@ func (h *GatewayHandler) handleSecretsCreate(ctx context.Context, gatewayID stri
return h.errorResponse(ctx, gatewayID, req, api.FatalError, err)
}

h.lggr.Debugf("Processing authorized and normalized create secrets request [%s]", vaultCapRequest.String())
h.lggr.Debugw("Processing authorized and normalized create secrets request", "request", vaultCapRequest.String())
vaultCapResponse, err := h.secretsService.CreateSecrets(ctx, &vaultCapRequest)
if err != nil {
return h.errorResponse(ctx, gatewayID, req, api.FatalError, err)
Expand All @@ -357,7 +357,7 @@ func (h *GatewayHandler) handleSecretsUpdate(ctx context.Context, gatewayID stri
return h.errorResponse(ctx, gatewayID, req, api.FatalError, err)
}

h.lggr.Debugf("Processing authorized and normalized update secrets request [%s]", vaultCapRequest.String())
h.lggr.Debugw("Processing authorized and normalized update secrets request", "request", vaultCapRequest.String())
vaultCapResponse, err := h.secretsService.UpdateSecrets(ctx, &vaultCapRequest)
if err != nil {
return h.errorResponse(ctx, gatewayID, req, api.FatalError, err)
Expand All @@ -380,7 +380,7 @@ func (h *GatewayHandler) handleSecretsDelete(ctx context.Context, gatewayID stri
return h.errorResponse(ctx, gatewayID, req, api.FatalError, err)
}

h.lggr.Debugf("Processing authorized and normalized delete secrets request [%s]", r.String())
h.lggr.Debugw("Processing authorized and normalized delete secrets request", "request", r.String())
resp, err := h.secretsService.DeleteSecrets(ctx, r)
if err != nil {
return h.errorResponse(ctx, gatewayID, req, api.HandlerError, fmt.Errorf("failed to delete secrets: %w", err))
Expand Down Expand Up @@ -410,7 +410,7 @@ func (h *GatewayHandler) handleSecretsList(ctx context.Context, gatewayID string
return h.errorResponse(ctx, gatewayID, req, api.FatalError, err)
}

h.lggr.Debugf("Processing authorized and normalized list secrets request [%s]", r.String())
h.lggr.Debugw("Processing authorized and normalized list secrets request", "request", r.String())
resp, err := h.secretsService.ListSecretIdentifiers(ctx, r)
if err != nil {
return h.errorResponse(ctx, gatewayID, req, api.HandlerError, fmt.Errorf("failed to list secret identifiers: %w", err))
Expand Down Expand Up @@ -460,7 +460,7 @@ func (h *GatewayHandler) errorResponse(
errorCode api.ErrorCode,
err error,
) *jsonrpc.Response[json.RawMessage] {
h.lggr.Errorf("error code: %d, err: %s", errorCode, err.Error())
h.lggr.Errorw("gateway handler error response", "gatewayID", gatewayID, "requestID", req.ID, "method", req.Method, "errorCode", errorCode, "error", err)
h.metrics.requestInternalError.Add(ctx, 1, metric.WithAttributes(
attribute.String("gateway_id", gatewayID),
attribute.String("error", errorCode.String()),
Expand Down
35 changes: 32 additions & 3 deletions core/services/registrysyncer/local_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"math"
"math/big"
"sync"

ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"
"github.com/smartcontractkit/libocr/ragep2p/types"
Expand Down Expand Up @@ -199,6 +200,10 @@ type LocalRegistry struct {
IDsToDONs map[DonID]DON
IDsToNodes map[types.PeerID]NodeInfo
IDsToCapabilities map[string]Capability

cacheMu sync.RWMutex
cachedLocalNodePeer types.PeerID
cachedLocalNode capabilities.Node
}

func NewLocalRegistry(
Expand Down Expand Up @@ -226,7 +231,31 @@ func (l *LocalRegistry) LocalNode(ctx context.Context) (capabilities.Node, error
return capabilities.Node{}, errors.New("unable to get local node: peerWrapper hasn't started yet")
}

return l.NodeByPeerID(ctx, pid)
l.cacheMu.RLock()
if l.cachedLocalNodePeer != (types.PeerID{}) && l.cachedLocalNodePeer == pid {
node := l.cachedLocalNode
l.cacheMu.RUnlock()
return node, nil
}
l.cacheMu.RUnlock()

l.cacheMu.Lock()
defer l.cacheMu.Unlock()
if l.cachedLocalNodePeer != (types.PeerID{}) && l.cachedLocalNodePeer == pid {
return l.cachedLocalNode, nil
}

// cache miss
if l.cachedLocalNodePeer != (types.PeerID{}) {
l.Logger.Errorw("node's peerID changed at runtime, this should never happen", "cachedLocalNodePeer", l.cachedLocalNodePeer, "currentPeerID", pid)
}
n, err := l.NodeByPeerID(ctx, pid)
if err != nil {
return n, err
}
l.cachedLocalNode = n
l.cachedLocalNodePeer = pid
return n, nil
}

func (l *LocalRegistry) NodeByPeerID(ctx context.Context, peerID types.PeerID) (capabilities.Node, error) {
Expand All @@ -249,9 +278,9 @@ func (l *LocalRegistry) NodeByPeerID(ctx context.Context, peerID types.PeerID) (
// greater than 0, so if the ID is 0, it means we've not set `workflowDON` initialized above yet.
if workflowDON.ID == 0 {
workflowDON = d.DON
l.Logger.Debug("Workflow DON identified: %+v", workflowDON)
l.Logger.Debugw("Workflow DON identified", "workflowDONID", workflowDON.ID)
} else {
l.Logger.Errorf("Configuration error: node %s belongs to more than one workflowDON", peerID)
l.Logger.Errorw("Configuration error: node belongs to more than one workflowDON", "peerID", peerID)
}
}

Expand Down
52 changes: 52 additions & 0 deletions core/services/registrysyncer/local_registry_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package registrysyncer

import (
"context"
"testing"

ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"
Expand All @@ -15,6 +16,57 @@ import (
valuespb "github.com/smartcontractkit/chainlink-protos/cre/go/values/pb"
)

func TestLocalRegistry_LocalNode(t *testing.T) {
lggr := logger.Test(t)
localPeer := types.PeerID{0: 7}
getPeerID := func() (types.PeerID, error) {
return localPeer, nil
}
idsToDons := map[DonID]DON{
1: {
DON: capabilities.DON{
ID: 1,
F: 1,
Members: []types.PeerID{localPeer},
AcceptsWorkflows: true,
},
CapabilityConfigurations: map[string]CapabilityConfiguration{
"capabilityID@1.0.0": {},
},
},
}
idsToNodes := map[types.PeerID]NodeInfo{
localPeer: {NodeOperatorID: 42},
}
idsToCapabilities := map[string]Capability{
"capabilityID@1.0.0": {
ID: "capabilityID@1.0.0",
CapabilityType: capabilities.CapabilityTypeAction,
},
}
lr := NewLocalRegistry(lggr, getPeerID, idsToDons, idsToNodes, idsToCapabilities)

ctx := t.Context()
want, err := lr.NodeByPeerID(ctx, localPeer)
require.NoError(t, err)

got, err := lr.LocalNode(ctx)
require.NoError(t, err)
assert.Equal(t, want, got)

gotAgain, err := lr.LocalNode(ctx)
require.NoError(t, err)
assert.Equal(t, want, gotAgain)

t.Run("GetPeerID error", func(t *testing.T) {
broken := NewLocalRegistry(lggr, func() (types.PeerID, error) {
return types.PeerID{}, assert.AnError
}, idsToDons, idsToNodes, idsToCapabilities)
_, err := broken.LocalNode(context.Background())
require.ErrorContains(t, err, "unable to get local node: peerWrapper hasn't started yet")
})
}

func TestLocalRegistry_DONsForCapability(t *testing.T) {
lggr := logger.Test(t)
getPeerID := func() (types.PeerID, error) {
Expand Down
Loading
Loading