Skip to content
Open
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
a94c75b
INFOPLAT-2963 Adds TTL to loop config
hendoxc Oct 10, 2025
76d8492
INFOPLAT-2963 Wires up `Keystore` as `Signer` impl for beholder headers
hendoxc Oct 10, 2025
0ca9acd
Deferred signer
hendoxc Oct 10, 2025
243ee13
Makes lazy signer an interface
hendoxc Oct 14, 2025
a8180d8
Removes keystore mentions from loop server
hendoxc Oct 15, 2025
54deac7
Uses Initial provided headers to the lazy signer
hendoxc Oct 15, 2025
0f81da9
Makes configuration more clear
hendoxc Oct 15, 2025
303df4c
Simplify server beholder auth config
hendoxc Oct 15, 2025
d48f7b2
Adjust `Signer` interface to use `keyID string`
hendoxc Oct 15, 2025
6d982fc
Example of setting signer
hendoxc Oct 15, 2025
2abd8cf
`fmt`
hendoxc Oct 15, 2025
addb026
Sort `Auth` fields
hendoxc Oct 15, 2025
fa3d656
Reduces stuttering in interface
hendoxc Oct 15, 2025
743b596
Address comments
hendoxc Oct 15, 2025
716eed0
Sets config mechanism dependent on authheadertll
hendoxc Oct 15, 2025
b087dad
Merge branch 'main' into INFOPLAT-2963-beholder-rotating-auth-headers…
hendoxc Oct 15, 2025
2b77990
Wire up beholder client in relayer
hendoxc Oct 16, 2025
08a8370
Merge branch 'INFOPLAT-2963-beholder-rotating-auth-headers-loop-impl'…
hendoxc Oct 16, 2025
7a0497f
Merge branch 'main' into INFOPLAT-2963-beholder-rotating-auth-headers…
hendoxc Oct 16, 2025
2cf0fcf
Simply setting signer
hendoxc Oct 16, 2025
6b1456f
Merge branch 'INFOPLAT-2963-beholder-rotating-auth-headers-loop-impl'…
hendoxc Oct 16, 2025
e0b201d
Updates `SetSigner` comment
hendoxc Oct 16, 2025
6781ddd
Removes deferred beholder signer wire up
hendoxc Oct 16, 2025
b2413bd
Merge branch 'main' into INFOPLAT-2963-beholder-rotating-auth-headers…
hendoxc Oct 17, 2025
8f9ed7a
Fixes merge conflict
hendoxc Oct 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions pkg/beholder/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type Auth interface {
}

type Signer interface {
Sign(ctx context.Context, keyID []byte, data []byte) ([]byte, error)
Sign(ctx context.Context, keyID string, data []byte) ([]byte, error)
}

type staticAuth struct {
Expand Down Expand Up @@ -82,7 +82,10 @@ type rotatingAuth struct {
mu sync.Mutex
}

func NewRotatingAuth(csaPubKey ed25519.PublicKey, signer Signer, ttl time.Duration, requireTransportSecurity bool) Auth {
// NewRotatingAuth creates a rotating auth mechanism that automatically refreshes headers.
// If initialHeaders are provided, they will be used immediately until TTL expires.
// After TTL expiration, the signer is called to generate new headers.
func NewRotatingAuth(csaPubKey ed25519.PublicKey, signer Signer, ttl time.Duration, requireTransportSecurity bool, initialHeaders map[string]string) Auth {
r := &rotatingAuth{
csaPubKey: csaPubKey,
signer: signer,
Expand All @@ -91,7 +94,18 @@ func NewRotatingAuth(csaPubKey ed25519.PublicKey, signer Signer, ttl time.Durati
lastUpdatedNanos: atomic.Int64{},
requireTransportSecurity: requireTransportSecurity,
}
r.headers.Store(make(map[string]string))

headers := make(map[string]string)
// If initial headers are provided, use them and set timestamp to now
// Otherwise, leave timestamp at 0 so headers are generated on first call
if len(initialHeaders) > 0 {
headers = initialHeaders
// We assume the time between the initial headers being generated is very small
r.lastUpdatedNanos.Store(time.Now().UnixNano())
}

r.headers.Store(headers)

return r
}

Expand Down Expand Up @@ -125,7 +139,7 @@ func (r *rotatingAuth) Headers(ctx context.Context) (map[string]string, error) {
defer cancel()

// Sign(public key bytes + timestamp bytes)
signature, err := r.signer.Sign(ctxWithTimeout, r.csaPubKey, msgBytes)
signature, err := r.signer.Sign(ctxWithTimeout, fmt.Sprintf("%x", r.csaPubKey), msgBytes)
if err != nil {
return nil, fmt.Errorf("beholder: failed to sign auth header: %w", err)
}
Expand Down
63 changes: 50 additions & 13 deletions pkg/beholder/auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/ed25519"
"encoding/hex"
"fmt"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -54,7 +55,7 @@ type MockSigner struct {
mock.Mock
}

func (m *MockSigner) Sign(ctx context.Context, keyID []byte, data []byte) ([]byte, error) {
func (m *MockSigner) Sign(ctx context.Context, keyID string, data []byte) ([]byte, error) {
args := m.Called(ctx, keyID, data)
return args.Get(0).([]byte), args.Error(1)
}
Expand All @@ -71,13 +72,13 @@ func TestRotatingAuth(t *testing.T) {
dummySignature := ed25519.Sign(privKey, []byte("test data"))

mockSigner.
On("Sign", mock.Anything, mock.MatchedBy(func(keyID []byte) bool {
return string(keyID) == string(pubKey) // Verify correct public key is passed
On("Sign", mock.Anything, mock.MatchedBy(func(keyID string) bool {
return keyID == hex.EncodeToString(pubKey) // Verify correct public key hex is passed
}), mock.Anything).
Return(dummySignature, nil)

ttl := 5 * time.Minute
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false)
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false, nil)

headers, err := auth.Headers(t.Context())
require.NoError(t, err)
Expand Down Expand Up @@ -112,7 +113,7 @@ func TestRotatingAuth(t *testing.T) {
Maybe()

ttl := 5 * time.Minute
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false)
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false, nil)

headers1, err := auth.Headers(t.Context())
require.NoError(t, err)
Expand All @@ -135,7 +136,7 @@ func TestRotatingAuth(t *testing.T) {
Return([]byte{}, expectedErr)

ttl := 5 * time.Minute
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false)
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false, nil)

headers, err := auth.Headers(t.Context())
require.Error(t, err)
Expand All @@ -157,7 +158,7 @@ func TestRotatingAuth(t *testing.T) {
Maybe()

ttl := 5 * time.Minute
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false)
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false, nil)

creds := auth.Credentials()
require.NotNil(t, creds)
Expand All @@ -183,16 +184,52 @@ func TestRotatingAuth(t *testing.T) {

ttl := 5 * time.Minute
// transport security required
authSecure := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, true)
authSecure := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, true, nil)
credsSecure := authSecure.Credentials()
assert.True(t, credsSecure.RequireTransportSecurity())
// transport security not required
authInsecure := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false)
authInsecure := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false, nil)
credsInsecure := authInsecure.Credentials()
assert.False(t, credsInsecure.RequireTransportSecurity())

mockSigner.AssertExpectations(t)
})

t.Run("uses initial headers until TTL expires", func(t *testing.T) {
mockSigner := &MockSigner{}

// Create initial headers with v2 format
ts := time.Now()
signature := ed25519.Sign(privKey, []byte("initial"))
initialHeaders := map[string]string{
"X-Beholder-Node-Auth-Token": "2:" + hex.EncodeToString(pubKey) + ":" + fmt.Sprintf("%d", ts.UnixNano()) + ":" + hex.EncodeToString(signature),
}

// Use a very short TTL so it expires quickly
ttl := 1 * time.Millisecond
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false, initialHeaders)

// First call should return the initial headers without calling Sign
headers1, err := auth.Headers(t.Context())
require.NoError(t, err)
assert.Equal(t, initialHeaders, headers1)

// Wait for TTL to expire
time.Sleep(5 * time.Millisecond)

// Now the signer should be called to generate new headers
newSignature := ed25519.Sign(privKey, []byte("new"))
mockSigner.
On("Sign", mock.Anything, mock.Anything, mock.Anything).
Return(newSignature, nil).
Once()

headers2, err := auth.Headers(t.Context())
require.NoError(t, err)
assert.NotEqual(t, initialHeaders, headers2, "Should generate new headers after TTL expires")

mockSigner.AssertExpectations(t)
})
}

// BenchmarkRotatingAuth_Headers_CachedPath benchmarks the fast path where headers are cached and within TTL.
Expand All @@ -212,7 +249,7 @@ func BenchmarkRotatingAuth_Headers_CachedPath(b *testing.B) {

// Use a long TTL so headers don't expire during the benchmark
ttl := 1 * time.Hour
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false)
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false, nil)

// Prime the cache by calling Headers once
ctx := b.Context()
Expand Down Expand Up @@ -249,7 +286,7 @@ func BenchmarkRotatingAuth_Headers_ExpiredPath(b *testing.B) {

// Use a TTL of 0 to force regeneration on every call
ttl := 0 * time.Second
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false)
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false, nil)

ctx := b.Context()

Expand Down Expand Up @@ -283,7 +320,7 @@ func BenchmarkRotatingAuth_Headers_ParallelCached(b *testing.B) {

// Use a long TTL so headers don't expire during the benchmark
ttl := 1 * time.Hour
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false)
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false, nil)

// Prime the cache
ctx := b.Context()
Expand Down Expand Up @@ -323,7 +360,7 @@ func BenchmarkRotatingAuth_Headers_ParallelExpired(b *testing.B) {

// Use a short TTL to cause periodic regeneration
ttl := 10 * time.Millisecond
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false)
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false, nil)

ctx := b.Context()

Expand Down
43 changes: 37 additions & 6 deletions pkg/beholder/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ type Client struct {
MeterProvider otelmetric.MeterProvider
MessageLoggerProvider otellog.LoggerProvider

// lazySigner allows updating the keystore after client initialization.
lazySigner *lazySigner

// OnClose
OnClose func() error
}
Expand Down Expand Up @@ -100,12 +103,18 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro
otlploggrpc.WithEndpoint(cfg.OtelExporterGRPCEndpoint),
}
// Initialize auth here for reuse with log, trace, and metric exporters
// Two modes are supported:
// 1. Static auth: If AuthHeadersTTL == 0, use AuthHeaders as-is and never change
// 2. Rotating auth: If AuthHeadersTTL > 0, create lazySigner for deferred keystore injection
var signer *lazySigner
var auth Auth
if cfg.AuthKeySigner != nil {

if cfg.AuthHeadersTTL > 0 {

if cfg.AuthPublicKeyHex == "" {
return nil, fmt.Errorf("auth: public key hex required when signer is set")
return nil, fmt.Errorf("auth: public key hex required for rotating auth (TTL > 0)")
}

// Clamp lowest possible value to 10mins
if cfg.AuthHeadersTTL < 10*time.Minute {
return nil, fmt.Errorf("auth: headers TTL must be at least 10 minutes")
Expand All @@ -116,14 +125,21 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro
return nil, fmt.Errorf("auth: failed to decode public key hex: %w", err)
}

auth = NewRotatingAuth(key, cfg.AuthKeySigner, cfg.AuthHeadersTTL, !cfg.InsecureConnection)
// Optionally wrap the signer in a lazySigner if AuthKeySigner was provided
// This allows the signer to be set both before and after client initialization
signer = &lazySigner{}
if cfg.AuthKeySigner != nil {
signer.Set(cfg.AuthKeySigner)
}

auth = NewRotatingAuth(key, signer, cfg.AuthHeadersTTL, !cfg.InsecureConnection, cfg.AuthHeaders)
}
// Log exporter auth
switch {
// Rotating auth
// Rotating auth mode
case auth != nil:
opts = append(opts, otlploggrpc.WithDialOption(authDialOpt(auth)))
// Static auth
// Static auth mode
case len(cfg.AuthHeaders) > 0:
opts = append(opts, otlploggrpc.WithHeaders(cfg.AuthHeaders))
// No auth
Expand Down Expand Up @@ -307,7 +323,7 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro
}
return
}
return &Client{cfg, logger, tracer, meter, emitter, chip, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider, onClose}, nil
return &Client{cfg, logger, tracer, meter, emitter, chip, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider, signer, onClose}, nil
}

// Closes all providers, flushes all data and stops all background processes
Expand Down Expand Up @@ -346,6 +362,21 @@ func (c Client) ForName(name string) Client {
return newClient
}

// SetSigner updates the signer in the lazy signer.
// This method enables setting the signer after the beholder client has been created, which is useful
// when the signer is not available at client initialization time but the client needs to be configured
// with rotating auth. The underlying lazy signer is thread-safe.
func (c *Client) SetSigner(signer Signer) {
if c.lazySigner != nil {
c.lazySigner.Set(signer)
}
}

// IsSignerSet returns true if a signer has been set in the lazy signer.
func (c *Client) IsSignerSet() bool {
return c.lazySigner != nil && c.lazySigner.IsSet()
}

func newOtelResource(cfg Config) (resource *sdkresource.Resource, err error) {
extraResources, err := sdkresource.New(
context.Background(),
Expand Down
11 changes: 4 additions & 7 deletions pkg/beholder/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,15 +559,12 @@ func TestNewGRPCClientRotatingAuth(t *testing.T) {
require.NotNil(t, client)
})

t.Run("error when public key hex is empty but signer is set", func(t *testing.T) {

mockSigner := &MockSigner{}
t.Run("error when public key hex is empty but TTL is set", func(t *testing.T) {

cfg := beholder.Config{
OtelExporterGRPCEndpoint: "localhost:4317",
AuthPublicKeyHex: "", // Empty public key hex
AuthKeySigner: mockSigner,
AuthHeadersTTL: 10 * time.Minute,
AuthPublicKeyHex: "", // Empty public key hex
AuthHeadersTTL: 10 * time.Minute, // TTL > 0 requires public key
InsecureConnection: true,
}

Expand All @@ -578,7 +575,7 @@ func TestNewGRPCClientRotatingAuth(t *testing.T) {
client, err := beholder.NewGRPCClient(cfg, otlploggrpcNew)
require.Error(t, err)
assert.Nil(t, client)
assert.Contains(t, err.Error(), "auth: public key hex required when signer is set")
assert.Contains(t, err.Error(), "auth: public key hex required for rotating auth")
})

t.Run("error when TTL is too short", func(t *testing.T) {
Expand Down
16 changes: 12 additions & 4 deletions pkg/beholder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,14 @@ type Config struct {
LogLevel zapcore.Level // Log level for telemetry streaming

// Auth
AuthPublicKeyHex string
// AuthHeaders serves two purposes:
// 1. Static mode: When AuthKeySigner is nil, these headers are used as-is and never change
// 2. Rotating mode: When AuthKeySigner is set, these headers are used as initial headers
// until TTL expires, then the lazy signer generates new ones
AuthHeaders map[string]string
AuthKeySigner Signer
AuthHeadersTTL time.Duration
AuthKeySigner Signer
AuthPublicKeyHex string
}

type RetryConfig struct {
Expand Down Expand Up @@ -120,8 +124,8 @@ func DefaultConfig() Config {
LogBatchProcessor: true,
LogStreamingEnabled: true, // Enable logs streaming by default
LogLevel: zapcore.InfoLevel,
// Auth
AuthHeadersTTL: 10 * time.Minute,
// Auth (defaults to static auth mode with TTL=0)
AuthHeadersTTL: 0,
}
}

Expand All @@ -134,6 +138,8 @@ func TestDefaultConfig() Config {
config.LogRetryConfig.MaxElapsedTime = 0 // Retry is disabled
config.TraceRetryConfig.MaxElapsedTime = 0 // Retry is disabled
config.MetricRetryConfig.MaxElapsedTime = 0 // Retry is disabled
// Auth disabled for testing (TTL=0 means static auth mode)
config.AuthHeadersTTL = 0
return config
}

Expand All @@ -144,6 +150,8 @@ func TestDefaultConfigHTTPClient() Config {
config.LogBatchProcessor = false
config.OtelExporterGRPCEndpoint = ""
config.OtelExporterHTTPEndpoint = "localhost:4318"
// Auth disabled for testing (TTL=0 means static auth mode)
config.AuthHeadersTTL = 0
return config
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/beholder/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,6 @@ func ExampleConfig() {
}
fmt.Printf("%+v\n", *config.LogRetryConfig)
// Output:
// {InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 OtelExporterHTTPEndpoint:localhost:4318 ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:<nil>}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholderclient slice:<nil>}}] EmitterExportTimeout:1s EmitterExportInterval:1s EmitterExportMaxBatchSize:512 EmitterMaxQueueSize:2048 EmitterBatchProcessor:true TraceSampleRatio:1 TraceBatchTimeout:1s TraceSpanExporter:<nil> TraceRetryConfig:<nil> MetricReaderInterval:1s MetricRetryConfig:<nil> MetricViews:[] ChipIngressEmitterEnabled:false ChipIngressEmitterGRPCEndpoint: ChipIngressInsecureConnection:false LogExportTimeout:1s LogExportInterval:1s LogExportMaxBatchSize:512 LogMaxQueueSize:2048 LogBatchProcessor:true LogRetryConfig:<nil> LogStreamingEnabled:false LogLevel:info AuthPublicKeyHex: AuthHeaders:map[] AuthKeySigner:<nil> AuthHeadersTTL:0s}
// {InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 OtelExporterHTTPEndpoint:localhost:4318 ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:<nil>}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholderclient slice:<nil>}}] EmitterExportTimeout:1s EmitterExportInterval:1s EmitterExportMaxBatchSize:512 EmitterMaxQueueSize:2048 EmitterBatchProcessor:true TraceSampleRatio:1 TraceBatchTimeout:1s TraceSpanExporter:<nil> TraceRetryConfig:<nil> MetricReaderInterval:1s MetricRetryConfig:<nil> MetricViews:[] ChipIngressEmitterEnabled:false ChipIngressEmitterGRPCEndpoint: ChipIngressInsecureConnection:false LogExportTimeout:1s LogExportInterval:1s LogExportMaxBatchSize:512 LogMaxQueueSize:2048 LogBatchProcessor:true LogRetryConfig:<nil> LogStreamingEnabled:false LogLevel:info AuthHeaders:map[] AuthHeadersTTL:0s AuthKeySigner:<nil> AuthPublicKeyHex:}
// {InitialInterval:5s MaxInterval:30s MaxElapsedTime:1m0s}
}
3 changes: 2 additions & 1 deletion pkg/beholder/httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,8 @@ func NewHTTPClient(cfg Config, otlploghttpNew otlploghttpFactory) (*Client, erro
}
return
}
return &Client{cfg, logger, tracer, meter, emitter, nil, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider, onClose}, nil
// HTTP client doesn't currently support rotating auth, so lazySigner is always nil
return &Client{cfg, logger, tracer, meter, emitter, nil, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider, nil, onClose}, nil
}

func newHTTPTracerProvider(config Config, resource *sdkresource.Resource, tlsConfig *tls.Config) (*sdktrace.TracerProvider, error) {
Expand Down
Loading
Loading