-
Notifications
You must be signed in to change notification settings - Fork 24
Beholder heartbeat #1572
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft
pkcll
wants to merge
5
commits into
main
Choose a base branch
from
INFOPLAT-2938-logs-streaming-loopp-heartbeat
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Beholder heartbeat #1572
Changes from all commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
bff6e2b
loop/server: use otelzap logger
pkcll adf46dd
fix test
pkcll 7bb83e1
beholder heartbeat
pkcll e9d2e86
Merge branch 'main' of github.com:smartcontractkit/chainlink-common i…
pkcll ef4be12
Merge branch 'INFOPLAT-2938-logs-streaming-loopp' into INFOPLAT-2938-…
pkcll File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,227 @@ | ||
package beholder | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"time" | ||
|
||
"go.opentelemetry.io/otel/attribute" | ||
"go.opentelemetry.io/otel/metric" | ||
"go.opentelemetry.io/otel/trace" | ||
"google.golang.org/protobuf/proto" | ||
|
||
"github.com/smartcontractkit/chainlink-common/pkg/beholder/pb" | ||
"github.com/smartcontractkit/chainlink-common/pkg/config/build" | ||
"github.com/smartcontractkit/chainlink-common/pkg/logger" | ||
"github.com/smartcontractkit/chainlink-common/pkg/services" | ||
"github.com/smartcontractkit/chainlink-common/pkg/timeutil" | ||
) | ||
|
||
// Heartbeat represents a periodic heartbeat service that emits metrics and logs | ||
type Heartbeat struct { | ||
services.Service | ||
eng *services.Engine | ||
|
||
Beat time.Duration | ||
Emitter Emitter | ||
Meter metric.Meter | ||
Logger logger.Logger | ||
Tracer trace.Tracer | ||
AppID string | ||
ServiceName string | ||
Version string | ||
Commit string | ||
Labels map[string]string | ||
} | ||
|
||
// NewHeartbeat creates a new heartbeat service with custom configuration | ||
func NewHeartbeat(beat time.Duration, lggr logger.Logger, opts ...HeartbeatOpt) *Heartbeat { | ||
// Setup default emitter, meter, and tracer | ||
noopClient := NewNoopClient() | ||
|
||
// Create heartbeat with defaults | ||
h := &Heartbeat{ | ||
Beat: beat, | ||
Logger: lggr, | ||
Emitter: noopClient.Emitter, | ||
Meter: noopClient.Meter, | ||
Tracer: noopClient.Tracer, | ||
AppID: "chainlink", // Default app ID | ||
ServiceName: build.Program, // Default service name | ||
Version: build.Version, // Use build version | ||
Commit: build.ChecksumPrefix, // Use build commit | ||
Labels: make(map[string]string), | ||
} | ||
|
||
// Apply options | ||
for _, opt := range opts { | ||
opt(h) | ||
} | ||
|
||
// Build labels from current values | ||
h.Labels = map[string]string{ | ||
"service": h.ServiceName, | ||
"version": h.Version, | ||
"commit": h.Commit, | ||
} | ||
if h.AppID != "" { | ||
h.Labels["app_id"] = h.AppID | ||
} | ||
|
||
// Create service engine | ||
h.Service, h.eng = services.Config{ | ||
Name: "BeholderHeartbeat", | ||
Start: h.start, | ||
}.NewServiceEngine(lggr) | ||
|
||
return h | ||
} | ||
|
||
// HeartbeatOpt is a functional option for configuring the heartbeat | ||
type HeartbeatOpt func(*Heartbeat) | ||
|
||
// WithEmitter sets a custom message emitter for the heartbeat | ||
func WithEmitter(emitter Emitter) HeartbeatOpt { | ||
return func(h *Heartbeat) { | ||
h.Emitter = emitter | ||
} | ||
} | ||
|
||
// WithMeter sets a custom meter for the heartbeat | ||
func WithMeter(meter metric.Meter) HeartbeatOpt { | ||
return func(h *Heartbeat) { | ||
h.Meter = meter | ||
} | ||
} | ||
|
||
// WithTracer sets a custom tracer for the heartbeat | ||
func WithTracer(tracer trace.Tracer) HeartbeatOpt { | ||
return func(h *Heartbeat) { | ||
h.Tracer = tracer | ||
} | ||
} | ||
|
||
// WithAppID sets a custom app ID for the heartbeat | ||
func WithAppID(appID string) HeartbeatOpt { | ||
return func(h *Heartbeat) { | ||
h.AppID = appID | ||
if appID != "" { | ||
h.Labels["app_id"] = appID | ||
} else { | ||
delete(h.Labels, "app_id") | ||
} | ||
} | ||
} | ||
|
||
// WithServiceName sets a custom service name for the heartbeat | ||
func WithServiceName(serviceName string) HeartbeatOpt { | ||
return func(h *Heartbeat) { | ||
h.ServiceName = serviceName | ||
h.Labels["service"] = serviceName | ||
} | ||
} | ||
|
||
// WithVersion sets a custom version for the heartbeat | ||
func WithVersion(version string) HeartbeatOpt { | ||
return func(h *Heartbeat) { | ||
h.Version = version | ||
h.Labels["version"] = version | ||
} | ||
} | ||
|
||
// WithCommit sets a custom commit for the heartbeat | ||
func WithCommit(commit string) HeartbeatOpt { | ||
return func(h *Heartbeat) { | ||
h.Commit = commit | ||
h.Labels["commit"] = commit | ||
} | ||
} | ||
|
||
// WithBeatInterval sets a custom beat interval for the heartbeat | ||
func WithBeatInterval(beat time.Duration) HeartbeatOpt { | ||
return func(h *Heartbeat) { | ||
h.Beat = beat | ||
} | ||
} | ||
|
||
// start initializes and starts the heartbeat service | ||
func (h *Heartbeat) start(ctx context.Context) error { | ||
// Create heartbeat metrics | ||
heartbeatGauge, err := h.Meter.Int64Gauge("beholder_heartbeat") | ||
if err != nil { | ||
return fmt.Errorf("failed to create heartbeat status gauge: %w", err) | ||
} | ||
|
||
heartbeatCount, err := h.Meter.Int64Counter("beholder_heartbeat_count") | ||
if err != nil { | ||
return fmt.Errorf("failed to create heartbeat counter: %w", err) | ||
} | ||
|
||
// Define the heartbeat function | ||
beatFn := func(ctx context.Context) { | ||
start := time.Now() | ||
|
||
// Create a trace span for the heartbeat | ||
ctx, span := h.Tracer.Start(ctx, "beholder_heartbeat", trace.WithAttributes( | ||
attribute.String("service", h.ServiceName), | ||
attribute.String("app_id", h.AppID), | ||
attribute.String("version", h.Version), | ||
attribute.String("commit", h.Commit), | ||
)) | ||
defer span.End() | ||
|
||
// Record heartbeat metrics | ||
heartbeatGauge.Record(ctx, 1) | ||
heartbeatCount.Add(ctx, 1) | ||
|
||
// Emit heartbeat message | ||
|
||
payload := &pb.BaseMessage{ | ||
Msg: "beholder heartbeat", | ||
Labels: h.Labels, | ||
} | ||
payloadBytes, err := proto.Marshal(payload) | ||
if err != nil { | ||
// log error | ||
h.Logger.Errorw("heartbeat marshal protobuf failed", "err", err) | ||
} | ||
|
||
err = h.Emitter.Emit(ctx, payloadBytes, | ||
AttrKeyDataSchema, "/beholder-base-message/versions/1", // required | ||
AttrKeyDomain, "platform", // required | ||
AttrKeyEntity, "BaseMessage", // required | ||
"service", h.ServiceName, | ||
"app_id", h.AppID, | ||
"version", h.Version, | ||
"commit", h.Commit, | ||
"timestamp", start.Unix(), | ||
) | ||
|
||
if err != nil { | ||
h.Logger.Errorw("heartbeat emit failed", "err", err) | ||
} | ||
|
||
// Log heartbeat | ||
h.Logger.Debugw("beholder heartbeat emitted", | ||
"service", h.ServiceName, | ||
"app_id", h.AppID, | ||
"version", h.Version, | ||
"commit", h.Commit, | ||
"timestamp", start.Unix(), | ||
) | ||
} | ||
|
||
// Start the heartbeat ticker | ||
// Execute immediately first, then continue with regular intervals | ||
h.eng.Go(func(ctx context.Context) { | ||
beatFn(ctx) | ||
}) | ||
h.eng.GoTick(timeutil.NewTicker(func() time.Duration { return h.Beat }), beatFn) | ||
|
||
h.Logger.Infow("beholder heartbeat service started", | ||
"service", h.ServiceName, | ||
"beat_interval", h.Beat, | ||
) | ||
|
||
return nil | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
package beholder_test | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
|
||
"github.com/smartcontractkit/chainlink-common/pkg/beholder" | ||
"github.com/smartcontractkit/chainlink-common/pkg/logger" | ||
) | ||
|
||
func TestHeartbeat_NewHeartbeat(t *testing.T) { | ||
lggr, err := logger.New() | ||
require.NoError(t, err) | ||
|
||
heartbeat := beholder.NewHeartbeat( | ||
1*time.Second, | ||
lggr, | ||
beholder.WithAppID("test-app"), | ||
beholder.WithServiceName("test-service"), | ||
beholder.WithVersion("1.0.0"), | ||
beholder.WithCommit("abc123"), | ||
) | ||
require.NotNil(t, heartbeat) | ||
|
||
assert.Equal(t, "test-app", heartbeat.AppID) | ||
assert.Equal(t, "test-service", heartbeat.ServiceName) | ||
assert.Equal(t, "1.0.0", heartbeat.Version) | ||
assert.Equal(t, "abc123", heartbeat.Commit) | ||
assert.Equal(t, 1*time.Second, heartbeat.Beat) | ||
assert.NotNil(t, heartbeat.Emitter) | ||
assert.NotNil(t, heartbeat.Meter) | ||
} | ||
|
||
func TestHeartbeat_Start(t *testing.T) { | ||
lggr, err := logger.New() | ||
require.NoError(t, err) | ||
|
||
heartbeat := beholder.NewHeartbeat(100*time.Millisecond, lggr) | ||
require.NotNil(t, heartbeat) | ||
|
||
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) | ||
defer cancel() | ||
|
||
err = heartbeat.Start(ctx) | ||
require.NoError(t, err) | ||
|
||
// Wait for at least one heartbeat | ||
time.Sleep(150 * time.Millisecond) | ||
|
||
err = heartbeat.Close() | ||
require.NoError(t, err) | ||
} | ||
|
||
func TestHeartbeat_Defaults(t *testing.T) { | ||
lggr, err := logger.New() | ||
require.NoError(t, err) | ||
|
||
heartbeat := beholder.NewHeartbeat(1*time.Second, lggr) | ||
require.NotNil(t, heartbeat) | ||
|
||
// Check defaults | ||
assert.Equal(t, "chainlink", heartbeat.AppID) | ||
assert.Equal(t, "github.com/smartcontractkit/chainlink-common", heartbeat.ServiceName) | ||
assert.Equal(t, "(devel)", heartbeat.Version) | ||
assert.Equal(t, "unset", heartbeat.Commit) | ||
assert.Equal(t, 1*time.Second, heartbeat.Beat) | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These will pollute the
beholder
pkg namespace. might be better to use a config struct likeheartbeatConfig