Skip to content

Commit 86b99a6

Browse files
committed
add comment and more kafka cases.
1 parent 739acb2 commit 86b99a6

File tree

2 files changed

+12
-0
lines changed

2 files changed

+12
-0
lines changed

apps/guardrails-service/container/src/pkg/kafka/consumer.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"crypto/x509"
77
"encoding/json"
88
"errors"
9+
"io"
910
"os"
1011
"os/signal"
1112
"regexp"
@@ -255,11 +256,21 @@ func (c *Consumer) Start(ctx context.Context) error {
255256
readCancel()
256257

257258
if err != nil {
259+
// Check if parent context was cancelled first
260+
if ctx.Err() != nil {
261+
return nil
262+
}
263+
// Timeout is expected when no messages available, continue silently
258264
if errors.Is(err, context.DeadlineExceeded) {
259265
// Timeout is expected when no messages available, continue silently
260266
continue
261267
}
262268
if errors.Is(err, context.Canceled) {
269+
// Context was cancelled, exit gracefully
270+
return nil
271+
}
272+
// kafka-go returns io.EOF when reader is closed or partition is exhausted
273+
if errors.Is(err, io.EOF) {
263274
return nil
264275
}
265276
c.logger.Error("Failed to read message from Kafka", zap.Error(err))

apps/guardrails-service/container/src/pkg/validator/service.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ func (s *Service) getCachedPolicies() ([]types.Policy, map[string]*types.AuditPo
7979
auditPolicies := s.cache.auditPolicies
8080
compiledRules := s.cache.compiledRules
8181
hasAuditRules := s.cache.hasAuditRules
82+
// RUnlock so that multiple goroutines can read the cached policies
8283
s.cache.mu.RUnlock()
8384
s.logger.Debug("Using cached policies",
8485
zap.Time("lastFetched", s.cache.lastFetched),

0 commit comments

Comments
 (0)