Skip to content

Commit 0d68c11

Browse files
committed
[CRE] Logging improvements
1 parent 2d2d90a commit 0d68c11

11 files changed

Lines changed: 121 additions & 40 deletions

File tree

core/capabilities/launcher.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -629,7 +629,7 @@ func (w *launcher) addRemoteCapability(ctx context.Context, cid string, capabili
629629
return fmt.Errorf("failed to add target shim: %w", err)
630630
}
631631
default:
632-
w.lggr.Warnf("unknown capability type, skipping configuration: %+v", capability)
632+
w.lggr.Warnw("unknown capability type, skipping configuration", "capability", capability)
633633
}
634634

635635
return nil
@@ -864,7 +864,7 @@ func (w *launcher) serveCapability(ctx context.Context, cid string, capabilityCo
864864
return fmt.Errorf("failed to add server-side receiver for a target capability '%s' - it won't be exposed remotely: %w", cid, err)
865865
}
866866
default:
867-
w.lggr.Warnf("unknown capability type, skipping configuration: %+v", capability)
867+
w.lggr.Warnw("unknown capability type, skipping configuration", "capability", capability)
868868
}
869869
return nil
870870
}
@@ -895,7 +895,7 @@ func (w *launcher) addReceiver(ctx context.Context, capability registrysyncer.Ca
895895
if errors.Is(err, remote.ErrReceiverExists) {
896896
// If a receiver already exists, let's log the error for debug purposes, but
897897
// otherwise short-circuit here. We've handled this capability in a previous iteration.
898-
w.lggr.Debugf("receiver already exists for cap ID %s and don ID %d: %s", capID, don.ID, err)
898+
w.lggr.Debugw("receiver already exists for capability", "capabilityID", capID, "donID", don.ID, "error", err)
899899
return nil
900900
} else if err != nil {
901901
return fmt.Errorf("failed to set receiver: %w", err)

core/capabilities/remote/dispatcher.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ type dispatcherMetrics struct {
5151
sharedPeerMsgsRcvdCounter metric.Int64Counter
5252
rateLimitedMsgsCounter metric.Int64Counter
5353
invalidMsgsCounter metric.Int64Counter
54-
unregisteredCapMsgsCounter metric.Int64Counter
54+
unknownCapMsgsCounter metric.Int64Counter
5555
receiveChannelUsageGauge metric.Float64Gauge
5656
receiverDroppedMsgsCounter metric.Int64Counter
5757
}
@@ -107,9 +107,9 @@ func (d *dispatcher) initMetrics() error {
107107
if err != nil {
108108
return fmt.Errorf("failed to register platform_don2don_dispatcher_invalid_msgs_total: %w", err)
109109
}
110-
d.metrics.unregisteredCapMsgsCounter, err = beholder.GetMeter().Int64Counter("platform_don2don_dispatcher_unregistered_capability_msgs_total")
110+
d.metrics.unknownCapMsgsCounter, err = beholder.GetMeter().Int64Counter("platform_don2don_dispatcher_unknown_capability_msgs_total")
111111
if err != nil {
112-
return fmt.Errorf("failed to register platform_don2don_dispatcher_unregistered_capability_msgs_total: %w", err)
112+
return fmt.Errorf("failed to register platform_don2don_dispatcher_unknown_capability_msgs_total: %w", err)
113113
}
114114
d.metrics.receiveChannelUsageGauge, err = beholder.GetMeter().Float64Gauge("platform_don2don_dispatcher_receive_channel_usage")
115115
if err != nil {
@@ -310,12 +310,12 @@ func (d *dispatcher) handleMessage(ctx context.Context, msg *p2ptypes.Message) {
310310
receiver, ok := d.receivers[k]
311311
d.mu.RUnlock()
312312
if !ok {
313-
d.metrics.unregisteredCapMsgsCounter.Add(ctx, 1, metric.WithAttributes(
313+
d.metrics.unknownCapMsgsCounter.Add(ctx, 1, metric.WithAttributes(
314314
attribute.String("capabilityId", k.capID),
315315
attribute.String("donId", strconv.FormatUint(uint64(k.donID), 10)),
316316
attribute.String("method", k.methodName),
317317
))
318-
d.lggr.Debugw("received message for unregistered capability or method", "capabilityId", SanitizeLogString(k.capID), "donId", k.donID, "method", k.methodName)
318+
d.lggr.Debugw("received message for unknown capability or method", "capabilityId", SanitizeLogString(k.capID), "donId", k.donID, "method", k.methodName)
319319
d.tryRespondWithError(msg.Sender, body, types.Error_CAPABILITY_NOT_FOUND)
320320
return
321321
}

core/capabilities/transmission/local_executable_capability.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func NewLocalExecutableCapability(lggr logger.Logger, capabilityID string, local
2727

2828
func (l *LocalExecutableCapability) Execute(ctx context.Context, req capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) {
2929
if l.localNode.PeerID == nil || l.localNode.WorkflowDON.ID == 0 {
30-
l.lggr.Debugf("empty DON info, executing immediately")
30+
l.lggr.Debug("empty DON info, executing immediately")
3131
return l.ExecutableCapability.Execute(ctx, req)
3232
}
3333

core/capabilities/transmission/local_target_capability.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func NewLocalTargetCapability(lggr logger.Logger, capabilityID string, localDON
2929

3030
func (l *LocalTargetCapability) Execute(ctx context.Context, req capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) {
3131
if l.localNode.PeerID == nil || l.localNode.WorkflowDON.ID == 0 {
32-
l.lggr.Debugf("empty DON info, executing immediately")
32+
l.lggr.Debug("empty DON info, executing immediately")
3333
return l.TargetCapability.Execute(ctx, req)
3434
}
3535

core/capabilities/vault/capability.go

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func (s *Capability) Start(ctx context.Context) error {
4444
closeHandler := func() {
4545
ierr := s.handler.Close()
4646
if ierr != nil {
47-
s.lggr.Errorf("error closing vault DON request handler after failed registration: %v", ierr)
47+
s.lggr.Errorw("error closing vault DON request handler after failed registration", "err", ierr)
4848
}
4949
}
5050

@@ -168,50 +168,50 @@ func (s *Capability) Execute(ctx context.Context, request capabilities.Capabilit
168168
}
169169

170170
func (s *Capability) CreateSecrets(ctx context.Context, request *vaultcommon.CreateSecretsRequest) (*vaulttypes.Response, error) {
171-
s.lggr.Debugf("Received Request: %s", request.String())
171+
s.lggr.Debugw("received create secrets request", "request", request.String())
172172
resolvedIdentity, err := s.resolveRequestIdentity(ctx, request.OrgId, request.WorkflowOwner)
173173
if err != nil {
174174
return nil, err
175175
}
176176
request.OrgId = resolvedIdentity.OrgID
177177
request.WorkflowOwner = resolvedIdentity.WorkflowOwner
178178
if ownerErr := validateEncryptedSecretOwnersMatchResolvedIdentity(request.EncryptedSecrets, resolvedIdentity); ownerErr != nil {
179-
s.lggr.Debugf("RequestId: [%s] failed identity owner checks: %s", request.RequestId, ownerErr.Error())
179+
s.lggr.Debugw("failed identity owner checks", "requestID", request.RequestId, "err", ownerErr)
180180
return nil, ownerErr
181181
}
182182
err = s.ValidateCreateSecretsRequest(ctx, s.publicKey.Get(), request, false)
183183
if err != nil {
184-
s.lggr.Debugf("RequestId: [%s] failed validation checks: %s", request.RequestId, err.Error())
184+
s.lggr.Debugw("failed validation checks", "requestID", request.RequestId, "err", err)
185185
return nil, err
186186
}
187187
return s.handleRequest(ctx, request.RequestId, request)
188188
}
189189

190190
func (s *Capability) UpdateSecrets(ctx context.Context, request *vaultcommon.UpdateSecretsRequest) (*vaulttypes.Response, error) {
191-
s.lggr.Debugf("Received Request: %s", request.String())
191+
s.lggr.Debugw("received update secrets request", "request", request.String())
192192
resolvedIdentity, err := s.resolveRequestIdentity(ctx, request.OrgId, request.WorkflowOwner)
193193
if err != nil {
194194
return nil, err
195195
}
196196
request.OrgId = resolvedIdentity.OrgID
197197
request.WorkflowOwner = resolvedIdentity.WorkflowOwner
198198
if ownerErr := validateEncryptedSecretOwnersMatchResolvedIdentity(request.EncryptedSecrets, resolvedIdentity); ownerErr != nil {
199-
s.lggr.Debugf("RequestId: [%s] failed identity owner checks: %s", request.RequestId, ownerErr.Error())
199+
s.lggr.Debugw("failed identity owner checks", "requestID", request.RequestId, "err", ownerErr)
200200
return nil, ownerErr
201201
}
202202
err = s.ValidateUpdateSecretsRequest(ctx, s.publicKey.Get(), request, false)
203203
if err != nil {
204-
s.lggr.Debugf("RequestId: [%s] failed validation checks: %s", request.RequestId, err.Error())
204+
s.lggr.Debugw("failed validation checks", "requestID", request.RequestId, "err", err)
205205
return nil, err
206206
}
207207
return s.handleRequest(ctx, request.RequestId, request)
208208
}
209209

210210
func (s *Capability) DeleteSecrets(ctx context.Context, request *vaultcommon.DeleteSecretsRequest) (*vaulttypes.Response, error) {
211-
s.lggr.Debugf("Received Request: %s", request.String())
211+
s.lggr.Debugw("received delete secrets request", "request", request.String())
212212
err := s.ValidateDeleteSecretsRequest(request)
213213
if err != nil {
214-
s.lggr.Debugf("Request: [%s] failed validation checks: %s", request.String(), err.Error())
214+
s.lggr.Debugw("failed validation checks", "requestID", request.RequestId, "request", request.String(), "err", err)
215215
return nil, err
216216
}
217217
resolvedIdentity, err := s.resolveRequestIdentity(ctx, request.OrgId, request.WorkflowOwner)
@@ -221,16 +221,16 @@ func (s *Capability) DeleteSecrets(ctx context.Context, request *vaultcommon.Del
221221
request.OrgId = resolvedIdentity.OrgID
222222
request.WorkflowOwner = resolvedIdentity.WorkflowOwner
223223
if err := validateSecretIdentifierOwnersMatchResolvedIdentity(request.Ids, resolvedIdentity); err != nil {
224-
s.lggr.Debugf("Request: [%s] failed identity owner checks: %s", request.String(), err.Error())
224+
s.lggr.Debugw("failed identity owner checks", "requestID", request.RequestId, "request", request.String(), "err", err)
225225
return nil, err
226226
}
227227
return s.handleRequest(ctx, request.RequestId, request)
228228
}
229229

230230
func (s *Capability) GetSecrets(ctx context.Context, requestID string, request *vaultcommon.GetSecretsRequest) (*vaulttypes.Response, error) {
231-
s.lggr.Debugf("Received Request: %s", request.String())
231+
s.lggr.Debugw("received get secrets request", "request", request.String())
232232
if err := s.ValidateGetSecretsRequest(request); err != nil {
233-
s.lggr.Debugf("Request: [%s] failed validation checks: %s", request.String(), err.Error())
233+
s.lggr.Debugw("failed validation checks", "requestID", requestID, "request", request.String(), "err", err)
234234
return nil, err
235235
}
236236

@@ -239,10 +239,10 @@ func (s *Capability) GetSecrets(ctx context.Context, requestID string, request *
239239
}
240240

241241
func (s *Capability) ListSecretIdentifiers(ctx context.Context, request *vaultcommon.ListSecretIdentifiersRequest) (*vaulttypes.Response, error) {
242-
s.lggr.Debugf("Received Request: %s", request.String())
242+
s.lggr.Debugw("received list secret identifiers request", "request", request.String())
243243
err := s.ValidateListSecretIdentifiersRequest(request)
244244
if err != nil {
245-
s.lggr.Debugf("Request: [%s] failed validation checks: %s", request.String(), err.Error())
245+
s.lggr.Debugw("failed validation checks", "requestID", request.RequestId, "request", request.String(), "err", err)
246246
return nil, err
247247
}
248248
resolvedIdentity, err := s.resolveRequestIdentity(ctx, request.OrgId, request.WorkflowOwner)
@@ -252,15 +252,15 @@ func (s *Capability) ListSecretIdentifiers(ctx context.Context, request *vaultco
252252
request.OrgId = resolvedIdentity.OrgID
253253
request.WorkflowOwner = resolvedIdentity.WorkflowOwner
254254
if err := validateOwnerMatchesResolvedIdentity("owner", request.Owner, resolvedIdentity); err != nil {
255-
s.lggr.Debugf("Request: [%s] failed identity owner checks: %s", request.String(), err.Error())
255+
s.lggr.Debugw("failed identity owner checks", "requestID", request.RequestId, "request", request.String(), "err", err)
256256
return nil, err
257257
}
258258
return s.handleRequest(ctx, request.RequestId, request)
259259
}
260260

261261
func (s *Capability) GetPublicKey(ctx context.Context, request *vaultcommon.GetPublicKeyRequest) (*vaultcommon.GetPublicKeyResponse, error) {
262262
l := logger.With(s.lggr, "method", "GetPublicKey")
263-
l.Debugf("Received Request: GetPublicKeyRequest")
263+
l.Debug("received get public key request")
264264

265265
pubKey := s.publicKey.Get()
266266
if pubKey == nil {
@@ -270,7 +270,7 @@ func (s *Capability) GetPublicKey(ctx context.Context, request *vaultcommon.GetP
270270

271271
pkb, err := pubKey.Marshal()
272272
if err != nil {
273-
l.Debugf("could not marshal public key: %s", err.Error())
273+
l.Debugw("could not marshal public key", "err", err)
274274
return nil, fmt.Errorf("could not marshal public key: %w", err)
275275
}
276276

core/capabilities/vault/gw_handler.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ func (h *GatewayHandler) HandleGatewayMessage(ctx context.Context, gatewayID str
194194
}
195195

196196
if err = h.gatewayConnector.SendToGateway(ctx, gatewayID, response); err != nil {
197-
h.lggr.Errorf("Failed to send message to gateway %s: %v", gatewayID, err)
197+
h.lggr.Errorw("Failed to send message to gateway", "gatewayID", gatewayID, "error", err)
198198
return err
199199
}
200200

@@ -334,7 +334,7 @@ func (h *GatewayHandler) handleSecretsCreate(ctx context.Context, gatewayID stri
334334
return h.errorResponse(ctx, gatewayID, req, api.FatalError, err)
335335
}
336336

337-
h.lggr.Debugf("Processing authorized and normalized create secrets request [%s]", vaultCapRequest.String())
337+
h.lggr.Debugw("Processing authorized and normalized create secrets request", "request", vaultCapRequest.String())
338338
vaultCapResponse, err := h.secretsService.CreateSecrets(ctx, &vaultCapRequest)
339339
if err != nil {
340340
return h.errorResponse(ctx, gatewayID, req, api.FatalError, err)
@@ -357,7 +357,7 @@ func (h *GatewayHandler) handleSecretsUpdate(ctx context.Context, gatewayID stri
357357
return h.errorResponse(ctx, gatewayID, req, api.FatalError, err)
358358
}
359359

360-
h.lggr.Debugf("Processing authorized and normalized update secrets request [%s]", vaultCapRequest.String())
360+
h.lggr.Debugw("Processing authorized and normalized update secrets request", "request", vaultCapRequest.String())
361361
vaultCapResponse, err := h.secretsService.UpdateSecrets(ctx, &vaultCapRequest)
362362
if err != nil {
363363
return h.errorResponse(ctx, gatewayID, req, api.FatalError, err)
@@ -380,7 +380,7 @@ func (h *GatewayHandler) handleSecretsDelete(ctx context.Context, gatewayID stri
380380
return h.errorResponse(ctx, gatewayID, req, api.FatalError, err)
381381
}
382382

383-
h.lggr.Debugf("Processing authorized and normalized delete secrets request [%s]", r.String())
383+
h.lggr.Debugw("Processing authorized and normalized delete secrets request", "request", r.String())
384384
resp, err := h.secretsService.DeleteSecrets(ctx, r)
385385
if err != nil {
386386
return h.errorResponse(ctx, gatewayID, req, api.HandlerError, fmt.Errorf("failed to delete secrets: %w", err))
@@ -410,7 +410,7 @@ func (h *GatewayHandler) handleSecretsList(ctx context.Context, gatewayID string
410410
return h.errorResponse(ctx, gatewayID, req, api.FatalError, err)
411411
}
412412

413-
h.lggr.Debugf("Processing authorized and normalized list secrets request [%s]", r.String())
413+
h.lggr.Debugw("Processing authorized and normalized list secrets request", "request", r.String())
414414
resp, err := h.secretsService.ListSecretIdentifiers(ctx, r)
415415
if err != nil {
416416
return h.errorResponse(ctx, gatewayID, req, api.HandlerError, fmt.Errorf("failed to list secret identifiers: %w", err))
@@ -460,7 +460,7 @@ func (h *GatewayHandler) errorResponse(
460460
errorCode api.ErrorCode,
461461
err error,
462462
) *jsonrpc.Response[json.RawMessage] {
463-
h.lggr.Errorf("error code: %d, err: %s", errorCode, err.Error())
463+
h.lggr.Errorw("gateway handler error response", "gatewayID", gatewayID, "requestID", req.ID, "method", req.Method, "errorCode", errorCode, "error", err)
464464
h.metrics.requestInternalError.Add(ctx, 1, metric.WithAttributes(
465465
attribute.String("gateway_id", gatewayID),
466466
attribute.String("error", errorCode.String()),

core/services/registrysyncer/local_registry.go

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"math"
99
"math/big"
10+
"sync"
1011

1112
ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"
1213
"github.com/smartcontractkit/libocr/ragep2p/types"
@@ -199,6 +200,10 @@ type LocalRegistry struct {
199200
IDsToDONs map[DonID]DON
200201
IDsToNodes map[types.PeerID]NodeInfo
201202
IDsToCapabilities map[string]Capability
203+
204+
cacheMu sync.RWMutex
205+
cachedLocalNodePeer types.PeerID
206+
cachedLocalNode capabilities.Node
202207
}
203208

204209
func NewLocalRegistry(
@@ -226,7 +231,31 @@ func (l *LocalRegistry) LocalNode(ctx context.Context) (capabilities.Node, error
226231
return capabilities.Node{}, errors.New("unable to get local node: peerWrapper hasn't started yet")
227232
}
228233

229-
return l.NodeByPeerID(ctx, pid)
234+
l.cacheMu.RLock()
235+
if l.cachedLocalNodePeer != (types.PeerID{}) && l.cachedLocalNodePeer == pid {
236+
node := l.cachedLocalNode
237+
l.cacheMu.RUnlock()
238+
return node, nil
239+
}
240+
l.cacheMu.RUnlock()
241+
242+
l.cacheMu.Lock()
243+
defer l.cacheMu.Unlock()
244+
if l.cachedLocalNodePeer != (types.PeerID{}) && l.cachedLocalNodePeer == pid {
245+
return l.cachedLocalNode, nil
246+
}
247+
248+
// cache miss
249+
if l.cachedLocalNodePeer != (types.PeerID{}) {
250+
l.Logger.Errorw("node's peerID changed at runtime, this should never happen", "cachedLocalNodePeer", l.cachedLocalNodePeer, "currentPeerID", pid)
251+
}
252+
n, err := l.NodeByPeerID(ctx, pid)
253+
if err != nil {
254+
return n, err
255+
}
256+
l.cachedLocalNode = n
257+
l.cachedLocalNodePeer = pid
258+
return n, nil
230259
}
231260

232261
func (l *LocalRegistry) NodeByPeerID(ctx context.Context, peerID types.PeerID) (capabilities.Node, error) {
@@ -249,9 +278,9 @@ func (l *LocalRegistry) NodeByPeerID(ctx context.Context, peerID types.PeerID) (
249278
// greater than 0, so if the ID is 0, it means we've not set `workflowDON` initialized above yet.
250279
if workflowDON.ID == 0 {
251280
workflowDON = d.DON
252-
l.Logger.Debug("Workflow DON identified: %+v", workflowDON)
281+
l.Logger.Debugw("Workflow DON identified", "workflowDONID", workflowDON.ID)
253282
} else {
254-
l.Logger.Errorf("Configuration error: node %s belongs to more than one workflowDON", peerID)
283+
l.Logger.Errorw("Configuration error: node belongs to more than one workflowDON", "peerID", peerID)
255284
}
256285
}
257286

core/services/registrysyncer/local_registry_test.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package registrysyncer
22

33
import (
4+
"context"
45
"testing"
56

67
ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"
@@ -15,6 +16,57 @@ import (
1516
valuespb "github.com/smartcontractkit/chainlink-protos/cre/go/values/pb"
1617
)
1718

19+
func TestLocalRegistry_LocalNode(t *testing.T) {
20+
lggr := logger.Test(t)
21+
localPeer := types.PeerID{0: 7}
22+
getPeerID := func() (types.PeerID, error) {
23+
return localPeer, nil
24+
}
25+
idsToDons := map[DonID]DON{
26+
1: {
27+
DON: capabilities.DON{
28+
ID: 1,
29+
F: 1,
30+
Members: []types.PeerID{localPeer},
31+
AcceptsWorkflows: true,
32+
},
33+
CapabilityConfigurations: map[string]CapabilityConfiguration{
34+
"capabilityID@1.0.0": {},
35+
},
36+
},
37+
}
38+
idsToNodes := map[types.PeerID]NodeInfo{
39+
localPeer: {NodeOperatorID: 42},
40+
}
41+
idsToCapabilities := map[string]Capability{
42+
"capabilityID@1.0.0": {
43+
ID: "capabilityID@1.0.0",
44+
CapabilityType: capabilities.CapabilityTypeAction,
45+
},
46+
}
47+
lr := NewLocalRegistry(lggr, getPeerID, idsToDons, idsToNodes, idsToCapabilities)
48+
49+
ctx := t.Context()
50+
want, err := lr.NodeByPeerID(ctx, localPeer)
51+
require.NoError(t, err)
52+
53+
got, err := lr.LocalNode(ctx)
54+
require.NoError(t, err)
55+
assert.Equal(t, want, got)
56+
57+
gotAgain, err := lr.LocalNode(ctx)
58+
require.NoError(t, err)
59+
assert.Equal(t, want, gotAgain)
60+
61+
t.Run("GetPeerID error", func(t *testing.T) {
62+
broken := NewLocalRegistry(lggr, func() (types.PeerID, error) {
63+
return types.PeerID{}, assert.AnError
64+
}, idsToDons, idsToNodes, idsToCapabilities)
65+
_, err := broken.LocalNode(context.Background())
66+
require.ErrorContains(t, err, "unable to get local node: peerWrapper hasn't started yet")
67+
})
68+
}
69+
1870
func TestLocalRegistry_DONsForCapability(t *testing.T) {
1971
lggr := logger.Test(t)
2072
getPeerID := func() (types.PeerID, error) {

0 commit comments

Comments
 (0)