diff --git a/pkg/beholder/auth.go b/pkg/beholder/auth.go index 640d0fbe2..b5ada98d8 100644 --- a/pkg/beholder/auth.go +++ b/pkg/beholder/auth.go @@ -38,7 +38,7 @@ type Auth interface { } type Signer interface { - Sign(ctx context.Context, keyID []byte, data []byte) ([]byte, error) + Sign(ctx context.Context, keyID string, data []byte) ([]byte, error) } type staticAuth struct { @@ -82,7 +82,10 @@ type rotatingAuth struct { mu sync.Mutex } -func NewRotatingAuth(csaPubKey ed25519.PublicKey, signer Signer, ttl time.Duration, requireTransportSecurity bool) Auth { +// NewRotatingAuth creates a rotating auth mechanism that automatically refreshes headers. +// If initialHeaders are provided, they will be used immediately until TTL expires. +// After TTL expiration, the signer is called to generate new headers. +func NewRotatingAuth(csaPubKey ed25519.PublicKey, signer Signer, ttl time.Duration, requireTransportSecurity bool, initialHeaders map[string]string) Auth { r := &rotatingAuth{ csaPubKey: csaPubKey, signer: signer, @@ -91,7 +94,18 @@ func NewRotatingAuth(csaPubKey ed25519.PublicKey, signer Signer, ttl time.Durati lastUpdatedNanos: atomic.Int64{}, requireTransportSecurity: requireTransportSecurity, } - r.headers.Store(make(map[string]string)) + + headers := make(map[string]string) + // If initial headers are provided, use them and set timestamp to now + // Otherwise, leave timestamp at 0 so headers are generated on first call + if len(initialHeaders) > 0 { + headers = initialHeaders + // We assume the time between the initial headers being generated is very small + r.lastUpdatedNanos.Store(time.Now().UnixNano()) + } + + r.headers.Store(headers) + return r } @@ -125,7 +139,7 @@ func (r *rotatingAuth) Headers(ctx context.Context) (map[string]string, error) { defer cancel() // Sign(public key bytes + timestamp bytes) - signature, err := r.signer.Sign(ctxWithTimeout, r.csaPubKey, msgBytes) + signature, err := r.signer.Sign(ctxWithTimeout, fmt.Sprintf("%x", r.csaPubKey), msgBytes) if err != nil { return nil, fmt.Errorf("beholder: failed to sign auth header: %w", err) } diff --git a/pkg/beholder/auth_test.go b/pkg/beholder/auth_test.go index a357a716c..e769b3921 100644 --- a/pkg/beholder/auth_test.go +++ b/pkg/beholder/auth_test.go @@ -4,6 +4,7 @@ import ( "context" "crypto/ed25519" "encoding/hex" + "fmt" "strings" "testing" "time" @@ -54,7 +55,7 @@ type MockSigner struct { mock.Mock } -func (m *MockSigner) Sign(ctx context.Context, keyID []byte, data []byte) ([]byte, error) { +func (m *MockSigner) Sign(ctx context.Context, keyID string, data []byte) ([]byte, error) { args := m.Called(ctx, keyID, data) return args.Get(0).([]byte), args.Error(1) } @@ -71,13 +72,13 @@ func TestRotatingAuth(t *testing.T) { dummySignature := ed25519.Sign(privKey, []byte("test data")) mockSigner. - On("Sign", mock.Anything, mock.MatchedBy(func(keyID []byte) bool { - return string(keyID) == string(pubKey) // Verify correct public key is passed + On("Sign", mock.Anything, mock.MatchedBy(func(keyID string) bool { + return keyID == hex.EncodeToString(pubKey) // Verify correct public key hex is passed }), mock.Anything). Return(dummySignature, nil) ttl := 5 * time.Minute - auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false) + auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false, nil) headers, err := auth.Headers(t.Context()) require.NoError(t, err) @@ -112,7 +113,7 @@ func TestRotatingAuth(t *testing.T) { Maybe() ttl := 5 * time.Minute - auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false) + auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false, nil) headers1, err := auth.Headers(t.Context()) require.NoError(t, err) @@ -135,7 +136,7 @@ func TestRotatingAuth(t *testing.T) { Return([]byte{}, expectedErr) ttl := 5 * time.Minute - auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false) + auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false, nil) headers, err := auth.Headers(t.Context()) require.Error(t, err) @@ -157,7 +158,7 @@ func TestRotatingAuth(t *testing.T) { Maybe() ttl := 5 * time.Minute - auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false) + auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false, nil) creds := auth.Credentials() require.NotNil(t, creds) @@ -183,16 +184,52 @@ func TestRotatingAuth(t *testing.T) { ttl := 5 * time.Minute // transport security required - authSecure := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, true) + authSecure := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, true, nil) credsSecure := authSecure.Credentials() assert.True(t, credsSecure.RequireTransportSecurity()) // transport security not required - authInsecure := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false) + authInsecure := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false, nil) credsInsecure := authInsecure.Credentials() assert.False(t, credsInsecure.RequireTransportSecurity()) mockSigner.AssertExpectations(t) }) + + t.Run("uses initial headers until TTL expires", func(t *testing.T) { + mockSigner := &MockSigner{} + + // Create initial headers with v2 format + ts := time.Now() + signature := ed25519.Sign(privKey, []byte("initial")) + initialHeaders := map[string]string{ + "X-Beholder-Node-Auth-Token": "2:" + hex.EncodeToString(pubKey) + ":" + fmt.Sprintf("%d", ts.UnixNano()) + ":" + hex.EncodeToString(signature), + } + + // Use a very short TTL so it expires quickly + ttl := 1 * time.Millisecond + auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false, initialHeaders) + + // First call should return the initial headers without calling Sign + headers1, err := auth.Headers(t.Context()) + require.NoError(t, err) + assert.Equal(t, initialHeaders, headers1) + + // Wait for TTL to expire + time.Sleep(5 * time.Millisecond) + + // Now the signer should be called to generate new headers + newSignature := ed25519.Sign(privKey, []byte("new")) + mockSigner. + On("Sign", mock.Anything, mock.Anything, mock.Anything). + Return(newSignature, nil). + Once() + + headers2, err := auth.Headers(t.Context()) + require.NoError(t, err) + assert.NotEqual(t, initialHeaders, headers2, "Should generate new headers after TTL expires") + + mockSigner.AssertExpectations(t) + }) } // BenchmarkRotatingAuth_Headers_CachedPath benchmarks the fast path where headers are cached and within TTL. @@ -212,7 +249,7 @@ func BenchmarkRotatingAuth_Headers_CachedPath(b *testing.B) { // Use a long TTL so headers don't expire during the benchmark ttl := 1 * time.Hour - auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false) + auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false, nil) // Prime the cache by calling Headers once ctx := b.Context() @@ -249,7 +286,7 @@ func BenchmarkRotatingAuth_Headers_ExpiredPath(b *testing.B) { // Use a TTL of 0 to force regeneration on every call ttl := 0 * time.Second - auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false) + auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false, nil) ctx := b.Context() @@ -283,7 +320,7 @@ func BenchmarkRotatingAuth_Headers_ParallelCached(b *testing.B) { // Use a long TTL so headers don't expire during the benchmark ttl := 1 * time.Hour - auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false) + auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false, nil) // Prime the cache ctx := b.Context() @@ -323,7 +360,7 @@ func BenchmarkRotatingAuth_Headers_ParallelExpired(b *testing.B) { // Use a short TTL to cause periodic regeneration ttl := 10 * time.Millisecond - auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false) + auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false, nil) ctx := b.Context() diff --git a/pkg/beholder/client.go b/pkg/beholder/client.go index 85973f48b..460f8d7c4 100644 --- a/pkg/beholder/client.go +++ b/pkg/beholder/client.go @@ -50,6 +50,9 @@ type Client struct { MeterProvider otelmetric.MeterProvider MessageLoggerProvider otellog.LoggerProvider + // lazySigner allows updating the keystore after client initialization. + lazySigner *lazySigner + // OnClose OnClose func() error } @@ -98,12 +101,18 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro } // Initialize auth here for reuse with log, trace, and metric exporters + // Two modes are supported: + // 1. Static auth: If AuthHeadersTTL == 0, use AuthHeaders as-is and never change + // 2. Rotating auth: If AuthHeadersTTL > 0, create lazySigner for deferred keystore injection + var signer *lazySigner var auth Auth - if cfg.AuthKeySigner != nil { + + if cfg.AuthHeadersTTL > 0 { if cfg.AuthPublicKeyHex == "" { - return nil, fmt.Errorf("auth: public key hex required when signer is set") + return nil, fmt.Errorf("auth: public key hex required for rotating auth (TTL > 0)") } + // Clamp lowest possible value to 10mins if cfg.AuthHeadersTTL < 10*time.Minute { return nil, fmt.Errorf("auth: headers TTL must be at least 10 minutes") @@ -114,8 +123,16 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro return nil, fmt.Errorf("auth: failed to decode public key hex: %w", err) } - auth = NewRotatingAuth(key, cfg.AuthKeySigner, cfg.AuthHeadersTTL, !cfg.InsecureConnection) + // Optionally wrap the signer in a lazySigner if AuthKeySigner was provided + // This allows the signer to be set both before and after client initialization + signer = &lazySigner{} + if cfg.AuthKeySigner != nil { + signer.Set(cfg.AuthKeySigner) + } + + auth = NewRotatingAuth(key, signer, cfg.AuthHeadersTTL, !cfg.InsecureConnection, cfg.AuthHeaders) } + // Tracer tracerProvider, err := newTracerProvider(cfg, baseResource, auth, creds) if err != nil { @@ -223,7 +240,7 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro } return } - return &Client{cfg, logger, tracer, meter, emitter, chip, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider, onClose}, nil + return &Client{cfg, logger, tracer, meter, emitter, chip, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider, signer, onClose}, nil } // Closes all providers, flushes all data and stops all background processes @@ -262,6 +279,21 @@ func (c Client) ForName(name string) Client { return newClient } +// SetSigner updates the signer in the lazy signer. +// This method enables setting the signer after the beholder client has been created, which is useful +// when the signer is not available at client initialization time but the client needs to be configured +// with rotating auth. The underlying lazy signer is thread-safe. +func (c *Client) SetSigner(signer Signer) { + if c.lazySigner != nil { + c.lazySigner.Set(signer) + } +} + +// IsSignerSet returns true if a signer has been set in the lazy signer. +func (c *Client) IsSignerSet() bool { + return c.lazySigner != nil && c.lazySigner.IsSet() +} + func newOtelResource(cfg Config) (resource *sdkresource.Resource, err error) { extraResources, err := sdkresource.New( context.Background(), diff --git a/pkg/beholder/client_test.go b/pkg/beholder/client_test.go index 62095d328..71e32e6c7 100644 --- a/pkg/beholder/client_test.go +++ b/pkg/beholder/client_test.go @@ -559,15 +559,12 @@ func TestNewGRPCClientRotatingAuth(t *testing.T) { require.NotNil(t, client) }) - t.Run("error when public key hex is empty but signer is set", func(t *testing.T) { - - mockSigner := &MockSigner{} + t.Run("error when public key hex is empty but TTL is set", func(t *testing.T) { cfg := beholder.Config{ OtelExporterGRPCEndpoint: "localhost:4317", - AuthPublicKeyHex: "", // Empty public key hex - AuthKeySigner: mockSigner, - AuthHeadersTTL: 10 * time.Minute, + AuthPublicKeyHex: "", // Empty public key hex + AuthHeadersTTL: 10 * time.Minute, // TTL > 0 requires public key InsecureConnection: true, } @@ -578,7 +575,7 @@ func TestNewGRPCClientRotatingAuth(t *testing.T) { client, err := beholder.NewGRPCClient(cfg, otlploggrpcNew) require.Error(t, err) assert.Nil(t, client) - assert.Contains(t, err.Error(), "auth: public key hex required when signer is set") + assert.Contains(t, err.Error(), "auth: public key hex required for rotating auth") }) t.Run("error when TTL is too short", func(t *testing.T) { diff --git a/pkg/beholder/config.go b/pkg/beholder/config.go index b9d49d2f5..a6d501f81 100644 --- a/pkg/beholder/config.go +++ b/pkg/beholder/config.go @@ -52,10 +52,14 @@ type Config struct { LogLevel zapcore.Level // Log level for telemetry streaming // Auth - AuthPublicKeyHex string + // AuthHeaders serves two purposes: + // 1. Static mode: When AuthKeySigner is nil, these headers are used as-is and never change + // 2. Rotating mode: When AuthKeySigner is set, these headers are used as initial headers + // until TTL expires, then the lazy signer generates new ones AuthHeaders map[string]string - AuthKeySigner Signer AuthHeadersTTL time.Duration + AuthKeySigner Signer + AuthPublicKeyHex string } type RetryConfig struct { @@ -120,8 +124,8 @@ func DefaultConfig() Config { LogBatchProcessor: true, LogStreamingEnabled: true, // Enable logs streaming by default LogLevel: zapcore.InfoLevel, - // Auth - AuthHeadersTTL: 10 * time.Minute, + // Auth (defaults to static auth mode with TTL=0) + AuthHeadersTTL: 0, } } @@ -134,6 +138,8 @@ func TestDefaultConfig() Config { config.LogRetryConfig.MaxElapsedTime = 0 // Retry is disabled config.TraceRetryConfig.MaxElapsedTime = 0 // Retry is disabled config.MetricRetryConfig.MaxElapsedTime = 0 // Retry is disabled + // Auth disabled for testing (TTL=0 means static auth mode) + config.AuthHeadersTTL = 0 return config } @@ -144,6 +150,8 @@ func TestDefaultConfigHTTPClient() Config { config.LogBatchProcessor = false config.OtelExporterGRPCEndpoint = "" config.OtelExporterHTTPEndpoint = "localhost:4318" + // Auth disabled for testing (TTL=0 means static auth mode) + config.AuthHeadersTTL = 0 return config } diff --git a/pkg/beholder/config_test.go b/pkg/beholder/config_test.go index 039e1aad1..4e9d7ab3f 100644 --- a/pkg/beholder/config_test.go +++ b/pkg/beholder/config_test.go @@ -64,6 +64,6 @@ func ExampleConfig() { } fmt.Printf("%+v\n", *config.LogRetryConfig) // Output: - // {InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 OtelExporterHTTPEndpoint:localhost:4318 ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholderclient slice:}}] EmitterExportTimeout:1s EmitterExportInterval:1s EmitterExportMaxBatchSize:512 EmitterMaxQueueSize:2048 EmitterBatchProcessor:true TraceSampleRatio:1 TraceBatchTimeout:1s TraceSpanExporter: TraceRetryConfig: MetricReaderInterval:1s MetricRetryConfig: MetricViews:[] ChipIngressEmitterEnabled:false ChipIngressEmitterGRPCEndpoint: ChipIngressInsecureConnection:false LogExportTimeout:1s LogExportInterval:1s LogExportMaxBatchSize:512 LogMaxQueueSize:2048 LogBatchProcessor:true LogRetryConfig: LogStreamingEnabled:false LogLevel:info AuthPublicKeyHex: AuthHeaders:map[] AuthKeySigner: AuthHeadersTTL:0s} + // {InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 OtelExporterHTTPEndpoint:localhost:4318 ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholderclient slice:}}] EmitterExportTimeout:1s EmitterExportInterval:1s EmitterExportMaxBatchSize:512 EmitterMaxQueueSize:2048 EmitterBatchProcessor:true TraceSampleRatio:1 TraceBatchTimeout:1s TraceSpanExporter: TraceRetryConfig: MetricReaderInterval:1s MetricRetryConfig: MetricViews:[] ChipIngressEmitterEnabled:false ChipIngressEmitterGRPCEndpoint: ChipIngressInsecureConnection:false LogExportTimeout:1s LogExportInterval:1s LogExportMaxBatchSize:512 LogMaxQueueSize:2048 LogBatchProcessor:true LogRetryConfig: LogStreamingEnabled:false LogLevel:info AuthHeaders:map[] AuthHeadersTTL:0s AuthKeySigner: AuthPublicKeyHex:} // {InitialInterval:5s MaxInterval:30s MaxElapsedTime:1m0s} } diff --git a/pkg/beholder/httpclient.go b/pkg/beholder/httpclient.go index 15b98faac..693f58145 100644 --- a/pkg/beholder/httpclient.go +++ b/pkg/beholder/httpclient.go @@ -186,7 +186,8 @@ func NewHTTPClient(cfg Config, otlploghttpNew otlploghttpFactory) (*Client, erro } return } - return &Client{cfg, logger, tracer, meter, emitter, nil, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider, onClose}, nil + // HTTP client doesn't currently support rotating auth, so lazySigner is always nil + return &Client{cfg, logger, tracer, meter, emitter, nil, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider, nil, onClose}, nil } func newHTTPTracerProvider(config Config, resource *sdkresource.Resource, tlsConfig *tls.Config) (*sdktrace.TracerProvider, error) { diff --git a/pkg/beholder/lazy_signer.go b/pkg/beholder/lazy_signer.go new file mode 100644 index 000000000..0556bd787 --- /dev/null +++ b/pkg/beholder/lazy_signer.go @@ -0,0 +1,35 @@ +package beholder + +import ( + "context" + "fmt" + "sync/atomic" +) + +// lazySigner is a thread-safe wrapper that allows the keystore +// to be set after the signer is created. This enables beholder to start +// with rotating auth configured, but the actual keystore can be injected later. +// The zero value is usable. +type lazySigner struct { + signer atomic.Pointer[Signer] +} + +// Sign implements the beholder.Signer interface +func (l *lazySigner) Sign(ctx context.Context, keyID string, data []byte) ([]byte, error) { + s := l.signer.Load() + if s == nil { + return nil, fmt.Errorf("keystore not yet available for signing") + } + return (*s).Sign(ctx, keyID, data) +} + +// Set updates the underlying keystore. This is thread-safe and can be +// called at any time, even after beholder has been initialized. +func (l *lazySigner) Set(signer Signer) { + l.signer.Store(&signer) +} + +// IsSet returns true if a keystore has been set +func (l *lazySigner) IsSet() bool { + return l.signer.Load() != nil +} diff --git a/pkg/beholder/noop.go b/pkg/beholder/noop.go index 82e10ce84..45ea9f41e 100644 --- a/pkg/beholder/noop.go +++ b/pkg/beholder/noop.go @@ -35,7 +35,7 @@ func NewNoopClient() *Client { // MessageEmitter messageEmitter := noopMessageEmitter{} - return &Client{cfg, logger, tracer, meter, messageEmitter, nil, loggerProvider, tracerProvider, meterProvider, loggerProvider, noopOnClose} + return &Client{cfg, logger, tracer, meter, messageEmitter, nil, loggerProvider, tracerProvider, meterProvider, loggerProvider, nil, noopOnClose} } // NewStdoutClient creates a new Client with exporters which send telemetry data to standard output @@ -94,7 +94,7 @@ func NewWriterClient(w io.Writer) (*Client, error) { return } - return &Client{Config: cfg.Config, Logger: logger, Tracer: tracer, Meter: meter, Emitter: emitter, LoggerProvider: loggerProvider, TracerProvider: tracerProvider, MeterProvider: meterProvider, MessageLoggerProvider: loggerProvider, OnClose: onClose}, nil + return &Client{Config: cfg.Config, Logger: logger, Tracer: tracer, Meter: meter, Emitter: emitter, Chip: nil, LoggerProvider: loggerProvider, TracerProvider: tracerProvider, MeterProvider: meterProvider, MessageLoggerProvider: loggerProvider, lazySigner: nil, OnClose: onClose}, nil } type noopMessageEmitter struct{} diff --git a/pkg/loop/config.go b/pkg/loop/config.go index 68e395097..74848a02e 100644 --- a/pkg/loop/config.go +++ b/pkg/loop/config.go @@ -57,6 +57,7 @@ const ( envTelemetryTraceSampleRatio = "CL_TELEMETRY_TRACE_SAMPLE_RATIO" envTelemetryAuthHeader = "CL_TELEMETRY_AUTH_HEADER" envTelemetryAuthPubKeyHex = "CL_TELEMETRY_AUTH_PUB_KEY_HEX" + envTelemetryAuthHeadersTTL = "CL_TELEMETRY_AUTH_HEADERS_TTL" envTelemetryEmitterBatchProcessor = "CL_TELEMETRY_EMITTER_BATCH_PROCESSOR" envTelemetryEmitterExportTimeout = "CL_TELEMETRY_EMITTER_EXPORT_TIMEOUT" envTelemetryEmitterExportInterval = "CL_TELEMETRY_EMITTER_EXPORT_INTERVAL" @@ -114,6 +115,7 @@ type EnvConfig struct { TelemetryTraceSampleRatio float64 TelemetryAuthHeaders map[string]string TelemetryAuthPubKeyHex string + TelemetryAuthHeadersTTL time.Duration TelemetryEmitterBatchProcessor bool TelemetryEmitterExportTimeout time.Duration TelemetryEmitterExportInterval time.Duration @@ -184,6 +186,7 @@ func (e *EnvConfig) AsCmdEnv() (env []string) { add(envTelemetryAuthHeader+k, v) } add(envTelemetryAuthPubKeyHex, e.TelemetryAuthPubKeyHex) + add(envTelemetryAuthHeadersTTL, e.TelemetryAuthHeadersTTL.String()) add(envTelemetryEmitterBatchProcessor, strconv.FormatBool(e.TelemetryEmitterBatchProcessor)) add(envTelemetryEmitterExportTimeout, e.TelemetryEmitterExportTimeout.String()) add(envTelemetryEmitterExportInterval, e.TelemetryEmitterExportInterval.String()) @@ -331,6 +334,10 @@ func (e *EnvConfig) parse() error { e.TelemetryTraceSampleRatio = getFloat64OrZero(envTelemetryTraceSampleRatio) e.TelemetryAuthHeaders = getMap(envTelemetryAuthHeader) e.TelemetryAuthPubKeyHex = os.Getenv(envTelemetryAuthPubKeyHex) + e.TelemetryAuthHeadersTTL, err = getDuration(envTelemetryAuthHeadersTTL) + if err != nil { + return fmt.Errorf("failed to parse %s: %w", envTelemetryAuthHeadersTTL, err) + } e.TelemetryEmitterBatchProcessor, err = getBool(envTelemetryEmitterBatchProcessor) if err != nil { return fmt.Errorf("failed to parse %s: %w", envTelemetryEmitterBatchProcessor, err) diff --git a/pkg/loop/server.go b/pkg/loop/server.go index b0c836f22..2647893d7 100644 --- a/pkg/loop/server.go +++ b/pkg/loop/server.go @@ -133,8 +133,6 @@ func (s *Server) start() error { OtelExporterGRPCEndpoint: s.EnvConfig.TelemetryEndpoint, ResourceAttributes: append(attributes, s.EnvConfig.TelemetryAttributes.AsStringAttributes()...), TraceSampleRatio: s.EnvConfig.TelemetryTraceSampleRatio, - AuthHeaders: s.EnvConfig.TelemetryAuthHeaders, - AuthPublicKeyHex: s.EnvConfig.TelemetryAuthPubKeyHex, EmitterBatchProcessor: s.EnvConfig.TelemetryEmitterBatchProcessor, EmitterExportTimeout: s.EnvConfig.TelemetryEmitterExportTimeout, EmitterExportInterval: s.EnvConfig.TelemetryEmitterExportInterval, @@ -147,6 +145,20 @@ func (s *Server) start() error { ChipIngressInsecureConnection: s.EnvConfig.ChipIngressInsecureConnection, } + // Configure beholder auth - the client will determine rotating vs static mode + // Rotating mode: when AuthHeadersTTL is set, client creates internal lazySigner + // Static mode: no TTL is provided it is assumed that the headers are static + if s.EnvConfig.TelemetryAuthHeadersTTL > 0 { + // Rotating auth mode: client will create lazySigner internally and allow keystore injection after startup + beholderCfg.AuthPublicKeyHex = s.EnvConfig.TelemetryAuthPubKeyHex + beholderCfg.AuthHeadersTTL = s.EnvConfig.TelemetryAuthHeadersTTL + beholderCfg.AuthHeaders = s.EnvConfig.TelemetryAuthHeaders // initial headers + } else { + // Static auth mode: headers and/or public key without rotation + beholderCfg.AuthHeaders = s.EnvConfig.TelemetryAuthHeaders + beholderCfg.AuthPublicKeyHex = s.EnvConfig.TelemetryAuthPubKeyHex + } + // note: due to the OTEL specification, all histogram buckets // must be defined when the beholder client is created beholderCfg.MetricViews = append(beholderCfg.MetricViews, s.otelViews...)