Skip to content

Commit fed89ad

Browse files
committed
fix loop chip config, clean up chip emitter impl
1 parent 8d3a3bd commit fed89ad

File tree

4 files changed

+38
-12
lines changed

4 files changed

+38
-12
lines changed

pkg/beholder/chip_ingress_emitter.go

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,25 +3,53 @@ package beholder
33
import (
44
"context"
55
"fmt"
6+
"time"
67

78
ch "github.com/smartcontractkit/chainlink-common/pkg/chipingress"
89
chpb "github.com/smartcontractkit/chainlink-common/pkg/chipingress/pb"
910
)
1011

12+
const (
13+
defaultTimeout = 3 * time.Second
14+
)
15+
1116
type ChipIngressEmitter struct {
12-
client chpb.ChipIngressClient
17+
client chpb.ChipIngressClient
18+
timeout time.Duration
19+
}
20+
21+
type Opt func(*ChipIngressEmitter)
22+
23+
func WithTimeout(timeout time.Duration) Opt {
24+
return func(e *ChipIngressEmitter) {
25+
e.timeout = timeout
26+
}
1327
}
1428

15-
func NewChipIngressEmitter(client chpb.ChipIngressClient) (Emitter, error) {
29+
func NewChipIngressEmitter(client chpb.ChipIngressClient, opts ...Opt) (Emitter, error) {
1630

1731
if client == nil {
1832
return nil, fmt.Errorf("chip ingress client is nil")
1933
}
20-
21-
return &ChipIngressEmitter{client: client}, nil
34+
e := &ChipIngressEmitter{client: client, timeout: defaultTimeout}
35+
for _, opt := range opts {
36+
opt(e)
37+
}
38+
return e, nil
2239
}
2340

2441
func (c *ChipIngressEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any) error {
42+
// newCtx returns a ctx with a timeout min(c.timeout, ctx.Deadline())
43+
newCtx := func(ctx context.Context) (context.Context, context.CancelFunc) {
44+
// check if ctx has a deadline and it's less than the emitter timeout,
45+
// then we need a new ctx with the emitter timeout
46+
if dl, ok := ctx.Deadline(); ok && time.Until(dl) < c.timeout {
47+
return context.WithTimeout(context.Background(), c.timeout)
48+
}
49+
return context.WithTimeout(ctx, c.timeout)
50+
}
51+
ctx, cancel := newCtx(ctx)
52+
defer cancel()
2553

2654
sourceDomain, entityType, err := ExtractSourceAndType(attrKVs...)
2755
if err != nil {

pkg/beholder/client.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"fmt"
77
"net"
88

9-
"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
109
"go.opentelemetry.io/otel/attribute"
1110
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc"
1211
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp"
@@ -21,6 +20,8 @@ import (
2120
oteltrace "go.opentelemetry.io/otel/trace"
2221
"google.golang.org/grpc/credentials"
2322
"google.golang.org/grpc/credentials/insecure"
23+
24+
"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
2425
)
2526

2627
type Emitter interface {
@@ -249,7 +250,7 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro
249250
return nil, err
250251
}
251252

252-
chipIngressEmitter, err := NewChipIngressEmitter(chipIngressClient)
253+
chipIngressEmitter, err := NewChipIngressEmitter(chipIngressClient, WithTimeout(cfg.EmitterExportTimeout))
253254
if err != nil {
254255
return nil, fmt.Errorf("failed to create chip ingress emitter: %w", err)
255256
}

pkg/beholder/dual_source_emitter.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package beholder
33
import (
44
"context"
55
"fmt"
6-
"time"
76

87
"github.com/smartcontractkit/chainlink-common/pkg/logger"
98
)
@@ -49,11 +48,8 @@ func (d *DualSourceEmitter) Emit(ctx context.Context, body []byte, attrKVs ...an
4948

5049
// Emit via chip ingress async
5150
go func() {
52-
d.log.Debugw("emitting to chip ingress", "body", string(body), "attributes", attrKVs)
53-
d.log.Debugw("overriding context for chip ingress emission", "timeout", 5*time.Second)
54-
ctx2, cancel := context.WithTimeout(context.Background(), 5*time.Second)
55-
defer cancel()
56-
if err := d.chipIngressEmitter.Emit(ctx2, body, attrKVs...); err != nil {
51+
52+
if err := d.chipIngressEmitter.Emit(ctx, body, attrKVs...); err != nil {
5753
// If the chip ingress emitter fails, we ONLY log the error
5854
// because we still want to send the data to the OTLP collector and not cause disruption
5955
d.log.Errorw("failed to emit to chip ingress", "error", err)

pkg/loop/server.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ func (s *Server) start() error {
126126
EmitterMaxQueueSize: s.EnvConfig.TelemetryEmitterMaxQueueSize,
127127
ChipIngressEmitterEnabled: s.EnvConfig.ChipIngressEndpoint != "",
128128
ChipIngressEmitterGRPCEndpoint: s.EnvConfig.ChipIngressEndpoint,
129+
ChipIngressInsecureConnection: s.EnvConfig.TelemetryInsecureConnection,
129130
}
130131

131132
if tracingConfig.Enabled {

0 commit comments

Comments
 (0)