Skip to content

Commit d29ca5e

Browse files
committed
feat(grpc): add healthz support
1 parent 4196eb2 commit d29ca5e

File tree

9 files changed

+244
-19
lines changed

9 files changed

+244
-19
lines changed

deploy/helm/tracee/templates/daemonset.yaml

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,19 +42,48 @@ spec:
4242
valueFrom:
4343
fieldRef:
4444
fieldPath: spec.nodeName
45-
{{- if or .Values.config.server.httpAddress .Values.config.server.metrics .Values.config.server.healthz }}
45+
{{- $hasHttpServer := or .Values.config.server.httpAddress .Values.config.server.metrics .Values.config.server.healthz }}
46+
{{- $hasGrpcServer := .Values.config.server.grpcAddress }}
47+
{{- $grpcAddress := .Values.config.server.grpcAddress }}
48+
{{- $isGrpcTcp := false }}
49+
{{- $grpcPort := "" }}
50+
{{- if $hasGrpcServer }}
51+
{{- $parts := splitList ":" $grpcAddress }}
52+
{{- if eq (index $parts 0) "tcp" }}
53+
{{- $isGrpcTcp = true }}
54+
{{- if gt (len $parts) 2 }}
55+
{{- $grpcPort = index $parts 2 }}
56+
{{- else if eq (len $parts) 2 }}
57+
{{- $grpcPort = index $parts 1 }}
58+
{{- end }}
59+
{{- end }}
60+
{{- end }}
61+
{{- if or $hasHttpServer $hasGrpcServer }}
4662
ports:
63+
{{- if $hasHttpServer }}
4764
- name: metrics
4865
containerPort: {{ trimPrefix ":" (.Values.config.server.httpAddress | default ":3366") }}
4966
protocol: TCP
67+
{{- end }}
68+
{{- if $isGrpcTcp }}
69+
- name: grpc
70+
containerPort: {{ $grpcPort }}
71+
protocol: TCP
72+
{{- end }}
5073
{{- end }}
5174
securityContext:
5275
{{- toYaml .Values.securityContext | nindent 12 }}
5376
{{- if .Values.config.server.healthz }}
5477
readinessProbe:
78+
{{- if and $isGrpcTcp $grpcPort }}
79+
grpc:
80+
port: {{ $grpcPort }}
81+
service: ""
82+
{{- else if $hasHttpServer }}
5583
httpGet:
5684
path: /healthz
5785
port: {{ trimPrefix ":" (.Values.config.server.httpAddress | default ":3366") }}
86+
{{- end }}
5887
{{- end }}
5988
volumeMounts:
6089
- name: tmp-tracee

pkg/cmd/flags/server.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,11 @@ func PrepareServer(serverSlice []string) (ServerConfig, error) {
131131
return server, err
132132
}
133133

134+
// Enable gRPC health service if healthz flag is set
135+
if server.Healthz && server.grpc != nil {
136+
server.grpc.EnableHealthService()
137+
}
138+
134139
return server, nil
135140
}
136141

pkg/cmd/tracee.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,15 @@ package cmd
22

33
import (
44
"context"
5+
"time"
56

67
"github.com/aquasecurity/tracee/common/errfmt"
78
"github.com/aquasecurity/tracee/common/logger"
89
"github.com/aquasecurity/tracee/pkg/cmd/printer"
910
"github.com/aquasecurity/tracee/pkg/config"
1011
tracee "github.com/aquasecurity/tracee/pkg/ebpf"
12+
"github.com/aquasecurity/tracee/pkg/ebpf/heartbeat"
13+
"github.com/aquasecurity/tracee/pkg/server"
1114
"github.com/aquasecurity/tracee/pkg/server/grpc"
1215
"github.com/aquasecurity/tracee/pkg/server/http"
1316
"github.com/aquasecurity/tracee/pkg/streams"
@@ -31,6 +34,18 @@ func (r Runner) Run(ctx context.Context) error {
3134
t.AddReadyCallback(
3235
func(ctx context.Context) {
3336
logger.Debugw("Tracee is ready callback")
37+
38+
// Initialize heartbeat first (shared between HTTP and gRPC servers)
39+
// interval defines how often the heartbeat signal should be sent.
40+
heartbeatSignalInterval := time.Duration(1 * time.Second)
41+
// timeout specifies the maximum duration to wait for a heartbeat acknowledgment
42+
heartbeatAckTimeout := time.Duration(2 * time.Second)
43+
heartbeat.Init(ctx, heartbeatSignalInterval, heartbeatAckTimeout)
44+
// Set callback for InvokeHeartbeat (defined in server package)
45+
// The uprobe is attached to this function, so it must always be set
46+
heartbeat.GetInstance().SetCallback(server.InvokeHeartbeat)
47+
heartbeat.GetInstance().Start()
48+
3449
if r.HTTP != nil {
3550
if r.HTTP.IsMetricsEnabled() {
3651
if err := t.Stats().RegisterPrometheus(); err != nil {

pkg/ebpf/probes/probe_group.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,7 @@ func NewDefaultProbeGroup(module *bpf.Module, netEnabled bool, defaultAutoload b
312312
SignalSchedProcessFork: NewTraceProbe(RawTracepoint, "sched:sched_process_fork", "sched_process_fork_signal"),
313313
SignalSchedProcessExec: NewTraceProbe(RawTracepoint, "sched:sched_process_exec", "sched_process_exec_signal"),
314314
SignalSchedProcessExit: NewTraceProbe(RawTracepoint, "sched:sched_process_exit", "sched_process_exit_signal"),
315-
SignalHeartbeat: NewFixedUprobe(Uprobe, "heartbeat_capture", binaryPath, UprobeEventSymbol("github.com/aquasecurity/tracee/pkg/server/http.invokeHeartbeat")),
315+
SignalHeartbeat: NewFixedUprobe(Uprobe, "heartbeat_capture", binaryPath, UprobeEventSymbol("github.com/aquasecurity/tracee/pkg/server.InvokeHeartbeat")),
316316
ExecuteFinishedX86: NewTraceProbe(KretProbe, "__x64_sys_execve", "trace_execute_finished"),
317317
ExecuteAtFinishedX86: NewTraceProbe(KretProbe, "__x64_sys_execveat", "trace_execute_finished"),
318318
ExecuteFinishedCompatX86: NewTraceProbe(KretProbe, "__ia32_compat_sys_execve", "trace_execute_finished"),

pkg/server/grpc/health.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package grpc
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"google.golang.org/grpc/health"
8+
healthpb "google.golang.org/grpc/health/grpc_health_v1"
9+
10+
"github.com/aquasecurity/tracee/pkg/ebpf/heartbeat"
11+
)
12+
13+
// HealthService wraps the standard gRPC health server and integrates with Tracee's heartbeat mechanism
14+
type HealthService struct {
15+
server *health.Server
16+
}
17+
18+
// NewHealthService creates a new HealthService instance
19+
func NewHealthService() *HealthService {
20+
return &HealthService{
21+
server: health.NewServer(),
22+
}
23+
}
24+
25+
// Server returns the underlying health server for registration
26+
func (h *HealthService) Server() *health.Server {
27+
return h.server
28+
}
29+
30+
// StartMonitor polls heartbeat status and updates gRPC health accordingly.
31+
// It monitors the heartbeat at regular intervals (500ms) and updates the health status
32+
// for all registered services based on whether the heartbeat is alive.
33+
func (h *HealthService) StartMonitor(ctx context.Context) {
34+
// Use empty string for overall server health
35+
// This is sufficient for Kubernetes gRPC probes and most health checking scenarios
36+
// Individual service health can be added later if needed
37+
overallService := ""
38+
39+
// Initialize overall health as NOT_SERVING until heartbeat confirms health
40+
h.server.SetServingStatus(overallService, healthpb.HealthCheckResponse_NOT_SERVING)
41+
42+
ticker := time.NewTicker(500 * time.Millisecond)
43+
defer ticker.Stop()
44+
45+
for {
46+
select {
47+
case <-ctx.Done():
48+
// Set overall health to NOT_SERVING on shutdown
49+
h.server.SetServingStatus(overallService, healthpb.HealthCheckResponse_NOT_SERVING)
50+
return
51+
case <-ticker.C:
52+
// Poll heartbeat status
53+
instance := heartbeat.GetInstance()
54+
status := healthpb.HealthCheckResponse_NOT_SERVING
55+
if instance != nil && instance.IsAlive() {
56+
status = healthpb.HealthCheckResponse_SERVING
57+
}
58+
59+
// Update overall health status
60+
h.server.SetServingStatus(overallService, status)
61+
}
62+
}
63+
}

pkg/server/grpc/health_test.go

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package grpc
2+
3+
import (
4+
"context"
5+
"os"
6+
"testing"
7+
"time"
8+
9+
"github.com/stretchr/testify/assert"
10+
"github.com/stretchr/testify/require"
11+
"google.golang.org/grpc"
12+
"google.golang.org/grpc/credentials/insecure"
13+
healthpb "google.golang.org/grpc/health/grpc_health_v1"
14+
15+
"github.com/aquasecurity/tracee/pkg/ebpf/heartbeat"
16+
"github.com/aquasecurity/tracee/pkg/server"
17+
)
18+
19+
func TestHealthService_Check(t *testing.T) {
20+
tempDir, err := os.MkdirTemp("", "tracee-health-tests")
21+
require.NoError(t, err)
22+
defer os.RemoveAll(tempDir)
23+
24+
unixSock := tempDir + "/tracee.sock"
25+
defer os.Remove(unixSock)
26+
27+
ctx, cancel := context.WithCancel(context.Background())
28+
// Don't cancel context until test is done to avoid closing heartbeat
29+
defer cancel()
30+
31+
// Initialize heartbeat for testing
32+
// Use a background context that won't be cancelled to keep heartbeat alive
33+
bgCtx := context.Background()
34+
heartbeat.Init(bgCtx, 1*time.Second, 2*time.Second)
35+
instance := heartbeat.GetInstance()
36+
require.NotNil(t, instance)
37+
instance.SetCallback(server.InvokeHeartbeat)
38+
instance.Start()
39+
40+
// In tests, manually send pulses since uprobe isn't attached
41+
pulseCtx, pulseCancel := context.WithCancel(ctx)
42+
defer pulseCancel()
43+
go func() {
44+
ticker := time.NewTicker(500 * time.Millisecond)
45+
defer ticker.Stop()
46+
for {
47+
select {
48+
case <-ticker.C:
49+
safeSendPulse()
50+
case <-pulseCtx.Done():
51+
return
52+
}
53+
}
54+
}()
55+
56+
grpcServer := New("unix", unixSock)
57+
grpcServer.EnableHealthService()
58+
go grpcServer.Start(ctx, nil, nil)
59+
60+
// Wait for server to start
61+
require.Eventually(t, func() bool {
62+
_, err := os.Stat(unixSock)
63+
return err == nil
64+
}, 2*time.Second, 10*time.Millisecond)
65+
66+
// Create health client
67+
conn, err := grpc.NewClient("unix:"+unixSock, grpc.WithTransportCredentials(insecure.NewCredentials()))
68+
require.NoError(t, err)
69+
defer conn.Close()
70+
71+
healthClient := healthpb.NewHealthClient(conn)
72+
73+
// Send initial pulse immediately
74+
safeSendPulse()
75+
76+
// Wait for health service monitor to poll and update status (polls every 500ms)
77+
require.Eventually(t, func() bool {
78+
resp, err := healthClient.Check(ctx, &healthpb.HealthCheckRequest{})
79+
return err == nil && resp.Status == healthpb.HealthCheckResponse_SERVING
80+
}, 2*time.Second, 100*time.Millisecond, "health service should become SERVING")
81+
82+
// Test overall health (empty service name)
83+
t.Run("overall health check", func(t *testing.T) {
84+
resp, err := healthClient.Check(ctx, &healthpb.HealthCheckRequest{})
85+
require.NoError(t, err)
86+
assert.Equal(t, healthpb.HealthCheckResponse_SERVING, resp.Status)
87+
})
88+
}
89+
90+
// safeSendPulse safely sends a pulse, recovering from panics if the channel is closed or instance is nil
91+
func safeSendPulse() {
92+
defer func() {
93+
recover();
94+
}()
95+
if instance := heartbeat.GetInstance(); instance != nil {
96+
heartbeat.SendPulse()
97+
}
98+
}

pkg/server/grpc/server.go

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"time"
1010

1111
"google.golang.org/grpc"
12+
healthpb "google.golang.org/grpc/health/grpc_health_v1"
1213
"google.golang.org/grpc/keepalive"
1314

1415
pb "github.com/aquasecurity/tracee/api/v1beta1"
@@ -18,18 +19,23 @@ import (
1819
)
1920

2021
type Server struct {
21-
listener net.Listener
22-
protocol string
23-
listenAddr string
24-
server *grpc.Server
22+
listener net.Listener
23+
protocol string
24+
listenAddr string
25+
server *grpc.Server
26+
healthService *HealthService
2527
}
2628

2729
func New(protocol, listenAddr string) *Server {
2830
if protocol == "tcp" {
2931
listenAddr = ":" + listenAddr
3032
}
3133

32-
return &Server{listener: nil, protocol: protocol, listenAddr: listenAddr}
34+
return &Server{
35+
listener: nil,
36+
protocol: protocol,
37+
listenAddr: listenAddr,
38+
}
3339
}
3440

3541
func (s *Server) Start(ctx context.Context, t *tracee.Tracee, e *engine.Engine) {
@@ -67,6 +73,12 @@ func (s *Server) Start(ctx context.Context, t *tracee.Tracee, e *engine.Engine)
6773
pb.RegisterDiagnosticServiceServer(grpcServer, &DiagnosticService{tracee: t})
6874
pb.RegisterDataSourceServiceServer(grpcServer, &DataSourceService{sigEngine: e})
6975

76+
// Register health service only if enabled
77+
if s.healthService != nil {
78+
healthpb.RegisterHealthServer(grpcServer, s.healthService.Server())
79+
go s.healthService.StartMonitor(ctx)
80+
}
81+
7082
go func() {
7183
logger.Debugw("Starting grpc server", "protocol", s.protocol, "address", s.listenAddr)
7284
if err := grpcServer.Serve(s.listener); err != nil {
@@ -97,6 +109,11 @@ func (s *Server) Address() string {
97109
return fmt.Sprintf("%s:%s", s.protocol, addr)
98110
}
99111

112+
// EnableHealthService enables the gRPC health checking service
113+
func (s *Server) EnableHealthService() {
114+
s.healthService = NewHealthService()
115+
}
116+
100117
func (s *Server) cleanup() {
101118
s.server.GracefulStop()
102119
}

pkg/server/heartbeat.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package server
2+
3+
// InvokeHeartbeat deliberately does nothing. It is registered as the heartbeat
// callback, and an uprobe attached to it is what detects liveness, so only the
// fact that it gets called matters.
//
// The function is shared between the HTTP and gRPC servers. The heartbeat
// uprobe refers to it by its fully qualified symbol name
// (github.com/aquasecurity/tracee/pkg/server.InvokeHeartbeat), so renaming or
// moving it requires updating the probe definition as well. It is marked
// noinline so that it remains a distinct function in the compiled binary.
//
//go:noinline
func InvokeHeartbeat() {
	// Intentionally left empty
}

pkg/server/http/server.go

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -80,13 +80,6 @@ func (s *Server) Start(ctx context.Context) {
8080
srvCancel()
8181
}()
8282

83-
heartbeatCtx, cancel := context.WithCancel(srvCtx)
84-
defer cancel()
85-
86-
heartbeat.Init(heartbeatCtx, heartbeatSignalInterval, heartbeatAckTimeout)
87-
heartbeat.GetInstance().SetCallback(invokeHeartbeat)
88-
heartbeat.GetInstance().Start()
89-
9083
select {
9184
case <-ctx.Done():
9285
logger.Debugw("Context cancelled, shutting down metrics endpoint server")
@@ -160,8 +153,3 @@ func (s *Server) IsPyroscopeEnabled() bool {
160153
func (s *Server) Address() string {
161154
return s.address
162155
}
163-
164-
//go:noinline
165-
func invokeHeartbeat() {
166-
// Intentionally left empty
167-
}

0 commit comments

Comments
 (0)