Skip to content
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
a94c75b
INFOPLAT-2963 Adds TTL to loop config
hendoxc Oct 10, 2025
76d8492
INFOPLAT-2963 Wires up `Keystore` as `Signer` impl for beholder headers
hendoxc Oct 10, 2025
0ca9acd
Deferred signer
hendoxc Oct 10, 2025
243ee13
Makes lazy signer an interface
hendoxc Oct 14, 2025
a8180d8
Removes keystore mentions from loop server
hendoxc Oct 15, 2025
54deac7
Uses Initial provided headers to the lazy signer
hendoxc Oct 15, 2025
0f81da9
Makes configuration more clear
hendoxc Oct 15, 2025
303df4c
Simplify server beholder auth config
hendoxc Oct 15, 2025
d48f7b2
Adjust `Signer` interface to use `keyID string`
hendoxc Oct 15, 2025
6d982fc
Example of setting signer
hendoxc Oct 15, 2025
2abd8cf
`fmt`
hendoxc Oct 15, 2025
addb026
Sort `Auth` fields
hendoxc Oct 15, 2025
fa3d656
Reduces stuttering in interface
hendoxc Oct 15, 2025
743b596
Address comments
hendoxc Oct 15, 2025
716eed0
Sets config mechanism dependent on authheadertll
hendoxc Oct 15, 2025
b087dad
Merge branch 'main' into INFOPLAT-2963-beholder-rotating-auth-headers…
hendoxc Oct 15, 2025
2b77990
Wire up beholder client in relayer
hendoxc Oct 16, 2025
08a8370
Merge branch 'INFOPLAT-2963-beholder-rotating-auth-headers-loop-impl'…
hendoxc Oct 16, 2025
7a0497f
Merge branch 'main' into INFOPLAT-2963-beholder-rotating-auth-headers…
hendoxc Oct 16, 2025
2cf0fcf
Simply setting signer
hendoxc Oct 16, 2025
6b1456f
Merge branch 'INFOPLAT-2963-beholder-rotating-auth-headers-loop-impl'…
hendoxc Oct 16, 2025
e0b201d
Updates `SetSigner` comment
hendoxc Oct 16, 2025
6781ddd
Removes deferred beholder signer wire up
hendoxc Oct 16, 2025
b2413bd
Merge branch 'main' into INFOPLAT-2963-beholder-rotating-auth-headers…
hendoxc Oct 17, 2025
8f9ed7a
Fixes merge conflict
hendoxc Oct 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions pkg/beholder/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type Auth interface {
}

type Signer interface {
Sign(ctx context.Context, keyID []byte, data []byte) ([]byte, error)
Sign(ctx context.Context, keyID string, data []byte) ([]byte, error)
}

type staticAuth struct {
Expand Down Expand Up @@ -82,7 +82,10 @@ type rotatingAuth struct {
mu sync.Mutex
}

func NewRotatingAuth(csaPubKey ed25519.PublicKey, signer Signer, ttl time.Duration, requireTransportSecurity bool) Auth {
// NewRotatingAuth creates a rotating auth mechanism that automatically refreshes headers.
// If initialHeaders are provided, they will be used immediately until TTL expires.
// After TTL expiration, the signer is called to generate new headers.
func NewRotatingAuth(csaPubKey ed25519.PublicKey, signer Signer, ttl time.Duration, requireTransportSecurity bool, initialHeaders map[string]string) Auth {
r := &rotatingAuth{
csaPubKey: csaPubKey,
signer: signer,
Expand All @@ -91,7 +94,18 @@ func NewRotatingAuth(csaPubKey ed25519.PublicKey, signer Signer, ttl time.Durati
lastUpdatedNanos: atomic.Int64{},
requireTransportSecurity: requireTransportSecurity,
}
r.headers.Store(make(map[string]string))

headers := make(map[string]string)
// If initial headers are provided, use them and set timestamp to now
// Otherwise, leave timestamp at 0 so headers are generated on first call
if len(initialHeaders) > 0 {
headers = initialHeaders
// We assume the time between the initial headers being generated is very small
r.lastUpdatedNanos.Store(time.Now().UnixNano())
}

r.headers.Store(headers)

return r
}

Expand Down Expand Up @@ -125,7 +139,7 @@ func (r *rotatingAuth) Headers(ctx context.Context) (map[string]string, error) {
defer cancel()

// Sign(public key bytes + timestamp bytes)
signature, err := r.signer.Sign(ctxWithTimeout, r.csaPubKey, msgBytes)
signature, err := r.signer.Sign(ctxWithTimeout, fmt.Sprintf("%x", r.csaPubKey), msgBytes)
if err != nil {
return nil, fmt.Errorf("beholder: failed to sign auth header: %w", err)
}
Expand Down
63 changes: 50 additions & 13 deletions pkg/beholder/auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/ed25519"
"encoding/hex"
"fmt"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -54,7 +55,7 @@ type MockSigner struct {
mock.Mock
}

func (m *MockSigner) Sign(ctx context.Context, keyID []byte, data []byte) ([]byte, error) {
func (m *MockSigner) Sign(ctx context.Context, keyID string, data []byte) ([]byte, error) {
args := m.Called(ctx, keyID, data)
return args.Get(0).([]byte), args.Error(1)
}
Expand All @@ -71,13 +72,13 @@ func TestRotatingAuth(t *testing.T) {
dummySignature := ed25519.Sign(privKey, []byte("test data"))

mockSigner.
On("Sign", mock.Anything, mock.MatchedBy(func(keyID []byte) bool {
return string(keyID) == string(pubKey) // Verify correct public key is passed
On("Sign", mock.Anything, mock.MatchedBy(func(keyID string) bool {
return keyID == hex.EncodeToString(pubKey) // Verify correct public key hex is passed
}), mock.Anything).
Return(dummySignature, nil)

ttl := 5 * time.Minute
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false)
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false, nil)

headers, err := auth.Headers(t.Context())
require.NoError(t, err)
Expand Down Expand Up @@ -112,7 +113,7 @@ func TestRotatingAuth(t *testing.T) {
Maybe()

ttl := 5 * time.Minute
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false)
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false, nil)

headers1, err := auth.Headers(t.Context())
require.NoError(t, err)
Expand All @@ -135,7 +136,7 @@ func TestRotatingAuth(t *testing.T) {
Return([]byte{}, expectedErr)

ttl := 5 * time.Minute
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false)
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false, nil)

headers, err := auth.Headers(t.Context())
require.Error(t, err)
Expand All @@ -157,7 +158,7 @@ func TestRotatingAuth(t *testing.T) {
Maybe()

ttl := 5 * time.Minute
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false)
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false, nil)

creds := auth.Credentials()
require.NotNil(t, creds)
Expand All @@ -183,16 +184,52 @@ func TestRotatingAuth(t *testing.T) {

ttl := 5 * time.Minute
// transport security required
authSecure := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, true)
authSecure := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, true, nil)
credsSecure := authSecure.Credentials()
assert.True(t, credsSecure.RequireTransportSecurity())
// transport security not required
authInsecure := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false)
authInsecure := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false, nil)
credsInsecure := authInsecure.Credentials()
assert.False(t, credsInsecure.RequireTransportSecurity())

mockSigner.AssertExpectations(t)
})

t.Run("uses initial headers until TTL expires", func(t *testing.T) {
mockSigner := &MockSigner{}

// Create initial headers with v2 format
ts := time.Now()
signature := ed25519.Sign(privKey, []byte("initial"))
initialHeaders := map[string]string{
"X-Beholder-Node-Auth-Token": "2:" + hex.EncodeToString(pubKey) + ":" + fmt.Sprintf("%d", ts.UnixNano()) + ":" + hex.EncodeToString(signature),
}

// Use a very short TTL so it expires quickly
ttl := 1 * time.Millisecond
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false, initialHeaders)

// First call should return the initial headers without calling Sign
headers1, err := auth.Headers(t.Context())
require.NoError(t, err)
assert.Equal(t, initialHeaders, headers1)

// Wait for TTL to expire
time.Sleep(5 * time.Millisecond)

// Now the signer should be called to generate new headers
newSignature := ed25519.Sign(privKey, []byte("new"))
mockSigner.
On("Sign", mock.Anything, mock.Anything, mock.Anything).
Return(newSignature, nil).
Once()

headers2, err := auth.Headers(t.Context())
require.NoError(t, err)
assert.NotEqual(t, initialHeaders, headers2, "Should generate new headers after TTL expires")

mockSigner.AssertExpectations(t)
})
}

// BenchmarkRotatingAuth_Headers_CachedPath benchmarks the fast path where headers are cached and within TTL.
Expand All @@ -212,7 +249,7 @@ func BenchmarkRotatingAuth_Headers_CachedPath(b *testing.B) {

// Use a long TTL so headers don't expire during the benchmark
ttl := 1 * time.Hour
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false)
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false, nil)

// Prime the cache by calling Headers once
ctx := b.Context()
Expand Down Expand Up @@ -249,7 +286,7 @@ func BenchmarkRotatingAuth_Headers_ExpiredPath(b *testing.B) {

// Use a TTL of 0 to force regeneration on every call
ttl := 0 * time.Second
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false)
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false, nil)

ctx := b.Context()

Expand Down Expand Up @@ -283,7 +320,7 @@ func BenchmarkRotatingAuth_Headers_ParallelCached(b *testing.B) {

// Use a long TTL so headers don't expire during the benchmark
ttl := 1 * time.Hour
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false)
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false, nil)

// Prime the cache
ctx := b.Context()
Expand Down Expand Up @@ -323,7 +360,7 @@ func BenchmarkRotatingAuth_Headers_ParallelExpired(b *testing.B) {

// Use a short TTL to cause periodic regeneration
ttl := 10 * time.Millisecond
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false)
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false, nil)

ctx := b.Context()

Expand Down
40 changes: 36 additions & 4 deletions pkg/beholder/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ type Client struct {
MeterProvider otelmetric.MeterProvider
MessageLoggerProvider otellog.LoggerProvider

// LazyKeystoreSigner is the reference to the lazy signer if one was configured
// This allows updating the keystore after client initialization
lazySigner LazySigner

// OnClose
OnClose func() error
}
Expand Down Expand Up @@ -100,6 +104,11 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro
otlploggrpc.WithEndpoint(cfg.OtelExporterGRPCEndpoint),
}
// Initialize auth here for reuse with log, trace, and metric exporters
// Two modes are supported:
// 1. Rotating auth: If AuthKeySigner is set, AuthHeaders are used as initial headers
// that will be rotated by the lazy signer after TTL expires
// 2. Static auth: If AuthKeySigner is nil, AuthHeaders are used as-is and never change
lazySigner := NewLazySigner()
var auth Auth
if cfg.AuthKeySigner != nil {

Expand All @@ -116,14 +125,16 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro
return nil, fmt.Errorf("auth: failed to decode public key hex: %w", err)
}

auth = NewRotatingAuth(key, cfg.AuthKeySigner, cfg.AuthHeadersTTL, !cfg.InsecureConnection)
// Rotating mode: wrap the signer and use AuthHeaders as initial headers
lazySigner.SetSigner(cfg.AuthKeySigner)
auth = NewRotatingAuth(key, lazySigner, cfg.AuthHeadersTTL, !cfg.InsecureConnection, cfg.AuthHeaders)
}
// Log exporter auth
switch {
// Rotating auth
// Rotating auth mode
case auth != nil:
opts = append(opts, otlploggrpc.WithDialOption(authDialOpt(auth)))
// Static auth
// Static auth mode
case len(cfg.AuthHeaders) > 0:
opts = append(opts, otlploggrpc.WithHeaders(cfg.AuthHeaders))
// No auth
Expand Down Expand Up @@ -307,7 +318,7 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro
}
return
}
return &Client{cfg, logger, tracer, meter, emitter, chip, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider, onClose}, nil
return &Client{cfg, logger, tracer, meter, emitter, chip, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider, lazySigner, onClose}, nil
}

// Closes all providers, flushes all data and stops all background processes
Expand Down Expand Up @@ -346,6 +357,27 @@ func (c Client) ForName(name string) Client {
return newClient
}

// SetSigner updates the signer in the lazy signer if one was configured during client initialization.
// This method enables setting the signer after the beholder client has been created, which is useful
// when the signer is not available at client initialization time but the client needs to be configured
// with rotating auth.
func (c *Client) SetSigner(signer Signer) error {
if c.lazySigner == nil {
return fmt.Errorf("no lazy signer configured - client was not initialized with a LazySigner")
}
c.lazySigner.SetSigner(signer)
return nil
}

// HasSigner returns true if a signer has been set in the lazy signer.
// Returns false if no lazy signer was configured or if the keystore has not been set yet.
func (c *Client) HasSigner() bool {
if c.lazySigner == nil {
return false
}
return c.lazySigner.HasSigner()
}

func newOtelResource(cfg Config) (resource *sdkresource.Resource, err error) {
extraResources, err := sdkresource.New(
context.Background(),
Expand Down
10 changes: 7 additions & 3 deletions pkg/beholder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,13 @@ type Config struct {

// Auth
AuthPublicKeyHex string
AuthHeaders map[string]string
AuthKeySigner Signer
AuthHeadersTTL time.Duration
// AuthHeaders serves two purposes:
// 1. Static mode: When AuthKeySigner is nil, these headers are used as-is and never change
// 2. Rotating mode: When AuthKeySigner is set, these headers are used as initial headers
// until TTL expires, then the lazy signer generates new ones
AuthHeaders map[string]string
AuthKeySigner Signer
AuthHeadersTTL time.Duration
}

type RetryConfig struct {
Expand Down
3 changes: 2 additions & 1 deletion pkg/beholder/httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,8 @@ func NewHTTPClient(cfg Config, otlploghttpNew otlploghttpFactory) (*Client, erro
}
return
}
return &Client{cfg, logger, tracer, meter, emitter, nil, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider, onClose}, nil
// HTTP client doesn't currently support rotating auth, so lazySigner is always nil
return &Client{cfg, logger, tracer, meter, emitter, nil, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider, nil, onClose}, nil
}

func newHTTPTracerProvider(config Config, resource *sdkresource.Resource, tlsConfig *tls.Config) (*sdktrace.TracerProvider, error) {
Expand Down
52 changes: 52 additions & 0 deletions pkg/beholder/lazy_signer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package beholder

import (
"context"
"fmt"
"sync"
)

type LazySigner interface {
Sign(ctx context.Context, keyID string, data []byte) ([]byte, error)
SetSigner(signer Signer)
HasSigner() bool
}

// lazyKeystoreSigner is a thread-safe wrapper that allows the keystore
// to be set after the signer is created. This enables beholder to start
// with rotating auth configured, but the actual keystore can be injected later.
type lazySigner struct {
mu sync.RWMutex
Signer
}

func NewLazySigner() LazySigner {
return &lazySigner{mu: sync.RWMutex{}, Signer: nil}
}

// Sign implements the beholder.Signer interface
func (l *lazySigner) Sign(ctx context.Context, keyID string, data []byte) ([]byte, error) {
l.mu.RLock()
defer l.mu.RUnlock()

if l.Signer == nil {
return nil, fmt.Errorf("keystore not yet available for signing")
}

return l.Signer.Sign(ctx, keyID, data)
}

// SetKeystore updates the underlying keystore. This is thread-safe and can be
// called at any time, even after beholder has been initialized.
func (l *lazySigner) SetSigner(signer Signer) {
l.mu.Lock()
defer l.mu.Unlock()
l.Signer = signer
}

// HasKeystore returns true if a keystore has been set
func (l *lazySigner) HasSigner() bool {
l.mu.RLock()
defer l.mu.RUnlock()
return l.Signer != nil
}
4 changes: 2 additions & 2 deletions pkg/beholder/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func NewNoopClient() *Client {
// MessageEmitter
messageEmitter := noopMessageEmitter{}

return &Client{cfg, logger, tracer, meter, messageEmitter, nil, loggerProvider, tracerProvider, meterProvider, loggerProvider, noopOnClose}
return &Client{cfg, logger, tracer, meter, messageEmitter, nil, loggerProvider, tracerProvider, meterProvider, loggerProvider, nil, noopOnClose}
}

// NewStdoutClient creates a new Client with exporters which send telemetry data to standard output
Expand Down Expand Up @@ -94,7 +94,7 @@ func NewWriterClient(w io.Writer) (*Client, error) {
return
}

return &Client{Config: cfg.Config, Logger: logger, Tracer: tracer, Meter: meter, Emitter: emitter, LoggerProvider: loggerProvider, TracerProvider: tracerProvider, MeterProvider: meterProvider, MessageLoggerProvider: loggerProvider, OnClose: onClose}, nil
return &Client{Config: cfg.Config, Logger: logger, Tracer: tracer, Meter: meter, Emitter: emitter, Chip: nil, LoggerProvider: loggerProvider, TracerProvider: tracerProvider, MeterProvider: meterProvider, MessageLoggerProvider: loggerProvider, lazySigner: nil, OnClose: onClose}, nil
}

type noopMessageEmitter struct{}
Expand Down
Loading
Loading