Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/beholder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ type Config struct {
LogRetryConfig *RetryConfig
LogStreamingEnabled bool // Enable logs streaming to the OTel log exporter

// Heartbeat
HeartbeatEnabled bool // Enable periodic heartbeat emission

// Auth
AuthPublicKeyHex string
AuthHeaders map[string]string
Expand Down Expand Up @@ -115,6 +118,7 @@ func DefaultConfig() Config {
LogMaxQueueSize: 2048,
LogBatchProcessor: true,
LogStreamingEnabled: true, // Enable logs streaming by default
HeartbeatEnabled: true, // Enable heartbeat by default
}
}

Expand Down
227 changes: 227 additions & 0 deletions pkg/beholder/heartbeat.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
package beholder

import (
"context"
"fmt"
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"google.golang.org/protobuf/proto"

"github.com/smartcontractkit/chainlink-common/pkg/beholder/pb"
"github.com/smartcontractkit/chainlink-common/pkg/config/build"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/timeutil"
)

// Heartbeat represents a periodic heartbeat service that emits metrics and logs
type Heartbeat struct {
services.Service
eng *services.Engine

Beat time.Duration
Emitter Emitter
Meter metric.Meter
Logger logger.Logger
Tracer trace.Tracer
AppID string
ServiceName string
Version string
Commit string
Labels map[string]string
}

// NewHeartbeat creates a new heartbeat service with custom configuration
func NewHeartbeat(beat time.Duration, lggr logger.Logger, opts ...HeartbeatOpt) *Heartbeat {
// Setup default emitter, meter, and tracer
noopClient := NewNoopClient()

// Create heartbeat with defaults
h := &Heartbeat{
Beat: beat,
Logger: lggr,
Emitter: noopClient.Emitter,
Meter: noopClient.Meter,
Tracer: noopClient.Tracer,
AppID: "chainlink", // Default app ID
ServiceName: build.Program, // Default service name
Version: build.Version, // Use build version
Commit: build.ChecksumPrefix, // Use build commit
Labels: make(map[string]string),
}

// Apply options
for _, opt := range opts {
opt(h)
}

// Build labels from current values
h.Labels = map[string]string{
"service": h.ServiceName,
"version": h.Version,
"commit": h.Commit,
}
if h.AppID != "" {
h.Labels["app_id"] = h.AppID
}

// Create service engine
h.Service, h.eng = services.Config{
Name: "BeholderHeartbeat",
Start: h.start,
}.NewServiceEngine(lggr)

return h
}

// HeartbeatOpt is a functional option for configuring the heartbeat
type HeartbeatOpt func(*Heartbeat)

// WithEmitter sets a custom message emitter for the heartbeat
func WithEmitter(emitter Emitter) HeartbeatOpt {
return func(h *Heartbeat) {
h.Emitter = emitter
}
}

// WithMeter sets a custom meter for the heartbeat
func WithMeter(meter metric.Meter) HeartbeatOpt {
return func(h *Heartbeat) {
h.Meter = meter
}
}

// WithTracer sets a custom tracer for the heartbeat
func WithTracer(tracer trace.Tracer) HeartbeatOpt {
return func(h *Heartbeat) {
h.Tracer = tracer
}
}
Comment on lines +84 to +102
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These will pollute the beholder pkg namespace. might be better to use a config struct like heartbeatConfig


// WithAppID sets a custom app ID for the heartbeat
func WithAppID(appID string) HeartbeatOpt {
return func(h *Heartbeat) {
h.AppID = appID
if appID != "" {
h.Labels["app_id"] = appID
} else {
delete(h.Labels, "app_id")
}
}
}

// WithServiceName sets a custom service name for the heartbeat
func WithServiceName(serviceName string) HeartbeatOpt {
return func(h *Heartbeat) {
h.ServiceName = serviceName
h.Labels["service"] = serviceName
}
}

// WithVersion sets a custom version for the heartbeat
func WithVersion(version string) HeartbeatOpt {
return func(h *Heartbeat) {
h.Version = version
h.Labels["version"] = version
}
}

// WithCommit sets a custom commit for the heartbeat
func WithCommit(commit string) HeartbeatOpt {
return func(h *Heartbeat) {
h.Commit = commit
h.Labels["commit"] = commit
}
}

// WithBeatInterval sets a custom beat interval for the heartbeat
func WithBeatInterval(beat time.Duration) HeartbeatOpt {
return func(h *Heartbeat) {
h.Beat = beat
}
}

// start initializes and starts the heartbeat service
func (h *Heartbeat) start(ctx context.Context) error {
// Create heartbeat metrics
heartbeatGauge, err := h.Meter.Int64Gauge("beholder_heartbeat")
if err != nil {
return fmt.Errorf("failed to create heartbeat status gauge: %w", err)
}

heartbeatCount, err := h.Meter.Int64Counter("beholder_heartbeat_count")
if err != nil {
return fmt.Errorf("failed to create heartbeat counter: %w", err)
}

// Define the heartbeat function
beatFn := func(ctx context.Context) {
start := time.Now()

// Create a trace span for the heartbeat
ctx, span := h.Tracer.Start(ctx, "beholder_heartbeat", trace.WithAttributes(
attribute.String("service", h.ServiceName),
attribute.String("app_id", h.AppID),
attribute.String("version", h.Version),
attribute.String("commit", h.Commit),
))
defer span.End()

// Record heartbeat metrics
heartbeatGauge.Record(ctx, 1)
heartbeatCount.Add(ctx, 1)

// Emit heartbeat message

payload := &pb.BaseMessage{
Msg: "beholder heartbeat",
Labels: h.Labels,
}
payloadBytes, err := proto.Marshal(payload)
if err != nil {
// log error
h.Logger.Errorw("heartbeat marshal protobuf failed", "err", err)
}

err = h.Emitter.Emit(ctx, payloadBytes,
AttrKeyDataSchema, "/beholder-base-message/versions/1", // required
AttrKeyDomain, "platform", // required
AttrKeyEntity, "BaseMessage", // required
"service", h.ServiceName,
"app_id", h.AppID,
"version", h.Version,
"commit", h.Commit,
"timestamp", start.Unix(),
)

if err != nil {
h.Logger.Errorw("heartbeat emit failed", "err", err)
}

// Log heartbeat
h.Logger.Debugw("beholder heartbeat emitted",
"service", h.ServiceName,
"app_id", h.AppID,
"version", h.Version,
"commit", h.Commit,
"timestamp", start.Unix(),
)
}

// Start the heartbeat ticker
// Execute immediately first, then continue with regular intervals
h.eng.Go(func(ctx context.Context) {
beatFn(ctx)
})
h.eng.GoTick(timeutil.NewTicker(func() time.Duration { return h.Beat }), beatFn)

h.Logger.Infow("beholder heartbeat service started",
"service", h.ServiceName,
"beat_interval", h.Beat,
)

return nil
}
71 changes: 71 additions & 0 deletions pkg/beholder/heartbeat_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package beholder_test

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/beholder"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
)

func TestHeartbeat_NewHeartbeat(t *testing.T) {
lggr, err := logger.New()
require.NoError(t, err)

heartbeat := beholder.NewHeartbeat(
1*time.Second,
lggr,
beholder.WithAppID("test-app"),
beholder.WithServiceName("test-service"),
beholder.WithVersion("1.0.0"),
beholder.WithCommit("abc123"),
)
require.NotNil(t, heartbeat)

assert.Equal(t, "test-app", heartbeat.AppID)
assert.Equal(t, "test-service", heartbeat.ServiceName)
assert.Equal(t, "1.0.0", heartbeat.Version)
assert.Equal(t, "abc123", heartbeat.Commit)
assert.Equal(t, 1*time.Second, heartbeat.Beat)
assert.NotNil(t, heartbeat.Emitter)
assert.NotNil(t, heartbeat.Meter)
}

func TestHeartbeat_Start(t *testing.T) {
lggr, err := logger.New()
require.NoError(t, err)

heartbeat := beholder.NewHeartbeat(100*time.Millisecond, lggr)
require.NotNil(t, heartbeat)

ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()

err = heartbeat.Start(ctx)
require.NoError(t, err)

// Wait for at least one heartbeat
time.Sleep(150 * time.Millisecond)

err = heartbeat.Close()
require.NoError(t, err)
}

func TestHeartbeat_Defaults(t *testing.T) {
lggr, err := logger.New()
require.NoError(t, err)

heartbeat := beholder.NewHeartbeat(1*time.Second, lggr)
require.NotNil(t, heartbeat)

// Check defaults
assert.Equal(t, "chainlink", heartbeat.AppID)
assert.Equal(t, "github.com/smartcontractkit/chainlink-common", heartbeat.ServiceName)
assert.Equal(t, "(devel)", heartbeat.Version)
assert.Equal(t, "unset", heartbeat.Commit)
assert.Equal(t, 1*time.Second, heartbeat.Beat)
}
11 changes: 11 additions & 0 deletions pkg/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,17 @@ func NewWith(cfgFn func(*zap.Config)) (Logger, error) {
return &logger{core.Sugar()}, nil
}

// NewCore returns a new Logger core from a modified [zap.Config].
func NewCore(cfgFn func(*zap.Config)) (zapcore.Core, error) {
cfg := zap.NewProductionConfig()
cfgFn(&cfg)
logger, err := cfg.Build()
if err != nil {
return nil, err
}
return logger.Core(), nil
}

// NewWithSync returns a new Logger with a given SyncWriter.
func NewWithSync(w io.Writer) Logger {
core := zapcore.NewCore(zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()), zapcore.AddSync(w), zapcore.InfoLevel)
Expand Down
22 changes: 22 additions & 0 deletions pkg/logger/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,3 +324,25 @@ type differentLogger interface {

Sync() error
}

func TestNewCore(t *testing.T) {
// First core at Info (would drop Debug), second core at Debug
obsCore, obsLogs := observer.New(zap.DebugLevel)

primaryCore, err := NewCore(func(cfg *zap.Config) {
cfg.Level = zap.NewAtomicLevelAt(zap.InfoLevel)
})
if err != nil {
t.Fatalf("NewCore error: %v", err)
}

lggr := NewWithCores(primaryCore, obsCore)

lggr.Debug("debug message should reach observer core")
if got := obsLogs.Len(); got != 1 {
t.Fatalf("expected 1 log in observer core, got %d", got)
}
if msg := obsLogs.All()[0].Message; msg != "debug message should reach observer core" {
t.Fatalf("unexpected message: %s", msg)
}
}
7 changes: 7 additions & 0 deletions pkg/loop/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ const (
envTelemetryEmitterExportMaxBatchSize = "CL_TELEMETRY_EMITTER_EXPORT_MAX_BATCH_SIZE"
envTelemetryEmitterMaxQueueSize = "CL_TELEMETRY_EMITTER_MAX_QUEUE_SIZE"
envTelemetryLogStreamingEnabled = "CL_TELEMETRY_LOG_STREAMING_ENABLED"
envTelemetryHeartbeatEnabled = "CL_TELEMETRY_HEARTBEAT_ENABLED"

envChipIngressEndpoint = "CL_CHIP_INGRESS_ENDPOINT"
envChipIngressInsecureConnection = "CL_CHIP_INGRESS_INSECURE_CONNECTION"
Expand Down Expand Up @@ -118,6 +119,7 @@ type EnvConfig struct {
TelemetryEmitterExportMaxBatchSize int
TelemetryEmitterMaxQueueSize int
TelemetryLogStreamingEnabled bool
TelemetryHeartbeatEnabled bool

ChipIngressEndpoint string
ChipIngressInsecureConnection bool
Expand Down Expand Up @@ -187,6 +189,7 @@ func (e *EnvConfig) AsCmdEnv() (env []string) {
add(envTelemetryEmitterExportMaxBatchSize, strconv.Itoa(e.TelemetryEmitterExportMaxBatchSize))
add(envTelemetryEmitterMaxQueueSize, strconv.Itoa(e.TelemetryEmitterMaxQueueSize))
add(envTelemetryLogStreamingEnabled, strconv.FormatBool(e.TelemetryLogStreamingEnabled))
add(envTelemetryHeartbeatEnabled, strconv.FormatBool(e.TelemetryHeartbeatEnabled))

add(envChipIngressEndpoint, e.ChipIngressEndpoint)
add(envChipIngressInsecureConnection, strconv.FormatBool(e.ChipIngressInsecureConnection))
Expand Down Expand Up @@ -351,6 +354,10 @@ func (e *EnvConfig) parse() error {
if err != nil {
return fmt.Errorf("failed to parse %s: %w", envTelemetryLogStreamingEnabled, err)
}
e.TelemetryHeartbeatEnabled, err = getBool(envTelemetryHeartbeatEnabled)
if err != nil {
return fmt.Errorf("failed to parse %s: %w", envTelemetryHeartbeatEnabled, err)
}
// Optional
e.ChipIngressEndpoint = os.Getenv(envChipIngressEndpoint)
e.ChipIngressInsecureConnection, err = getBool(envChipIngressInsecureConnection)
Expand Down
Loading
Loading