diff --git a/pkg/beholder/config.go b/pkg/beholder/config.go
index d08226961d..336f62ece9 100644
--- a/pkg/beholder/config.go
+++ b/pkg/beholder/config.go
@@ -49,6 +49,9 @@ type Config struct {
 	LogRetryConfig *RetryConfig
 	LogStreamingEnabled bool // Enable logs streaming to the OTel log exporter
 
+	// Heartbeat
+	HeartbeatEnabled bool // Enable periodic heartbeat emission
+
 	// Auth
 	AuthPublicKeyHex string
 	AuthHeaders map[string]string
@@ -115,6 +118,7 @@ func DefaultConfig() Config {
 		LogMaxQueueSize: 2048,
 		LogBatchProcessor: true,
 		LogStreamingEnabled: true, // Enable logs streaming by default
+		HeartbeatEnabled: true, // Enable heartbeat by default
 	}
 }
 
diff --git a/pkg/beholder/heartbeat.go b/pkg/beholder/heartbeat.go
new file mode 100644
index 0000000000..42ebfc6ced
--- /dev/null
+++ b/pkg/beholder/heartbeat.go
@@ -0,0 +1,227 @@
+package beholder
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/metric"
+	"go.opentelemetry.io/otel/trace"
+	"google.golang.org/protobuf/proto"
+
+	"github.com/smartcontractkit/chainlink-common/pkg/beholder/pb"
+	"github.com/smartcontractkit/chainlink-common/pkg/config/build"
+	"github.com/smartcontractkit/chainlink-common/pkg/logger"
+	"github.com/smartcontractkit/chainlink-common/pkg/services"
+	"github.com/smartcontractkit/chainlink-common/pkg/timeutil"
+)
+
+// Heartbeat is a service that, every Beat, records heartbeat metrics, opens a trace span, and emits a BaseMessage.
+type Heartbeat struct {
+	services.Service
+	eng *services.Engine
+
+	Beat        time.Duration
+	Emitter     Emitter
+	Meter       metric.Meter
+	Logger      logger.Logger
+	Tracer      trace.Tracer
+	AppID       string
+	ServiceName string
+	Version     string
+	Commit      string
+	Labels      map[string]string
+}
+
+// NewHeartbeat builds a heartbeat service from noop/build-info defaults, then applies opts in order.
+func NewHeartbeat(beat time.Duration, lggr logger.Logger, opts ...HeartbeatOpt) *Heartbeat {
+	// Setup default emitter, meter, and tracer
+	noopClient := NewNoopClient()
+
+	// Create heartbeat with defaults
+	h := &Heartbeat{
+		Beat:        beat,
+		Logger:      lggr,
+		Emitter:     noopClient.Emitter,
+		Meter:       noopClient.Meter,
+		Tracer:      noopClient.Tracer,
+		AppID:       "chainlink",          // Default app ID
+		ServiceName: build.Program,        // Default service name
+		Version:     build.Version,        // Use build version
+		Commit:      build.ChecksumPrefix, // Use build commit
+		Labels:      make(map[string]string),
+	}
+
+	// Apply options
+	for _, opt := range opts {
+		opt(h)
+	}
+
+	// Rebuild labels from the final field values; this replaces whatever the options wrote into h.Labels
+	h.Labels = map[string]string{
+		"service": h.ServiceName,
+		"version": h.Version,
+		"commit":  h.Commit,
+	}
+	if h.AppID != "" {
+		h.Labels["app_id"] = h.AppID
+	}
+
+	// Create service engine
+	h.Service, h.eng = services.Config{
+		Name:  "BeholderHeartbeat",
+		Start: h.start,
+	}.NewServiceEngine(lggr)
+
+	return h
+}
+
+// HeartbeatOpt is a functional option for configuring the heartbeat
+type HeartbeatOpt func(*Heartbeat)
+
+// WithEmitter sets a custom message emitter for the heartbeat
+func WithEmitter(emitter Emitter) HeartbeatOpt {
+	return func(h *Heartbeat) {
+		h.Emitter = emitter
+	}
+}
+
+// WithMeter sets a custom meter for the heartbeat
+func WithMeter(meter metric.Meter) HeartbeatOpt {
+	return func(h *Heartbeat) {
+		h.Meter = meter
+	}
+}
+
+// WithTracer sets a custom tracer for the heartbeat
+func WithTracer(tracer trace.Tracer) HeartbeatOpt {
+	return func(h *Heartbeat) {
+		h.Tracer = tracer
+	}
+}
+
+// WithAppID sets a custom app ID for the heartbeat; an empty ID removes the app_id label
+func WithAppID(appID string) HeartbeatOpt {
+	return func(h *Heartbeat) {
+		h.AppID = appID
+		if appID != "" {
+			h.Labels["app_id"] = appID
+		} else {
+			delete(h.Labels, "app_id")
+		}
+	}
+}
+
+// WithServiceName sets a custom service name for the heartbeat
+func WithServiceName(serviceName string) HeartbeatOpt {
+	return func(h *Heartbeat) {
+		h.ServiceName = serviceName
+		h.Labels["service"] = serviceName
+	}
+}
+
+// WithVersion sets a custom version for the heartbeat
+func WithVersion(version string) HeartbeatOpt {
+	return func(h *Heartbeat) {
+		h.Version = version
+		h.Labels["version"] = version
+	}
+}
+
+// WithCommit sets a custom commit for the heartbeat
+func WithCommit(commit string) HeartbeatOpt {
+	return func(h *Heartbeat) {
+		h.Commit = commit
+		h.Labels["commit"] = commit
+	}
+}
+
+// WithBeatInterval sets a custom beat interval for the heartbeat
+func WithBeatInterval(beat time.Duration) HeartbeatOpt {
+	return func(h *Heartbeat) {
+		h.Beat = beat
+	}
+}
+
+// start creates the heartbeat instruments and schedules one immediate beat plus one per Beat interval
+func (h *Heartbeat) start(ctx context.Context) error {
+	// Create heartbeat metrics
+	heartbeatGauge, err := h.Meter.Int64Gauge("beholder_heartbeat")
+	if err != nil {
+		return fmt.Errorf("failed to create heartbeat status gauge: %w", err)
+	}
+
+	heartbeatCount, err := h.Meter.Int64Counter("beholder_heartbeat_count")
+	if err != nil {
+		return fmt.Errorf("failed to create heartbeat counter: %w", err)
+	}
+
+	// Define the heartbeat function
+	beatFn := func(ctx context.Context) {
+		start := time.Now()
+
+		// Create a trace span for the heartbeat
+		ctx, span := h.Tracer.Start(ctx, "beholder_heartbeat", trace.WithAttributes(
+			attribute.String("service", h.ServiceName),
+			attribute.String("app_id", h.AppID),
+			attribute.String("version", h.Version),
+			attribute.String("commit", h.Commit),
+		))
+		defer span.End()
+
+		// Record heartbeat metrics
+		heartbeatGauge.Record(ctx, 1)
+		heartbeatCount.Add(ctx, 1)
+
+		// Emit heartbeat message
+
+		payload := &pb.BaseMessage{
+			Msg:    "beholder heartbeat",
+			Labels: h.Labels,
+		}
+		payloadBytes, err := proto.Marshal(payload)
+		if err != nil {
+			// Without a marshalled payload there is nothing valid to send, so skip this beat's emit
+			h.Logger.Errorw("heartbeat marshal protobuf failed", "err", err)
+			return
+		}
+
+		err = h.Emitter.Emit(ctx, payloadBytes,
+			AttrKeyDataSchema, "/beholder-base-message/versions/1", // required
+			AttrKeyDomain, "platform",                              // required
+			AttrKeyEntity, "BaseMessage",                           // required
+			"service", h.ServiceName,
+			"app_id", h.AppID,
+			"version", h.Version,
+			"commit", h.Commit,
+			"timestamp", start.Unix(),
+		)
+
+		if err != nil {
+			h.Logger.Errorw("heartbeat emit failed", "err", err)
+			return
+		}
+
+		// Log heartbeat (only reached when the emit succeeded)
+		h.Logger.Debugw("beholder heartbeat emitted",
+			"service", h.ServiceName,
+			"app_id", h.AppID,
+			"version", h.Version,
+			"commit", h.Commit,
+			"timestamp", start.Unix(),
+		)
+	}
+
+	// Start the heartbeat ticker
+	// Execute immediately first, then continue with regular intervals
+	h.eng.Go(beatFn)
+	h.eng.GoTick(timeutil.NewTicker(func() time.Duration { return h.Beat }), beatFn)
+
+	h.Logger.Infow("beholder heartbeat service started",
+		"service", h.ServiceName,
+		"beat_interval", h.Beat,
+	)
+
+	return nil
+}
diff --git a/pkg/beholder/heartbeat_test.go b/pkg/beholder/heartbeat_test.go
new file mode 100644
index 0000000000..429dbaec51
--- /dev/null
+++ b/pkg/beholder/heartbeat_test.go
@@ -0,0 +1,71 @@
+package beholder_test
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"github.com/smartcontractkit/chainlink-common/pkg/beholder"
+	"github.com/smartcontractkit/chainlink-common/pkg/logger"
+)
+
+func TestHeartbeat_NewHeartbeat(t *testing.T) {
+	lggr, err := logger.New()
+	require.NoError(t, err)
+
+	heartbeat := beholder.NewHeartbeat(
+		1*time.Second,
+		lggr,
+		beholder.WithAppID("test-app"),
+		beholder.WithServiceName("test-service"),
+		beholder.WithVersion("1.0.0"),
+		beholder.WithCommit("abc123"),
+	)
+	require.NotNil(t, heartbeat)
+
+	assert.Equal(t, "test-app", heartbeat.AppID)
+	assert.Equal(t, "test-service", heartbeat.ServiceName)
+	assert.Equal(t, "1.0.0", heartbeat.Version)
+	assert.Equal(t, "abc123", heartbeat.Commit)
+	assert.Equal(t, 1*time.Second, heartbeat.Beat)
+	assert.NotNil(t, heartbeat.Emitter)
+	assert.NotNil(t, heartbeat.Meter)
+}
+
+func TestHeartbeat_Start(t *testing.T) {
+	lggr, err := logger.New()
+	require.NoError(t, err)
+
+	heartbeat := beholder.NewHeartbeat(100*time.Millisecond, lggr)
+	require.NotNil(t, heartbeat)
+
+	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
+	defer cancel()
+
+	err = heartbeat.Start(ctx)
+	require.NoError(t, err)
+
+	// Wait for at least one heartbeat
+	time.Sleep(150 * time.Millisecond)
+
+	err = heartbeat.Close()
+	require.NoError(t, err)
+}
+
+func TestHeartbeat_Defaults(t *testing.T) {
+	lggr, err := logger.New()
+	require.NoError(t, err)
+
+	heartbeat := beholder.NewHeartbeat(1*time.Second, lggr)
+	require.NotNil(t, heartbeat)
+
+	// Check defaults
+	assert.Equal(t, "chainlink", heartbeat.AppID)
+	assert.Equal(t, "github.com/smartcontractkit/chainlink-common", heartbeat.ServiceName)
+	assert.Equal(t, "(devel)", heartbeat.Version)
+	assert.Equal(t, "unset", heartbeat.Commit)
+	assert.Equal(t, 1*time.Second, heartbeat.Beat)
+}
diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go
index 6ac6031e78..46414c15d0 100644
--- a/pkg/logger/logger.go
+++ b/pkg/logger/logger.go
@@ -96,6 +96,17 @@ func NewWith(cfgFn func(*zap.Config)) (Logger, error) {
 	return &logger{core.Sugar()}, nil
 }
 
+// NewCore builds a zap production config, lets cfgFn modify it, and returns the resulting logger's [zapcore.Core].
+func NewCore(cfgFn func(*zap.Config)) (zapcore.Core, error) {
+	cfg := zap.NewProductionConfig()
+	cfgFn(&cfg)
+	zl, err := cfg.Build()
+	if err != nil {
+		return nil, err
+	}
+	return zl.Core(), nil
+}
+
 // NewWithSync returns a new Logger with a given SyncWriter.
 func NewWithSync(w io.Writer) Logger {
 	core := zapcore.NewCore(zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()), zapcore.AddSync(w), zapcore.InfoLevel)
diff --git a/pkg/logger/logger_test.go b/pkg/logger/logger_test.go
index ae496142c7..0950edb7f1 100644
--- a/pkg/logger/logger_test.go
+++ b/pkg/logger/logger_test.go
@@ -324,3 +324,25 @@ type differentLogger interface {
 
 	Sync() error
 }
+
+func TestNewCore(t *testing.T) {
+	// The primary core is built at Info while the observer core accepts Debug; a Debug entry must still reach the observer.
+	obsCore, obsLogs := observer.New(zap.DebugLevel)
+
+	primaryCore, err := NewCore(func(cfg *zap.Config) {
+		cfg.Level = zap.NewAtomicLevelAt(zap.InfoLevel)
+	})
+	if err != nil {
+		t.Fatalf("NewCore error: %v", err)
+	}
+
+	lggr := NewWithCores(primaryCore, obsCore)
+
+	lggr.Debug("debug message should reach observer core")
+	if got := obsLogs.Len(); got != 1 {
+		t.Fatalf("expected 1 log in observer core, got %d", got)
+	}
+	if msg := obsLogs.All()[0].Message; msg != "debug message should reach observer core" {
+		t.Fatalf("unexpected message: %s", msg)
+	}
+}
diff --git a/pkg/loop/config.go b/pkg/loop/config.go
index 8bc42daa85..33db728d7e 100644
--- a/pkg/loop/config.go
+++ b/pkg/loop/config.go
@@ -62,6 +62,7 @@ const (
 	envTelemetryEmitterExportMaxBatchSize = "CL_TELEMETRY_EMITTER_EXPORT_MAX_BATCH_SIZE"
 	envTelemetryEmitterMaxQueueSize = "CL_TELEMETRY_EMITTER_MAX_QUEUE_SIZE"
 	envTelemetryLogStreamingEnabled = "CL_TELEMETRY_LOG_STREAMING_ENABLED"
+	envTelemetryHeartbeatEnabled = "CL_TELEMETRY_HEARTBEAT_ENABLED"
 
 	envChipIngressEndpoint = "CL_CHIP_INGRESS_ENDPOINT"
 	envChipIngressInsecureConnection = "CL_CHIP_INGRESS_INSECURE_CONNECTION"
@@ -118,6 +119,7 @@ type EnvConfig struct {
 	TelemetryEmitterExportMaxBatchSize int
 	TelemetryEmitterMaxQueueSize int
 	TelemetryLogStreamingEnabled bool
+	TelemetryHeartbeatEnabled bool
 
 	ChipIngressEndpoint string
 	ChipIngressInsecureConnection bool
@@ -187,6 +189,7 @@ func (e *EnvConfig) AsCmdEnv() (env []string) {
 	add(envTelemetryEmitterExportMaxBatchSize, strconv.Itoa(e.TelemetryEmitterExportMaxBatchSize))
 	add(envTelemetryEmitterMaxQueueSize, strconv.Itoa(e.TelemetryEmitterMaxQueueSize))
 	add(envTelemetryLogStreamingEnabled, strconv.FormatBool(e.TelemetryLogStreamingEnabled))
+	add(envTelemetryHeartbeatEnabled, strconv.FormatBool(e.TelemetryHeartbeatEnabled))
 
 	add(envChipIngressEndpoint, e.ChipIngressEndpoint)
 	add(envChipIngressInsecureConnection, strconv.FormatBool(e.ChipIngressInsecureConnection))
@@ -351,6 +354,10 @@ func (e *EnvConfig) parse() error {
 		if err != nil {
 			return fmt.Errorf("failed to parse %s: %w", envTelemetryLogStreamingEnabled, err)
 		}
+		e.TelemetryHeartbeatEnabled, err = getBool(envTelemetryHeartbeatEnabled)
+		if err != nil {
+			return fmt.Errorf("failed to parse %s: %w", envTelemetryHeartbeatEnabled, err)
+		}
 		// Optional
 		e.ChipIngressEndpoint = os.Getenv(envChipIngressEndpoint)
 		e.ChipIngressInsecureConnection, err = getBool(envChipIngressInsecureConnection)
diff --git a/pkg/loop/logger.go b/pkg/loop/logger.go
index 76b6bd11a0..848d92341d 100644
--- a/pkg/loop/logger.go
+++ b/pkg/loop/logger.go
@@ -13,7 +13,10 @@ import (
 	"go.uber.org/zap/zapcore"
 	"golang.org/x/exp/slices"
 
+	otellog "go.opentelemetry.io/otel/log"
+
 	"github.com/smartcontractkit/chainlink-common/pkg/logger"
+	"github.com/smartcontractkit/chainlink-common/pkg/logger/otelzap"
 )
 
 // HCLogLogger returns an [hclog.Logger] backed by the given [logger.Logger].
@@ -162,13 +165,38 @@ func (h *hclSinkAdapter) Accept(_ string, level hclog.Level, msg string, args ..
 
 // NewLogger returns a new [logger.Logger] configured to encode [hclog] compatible JSON.
 func NewLogger() (logger.Logger, error) {
-	return logger.NewWith(func(cfg *zap.Config) {
-		cfg.Level.SetLevel(zap.DebugLevel)
-		cfg.EncoderConfig.LevelKey = "@level"
-		cfg.EncoderConfig.MessageKey = "@message"
-		cfg.EncoderConfig.TimeKey = "@timestamp"
-		cfg.EncoderConfig.EncodeTime = zapcore.TimeEncoderOfLayout("2006-01-02T15:04:05.000000Z07:00")
-	})
+	return logger.NewWith(configureHCLogEncoder)
+}
+
+// configureHCLogEncoder mutates cfg to use hclog-compatible field names and timestamp format.
+// NOTE: It also sets the log level to Debug, which is what NewLogger's inline config function
+// did before these encoder tweaks were moved here. Centralizing avoids drift between callers.
+// If a different level is desired, callers should override cfg.Level AFTER calling this helper.
+func configureHCLogEncoder(cfg *zap.Config) {
+	cfg.Level.SetLevel(zap.DebugLevel)
+	cfg.EncoderConfig.LevelKey = "@level"
+	cfg.EncoderConfig.MessageKey = "@message"
+	cfg.EncoderConfig.TimeKey = "@timestamp"
+	cfg.EncoderConfig.EncodeTime = zapcore.TimeEncoderOfLayout("2006-01-02T15:04:05.000000Z07:00")
+}
+
+// NewOtelLogger returns a logger with two cores:
+//  1. The primary JSON core configured by configureHCLogEncoder (encoder keys changed to @level, @message, @timestamp).
+//  2. The otel core (otelzap.NewCore) which receives the raw zap.Entry and fields.
+//
+// Important:
+// configureHCLogEncoder only mutates the encoder config used to build the first core.
+// otelzap.NewCore implements zapcore.Core and does NOT use that encoder; it derives attributes from the zap.Entry
+// (Message, Level, Time, etc.) and zap.Fields directly. Therefore changing encoder keys here does NOT affect how
+// the otel core extracts data, and only the first core's JSON output format is altered.
+// This preserves backward compatibility for OTEL export while allowing hclog-compatible key names in the primary output.
+func NewOtelLogger(otelLogger otellog.Logger) (logger.Logger, error) {
+	primaryCore, err := logger.NewCore(configureHCLogEncoder)
+	if err != nil {
+		return nil, err
+	}
+	// Run the otel core at Debug so it matches the level configureHCLogEncoder sets for the primary core
+	return logger.NewWithCores(primaryCore, otelzap.NewCore(otelLogger, otelzap.WithLevel(zapcore.DebugLevel))), nil
 }
 
 // onceValue returns a function that invokes f only once and returns the value
diff --git a/pkg/loop/logger_test.go b/pkg/loop/logger_test.go
index 419a7e727a..731f97b46b 100644
--- a/pkg/loop/logger_test.go
+++ b/pkg/loop/logger_test.go
@@ -1,9 +1,15 @@
 package loop
 
 import (
+	"context"
+	"sync"
 	"testing"
 
 	"github.com/stretchr/testify/assert"
+	sdklog "go.opentelemetry.io/otel/sdk/log"
+
+	"github.com/smartcontractkit/chainlink-common/pkg/logger"
+	"github.com/smartcontractkit/chainlink-common/pkg/logger/otelzap"
 )
 
 func Test_removeArg(t *testing.T) {
@@ -38,3 +44,74 @@ func Test_removeArg(t *testing.T) {
 		})
 	}
 }
+
+func TestNewOtelLogger(t *testing.T) {
+	tests := []struct {
+		name    string
+		logFn   func(l logger.Logger)
+		wantMsg string
+	}{
+		{
+			name: "debug",
+			logFn: func(l logger.Logger) {
+				l.Debugw("hello world", "k", "v")
+			},
+			wantMsg: "hello world",
+		},
+		{
+			name: "info",
+			logFn: func(l logger.Logger) {
+				l.Infow("info msg", "a", 1)
+			},
+			wantMsg: "info msg",
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			exp := &recordingExporter{}
+			lp := sdklog.NewLoggerProvider(
+				sdklog.WithProcessor(sdklog.NewSimpleProcessor(exp)),
+			)
+			otelLggr := lp.Logger("test-" + tt.name)
+
+			lggr, err := NewOtelLogger(otelLggr)
+			if err != nil {
+				t.Fatalf("NewOtelLogger error: %v", err)
+			}
+
+			tt.logFn(lggr)
+
+			// Force flush the logger provider to ensure records are exported
+			if err := lp.ForceFlush(context.Background()); err != nil {
+				t.Fatalf("ForceFlush error: %v", err)
+			}
+
+			if len(exp.records) != 1 {
+				t.Fatalf("expected 1 exported record, got %d", len(exp.records))
+			}
+			if got := exp.records[0].Body().AsString(); got != tt.wantMsg {
+				t.Fatalf("unexpected body: got %q want %q", got, tt.wantMsg)
+			}
+		})
+	}
+}
+
+// recordingExporter captures exported log records (current sdk/log Export signature).
+type recordingExporter struct {
+	mu      sync.Mutex
+	records []sdklog.Record
+}
+
+func (r *recordingExporter) Export(_ context.Context, recs []sdklog.Record) error {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	r.records = append(r.records, recs...)
+	return nil
+}
+func (r *recordingExporter) ForceFlush(context.Context) error { return nil }
+func (r *recordingExporter) Shutdown(context.Context) error { return nil }
+
+// Reference otelzap.NewCore so the otelzap import stays in use; nothing else in this file names the package.
+// (This is not an interface assertion: it only fails the build if otelzap.NewCore is removed or renamed.)
+var _ = otelzap.NewCore
+var _ logger.Logger // keeps the logger import referenced even if the table test above stops using it
diff --git a/pkg/loop/server.go b/pkg/loop/server.go
index 9bfde05b07..f3f46bf86d 100644
--- a/pkg/loop/server.go
+++ b/pkg/loop/server.go
@@ -78,6 +78,7 @@ type Server struct {
 	checker *services.HealthChecker
 	LimitsFactory limits.Factory
 	otelViews []sdkmetric.View
+	heartbeat *beholder.Heartbeat // optional; nil unless the heartbeat was enabled and started successfully
 }
 
 func newServer(loggerName string, otelViews []sdkmetric.View) (*Server, error) {
@@ -144,6 +145,7 @@ func (s *Server) start() error {
 			ChipIngressEmitterEnabled: s.EnvConfig.ChipIngressEndpoint != "",
 			ChipIngressEmitterGRPCEndpoint: s.EnvConfig.ChipIngressEndpoint,
 			ChipIngressInsecureConnection: s.EnvConfig.ChipIngressInsecureConnection,
+			HeartbeatEnabled: s.EnvConfig.TelemetryHeartbeatEnabled,
 		}
 
 		// note: due to the OTEL specification, all histogram buckets
@@ -167,6 +169,32 @@ func (s *Server) start() error {
 		}
 		beholder.SetClient(beholderClient)
 		beholder.SetGlobalOtelProviders()
+
+		if beholderCfg.LogStreamingEnabled {
+			otelLogger, err := NewOtelLogger(beholderClient.Logger)
+			if err != nil {
+				return fmt.Errorf("failed to enable log streaming: %w", err)
+			}
+			s.Logger = logger.Sugared(logger.Named(otelLogger, s.Logger.Name()))
+		}
+
+		if beholderCfg.HeartbeatEnabled {
+			s.heartbeat = beholder.NewHeartbeat(
+				30*time.Second, // heartbeat interval
+				s.Logger,
+				beholder.WithAppID(s.EnvConfig.AppID),
+				beholder.WithMeter(beholderClient.Meter),
+				beholder.WithEmitter(beholderClient.Emitter),
+			)
+
+			// Start the heartbeat service; a failure is logged but does not abort server start
+			if err := s.heartbeat.Start(ctx); err != nil {
+				s.Logger.Errorw("Failed to start heartbeat service", "error", err)
+				s.heartbeat = nil // never started, so Stop must not try to Close it
+			} else {
+				s.Logger.Infow("Heartbeat service started", "interval", "30s")
+			}
+		}
 	}
 
 	s.promServer = NewPromServer(s.EnvConfig.PrometheusPort, s.Logger)
@@ -230,6 +258,9 @@ func (s *Server) Register(c services.HealthReporter) error { return s.checker.Re
 
 // Stop closes resources and flushes logs.
 func (s *Server) Stop() {
+	if s.heartbeat != nil {
+		s.Logger.ErrorIfFn(s.heartbeat.Close, "Failed to close heartbeat service")
+	}
 	if s.dbStatsReporter != nil {
 		s.dbStatsReporter.Stop()
 	}