Skip to content

Commit d7538de

Browse files
fix(local): tolerate NATS startup races (#46)
Add auth-callout initial NATS reconnect handling and split liveness from NATS-backed health checks. The service now stays alive while NATS starts. Readiness reports unavailable until the NATS connection is established. Extend only local surveyor and auth-callout rollout waits to cover kubelet backoff during local Kind deploys. Signed-off-by: Frank Spitulski <fspitulski@nvidia.com>
1 parent 5967407 commit d7538de

8 files changed

Lines changed: 268 additions & 34 deletions

File tree

auth-callout/deploy/README.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ For mTLS authentication:
2929

3030
| Port | Purpose |
3131
|------|---------|
32-
| 8080 | Health checks (`/healthz`) |
32+
| 8080 | Health checks (`/livez`, `/healthz`) |
3333
| 9090 | Prometheus metrics (if enabled) |
3434

3535
## Service Configuration
@@ -277,7 +277,7 @@ healthChecks:
277277
livenessProbe:
278278
enabled: true
279279
httpGet:
280-
path: "/healthz"
280+
path: "/livez"
281281
port: "http"
282282
scheme: "HTTP"
283283
initialDelaySeconds: 10
@@ -315,7 +315,8 @@ healthChecks:
315315

316316
**Available Health Endpoints:**
317317

318-
- `/healthz` - Primary health check endpoint (recommended)
318+
- `/livez` - Process liveness check endpoint
319+
- `/healthz` - Health check endpoint, including NATS connectivity
319320
- `/v1/` - Basic connectivity test
320321

321322
## Installation Examples

auth-callout/deploy/values.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ healthChecks:
147147
livenessProbe:
148148
enabled: true # Enable liveness probe
149149
httpGet:
150-
path: "/healthz" # Health check endpoint path
150+
path: "/livez" # Liveness endpoint path
151151
port: "http" # Port name or number
152152
scheme: "HTTP" # HTTP or HTTPS
153153
initialDelaySeconds: 10 # Initial delay before starting probes
@@ -160,7 +160,7 @@ healthChecks:
160160
readinessProbe:
161161
enabled: true # Enable readiness probe
162162
httpGet:
163-
path: "/healthz" # Health check endpoint path
163+
path: "/healthz" # Health endpoint path, including NATS connectivity
164164
port: "http" # Port name or number
165165
scheme: "HTTP" # HTTP or HTTPS
166166
initialDelaySeconds: 5 # Initial delay before starting probes

auth-callout/docs/basic-profile.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -122,9 +122,10 @@ graph TB
122122
5. **Production Images**: Uses optimized production container images
123123

124124
### Available Endpoints
125-
1. **Health Check**: `GET /healthz` - Service health status
126-
2. **API Ping**: `GET /v1/` - API connectivity test
127-
3. **API Documentation**: `/swagger/index.html` - Interactive API documentation
125+
1. **Liveness Check**: `GET /livez` - Process liveness status
126+
2. **Health Check**: `GET /healthz` - Service health status, including NATS connectivity
127+
3. **API Ping**: `GET /v1/` - API connectivity test
128+
4. **API Documentation**: `/swagger/index.html` - Interactive API documentation
128129

129130
### Development Benefits
130131
1. **Minimal Overhead**: No observability components consuming resources

auth-callout/src/cmd/auth_callout/main_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ func TestRunServerDoesNotLogConfiguredNATSSeeds(t *testing.T) {
2626
t.Setenv("AUTH_CALLOUT_NATS_NKEY_SEED", natsKey)
2727
t.Setenv("AUTH_CALLOUT_NATS_ISSUER_SEED", issuerKey)
2828
t.Setenv("AUTH_CALLOUT_NATS_XKEY_SEED", xKey)
29-
t.Setenv("AUTH_CALLOUT_NATS_URL", "nats://127.0.0.1:1")
29+
// Use malformed syntax so startup fails synchronously; valid unavailable URLs now reconnect.
30+
t.Setenv("AUTH_CALLOUT_NATS_URL", "nats://%zz")
3031
t.Setenv("AUTH_CALLOUT_PERMISSIONS_FILE", permissionsFile)
3132
t.Setenv("AUTH_CALLOUT_OBSERVABILITY_METRICS_ENABLED", "false")
3233
t.Setenv("AUTH_CALLOUT_OBSERVABILITY_TRACING_ENABLED", "false")

auth-callout/src/internal/service/health.go

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,17 @@ import (
77
"net/http"
88
)
99

10+
func livezHandler(w http.ResponseWriter, _ *http.Request) {
11+
_, _ = w.Write([]byte("OK"))
12+
}
13+
1014
// HealthHandler handles health check requests at the "/healthz" endpoint.
11-
// Returns HTTP 200 OK with "OK" body if the service is healthy.
12-
func (s *Service) HealthHandler(w http.ResponseWriter, r *http.Request) {
15+
func (s *Service) HealthHandler(w http.ResponseWriter, _ *http.Request) {
1316
// Check NATS connection health
1417
if s.natsConn != nil && !s.natsConn.IsConnected() {
1518
http.Error(w, "NATS connection lost", http.StatusServiceUnavailable)
1619
return
1720
}
1821

19-
w.WriteHeader(http.StatusOK)
20-
_, err := w.Write([]byte("OK"))
21-
if err != nil {
22-
http.Error(w, err.Error(), http.StatusInternalServerError)
23-
return
24-
}
22+
_, _ = w.Write([]byte("OK"))
2523
}

auth-callout/src/internal/service/service.go

Lines changed: 67 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,12 @@ import (
3131
obstracing "github.com/NVIDIA/dsx-exchange/auth-callout/src/internal/observability/tracing"
3232
)
3333

34+
const (
35+
natsClientName = "auth-callout-service"
36+
natsStartupConnectWait = 2 * time.Second
37+
natsMaxReconnects = -1 // Keep retrying while readiness reports NATS unavailable.
38+
)
39+
3440
// Service wraps HTTP server with graceful shutdown
3541
type Service struct {
3642
config ServiceConfig
@@ -191,31 +197,37 @@ func New(config ServiceConfig, logger *otelzap.Logger) *Service {
191197
// The service will wait for interrupt signals and perform a graceful shutdown with a 5-second timeout.
192198
func (s *Service) Run() error {
193199
// =================================================
194-
// Put your unauthenticated routes here (eg healthz)
200+
// Put unauthenticated health routes here.
195201
// =================================================
202+
s.rootRouter.HandleFunc("/livez", livezHandler)
196203
s.rootRouter.HandleFunc("/healthz", s.HealthHandler)
197204

198-
// Connect to NATS
199-
var opts []nats.Option
200-
if s.config.NATS.NKeySeed != "" {
201-
kp, err := nkeys.FromSeed([]byte(s.config.NATS.NKeySeed))
202-
if err != nil {
203-
return fmt.Errorf("error loading NATS NKey: %w", err)
204-
}
205-
pubKey, err := kp.PublicKey()
206-
if err != nil {
207-
return fmt.Errorf("error getting NATS NKey public key: %w", err)
208-
}
209-
opts = append(opts, nats.Nkey(pubKey, kp.Sign))
205+
initialConnectCh := make(chan struct{}, 1)
206+
opts, err := s.buildNATSOptions(initialConnectCh)
207+
if err != nil {
208+
return err
210209
}
211-
opts = append(opts, nats.Name("auth-callout-service"))
212210

213211
nc, err := nats.Connect(s.config.NATS.URL, opts...)
214212
if err != nil {
215213
return fmt.Errorf("error connecting to NATS: %w", err)
216214
}
217215
s.natsConn = nc
218-
s.logger.Info("Connected to NATS", zap.String("url", s.config.NATS.URL))
216+
if !nc.IsConnected() {
217+
select {
218+
case <-initialConnectCh:
219+
case <-time.After(natsStartupConnectWait):
220+
}
221+
}
222+
if nc.IsConnected() {
223+
s.logger.Info("Connected to NATS", zap.String("url", s.config.NATS.URL))
224+
} else {
225+
s.logger.Warn(
226+
"NATS still connecting after startup check",
227+
zap.String("url", s.config.NATS.URL),
228+
zap.Duration("timeout", natsStartupConnectWait),
229+
)
230+
}
219231

220232
// Create authorization handler
221233
authorizerFn := func(req *jwt.AuthorizationRequest) (string, error) {
@@ -294,6 +306,46 @@ func (s *Service) Run() error {
294306
return nil
295307
}
296308

309+
func (s *Service) buildNATSOptions(initialConnectCh chan<- struct{}) ([]nats.Option, error) {
310+
opts := []nats.Option{
311+
nats.Name(natsClientName),
312+
nats.RetryOnFailedConnect(true),
313+
nats.MaxReconnects(natsMaxReconnects),
314+
nats.ConnectHandler(func(_ *nats.Conn) {
315+
select {
316+
case initialConnectCh <- struct{}{}:
317+
default:
318+
}
319+
}),
320+
nats.DisconnectErrHandler(func(_ *nats.Conn, err error) {
321+
if err != nil {
322+
s.logger.Warn("NATS disconnected", zap.String("url", s.config.NATS.URL), zap.Error(err))
323+
}
324+
}),
325+
nats.ReconnectHandler(func(_ *nats.Conn) {
326+
s.logger.Info("Connected to NATS", zap.String("url", s.config.NATS.URL))
327+
}),
328+
nats.ReconnectErrHandler(func(_ *nats.Conn, err error) {
329+
s.logger.Debug("NATS reconnect failed", zap.String("url", s.config.NATS.URL), zap.Error(err))
330+
}),
331+
}
332+
333+
if s.config.NATS.NKeySeed == "" {
334+
return opts, nil
335+
}
336+
337+
kp, err := nkeys.FromSeed([]byte(s.config.NATS.NKeySeed))
338+
if err != nil {
339+
return nil, fmt.Errorf("error loading NATS NKey: %w", err)
340+
}
341+
pubKey, err := kp.PublicKey()
342+
if err != nil {
343+
return nil, fmt.Errorf("error getting NATS NKey public key: %w", err)
344+
}
345+
346+
return append(opts, nats.Nkey(pubKey, kp.Sign)), nil
347+
}
348+
297349
// handleAuthRequest processes NATS authorization requests
298350
func (s *Service) handleAuthRequest(ctx context.Context, req *jwt.AuthorizationRequest) (string, error) {
299351
ctx, endTrace := obstracing.LevelInfo(ctx)
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
// Copyright 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package service
5+
6+
import (
7+
"errors"
8+
"net"
9+
"net/http"
10+
"net/http/httptest"
11+
"testing"
12+
"time"
13+
14+
"github.com/nats-io/jwt/v2"
15+
natsserver "github.com/nats-io/nats-server/v2/server"
16+
"github.com/nats-io/nats.go"
17+
"github.com/synadia-io/callout.go"
18+
"github.com/uptrace/opentelemetry-go-extra/otelzap"
19+
"go.uber.org/zap"
20+
)
21+
22+
func TestAuthorizationServiceStartsBeforeInitialNATSConnect(t *testing.T) {
23+
natsServer := runNATSServer(t)
24+
dialer := &gatedDialer{ready: make(chan struct{})}
25+
initialConnectCh := make(chan struct{}, 1)
26+
27+
opts, err := newTestService().buildNATSOptions(initialConnectCh)
28+
if err != nil {
29+
t.Fatalf("build NATS options: %v", err)
30+
}
31+
opts = append(opts,
32+
nats.SetCustomDialer(dialer),
33+
nats.Timeout(50*time.Millisecond),
34+
nats.ReconnectWait(50*time.Millisecond),
35+
)
36+
37+
natsConn, err := nats.Connect(natsServer.ClientURL(), opts...)
38+
if err != nil {
39+
t.Fatalf("connect while NATS dial is blocked: %v", err)
40+
}
41+
t.Cleanup(natsConn.Close)
42+
if natsConn.IsConnected() {
43+
t.Fatal("NATS connection is connected, want reconnecting")
44+
}
45+
authService, err := callout.NewAuthorizationService(
46+
natsConn,
47+
callout.Name("auth-callout"),
48+
callout.Authorizer(func(*jwt.AuthorizationRequest) (string, error) {
49+
return "", nil
50+
}),
51+
callout.ResponseSigner(func(*jwt.AuthorizationResponseClaims) (string, error) {
52+
return "", nil
53+
}),
54+
callout.Logger(noopNATSLogger{}),
55+
)
56+
if err != nil {
57+
t.Fatalf("create authorization service while NATS reconnects: %v", err)
58+
}
59+
t.Cleanup(func() { _ = authService.Stop() })
60+
61+
if natsConn.NumSubscriptions() == 0 {
62+
t.Fatal("authorization service did not register NATS subscriptions")
63+
}
64+
65+
close(dialer.ready)
66+
select {
67+
case <-initialConnectCh:
68+
case <-time.After(2 * time.Second):
69+
t.Fatal("NATS connection did not recover after dialer was unblocked")
70+
}
71+
if !natsConn.IsConnected() {
72+
t.Fatal("NATS connection is not connected after initial connect signal")
73+
}
74+
}
75+
76+
func TestLivezHandlerDoesNotRequireNATSConnection(t *testing.T) {
77+
response := httptest.NewRecorder()
78+
request := httptest.NewRequest(http.MethodGet, "/livez", nil)
79+
80+
livezHandler(response, request)
81+
82+
if response.Code != http.StatusOK {
83+
t.Fatalf("status = %d, want %d", response.Code, http.StatusOK)
84+
}
85+
if body := response.Body.String(); body != "OK" {
86+
t.Fatalf("body = %q, want %q", body, "OK")
87+
}
88+
}
89+
90+
func TestHealthHandlerRequiresNATSConnection(t *testing.T) {
91+
natsConn := newReconnectingNATSConn(t)
92+
93+
service := &Service{natsConn: natsConn}
94+
response := httptest.NewRecorder()
95+
request := httptest.NewRequest(http.MethodGet, "/healthz", nil)
96+
97+
service.HealthHandler(response, request)
98+
99+
if response.Code != http.StatusServiceUnavailable {
100+
t.Fatalf("status = %d, want %d", response.Code, http.StatusServiceUnavailable)
101+
}
102+
}
103+
104+
func newReconnectingNATSConn(t *testing.T) *nats.Conn {
105+
t.Helper()
106+
107+
opts, err := newTestService().buildNATSOptions(make(chan struct{}, 1))
108+
if err != nil {
109+
t.Fatalf("build NATS options: %v", err)
110+
}
111+
opts = append(opts,
112+
nats.SetCustomDialer(failingDialer{}),
113+
nats.Timeout(50*time.Millisecond),
114+
nats.ReconnectWait(50*time.Millisecond),
115+
)
116+
117+
natsConn, err := nats.Connect("nats://unavailable.test:4222", opts...)
118+
if err != nil {
119+
t.Fatalf("connect to unavailable NATS with retry enabled: %v", err)
120+
}
121+
t.Cleanup(natsConn.Close)
122+
123+
return natsConn
124+
}
125+
126+
func runNATSServer(t *testing.T) *natsserver.Server {
127+
t.Helper()
128+
129+
natsServer, err := natsserver.NewServer(&natsserver.Options{
130+
Host: "127.0.0.1",
131+
Port: -1,
132+
NoLog: true,
133+
NoSigs: true,
134+
})
135+
if err != nil {
136+
t.Fatalf("create NATS server: %v", err)
137+
}
138+
natsServer.Start()
139+
if !natsServer.ReadyForConnections(2 * time.Second) {
140+
natsServer.Shutdown()
141+
t.Fatal("NATS server did not become ready")
142+
}
143+
t.Cleanup(natsServer.Shutdown)
144+
145+
return natsServer
146+
}
147+
148+
func newTestService() *Service {
149+
return &Service{
150+
logger: otelzap.New(zap.NewNop()),
151+
}
152+
}
153+
154+
type failingDialer struct{}
155+
156+
func (failingDialer) Dial(string, string) (net.Conn, error) {
157+
return nil, errors.New("dial failed")
158+
}
159+
160+
type gatedDialer struct {
161+
ready chan struct{}
162+
}
163+
164+
func (d *gatedDialer) Dial(network, address string) (net.Conn, error) {
165+
select {
166+
case <-d.ready:
167+
return (&net.Dialer{}).Dial(network, address)
168+
default:
169+
return nil, errors.New("dial blocked")
170+
}
171+
}
172+
173+
type noopNATSLogger struct{}
174+
175+
func (noopNATSLogger) Noticef(string, ...any) {}
176+
func (noopNATSLogger) Warnf(string, ...any) {}
177+
func (noopNATSLogger) Fatalf(string, ...any) {}
178+
func (noopNATSLogger) Errorf(string, ...any) {}
179+
func (noopNATSLogger) Debugf(string, ...any) {}
180+
func (noopNATSLogger) Tracef(string, ...any) {}

0 commit comments

Comments
 (0)