Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion deploy/helm/tracee/templates/daemonset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,48 @@ spec:
valueFrom:
fieldRef:
fieldPath: spec.nodeName
{{- if or .Values.config.server.httpAddress .Values.config.server.metrics .Values.config.server.healthz }}
{{- $hasHttpServer := or .Values.config.server.httpAddress .Values.config.server.metrics .Values.config.server.healthz }}
{{- $hasGrpcServer := .Values.config.server.grpcAddress }}
{{- $grpcAddress := .Values.config.server.grpcAddress }}
{{- $isGrpcTcp := false }}
{{- $grpcPort := "" }}
{{- if $hasGrpcServer }}
{{- $parts := splitList ":" $grpcAddress }}
{{- if eq (index $parts 0) "tcp" }}
{{- $isGrpcTcp = true }}
{{- if gt (len $parts) 2 }}
{{- $grpcPort = index $parts 2 }}
{{- else if eq (len $parts) 2 }}
{{- $grpcPort = index $parts 1 }}
{{- end }}
{{- end }}
{{- end }}
{{- if or $hasHttpServer $hasGrpcServer }}
ports:
{{- if $hasHttpServer }}
- name: metrics
containerPort: {{ trimPrefix ":" (.Values.config.server.httpAddress | default ":3366") }}
protocol: TCP
{{- end }}
{{- if $isGrpcTcp }}
- name: grpc
containerPort: {{ $grpcPort }}
protocol: TCP
{{- end }}
{{- end }}
securityContext:
{{- toYaml .Values.securityContext | nindent 12 }}
{{- if .Values.config.server.healthz }}
readinessProbe:
{{- if and $isGrpcTcp $grpcPort }}
grpc:
port: {{ $grpcPort }}
service: ""
{{- else if $hasHttpServer }}
httpGet:
path: /healthz
port: {{ trimPrefix ":" (.Values.config.server.httpAddress | default ":3366") }}
{{- end }}
{{- end }}
volumeMounts:
- name: tmp-tracee
Expand Down
5 changes: 5 additions & 0 deletions pkg/cmd/flags/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ func PrepareServer(serverSlice []string) (ServerConfig, error) {
return server, err
}

// Enable gRPC health service if healthz flag is set
if server.Healthz && server.grpc != nil {
server.grpc.EnableHealthService()
}

return server, nil
}

Expand Down
13 changes: 13 additions & 0 deletions pkg/cmd/tracee.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@

import (
"context"
"time"

"github.com/aquasecurity/tracee/common/errfmt"
"github.com/aquasecurity/tracee/common/logger"
"github.com/aquasecurity/tracee/pkg/cmd/printer"
"github.com/aquasecurity/tracee/pkg/config"
tracee "github.com/aquasecurity/tracee/pkg/ebpf"
"github.com/aquasecurity/tracee/pkg/ebpf/heartbeat"
"github.com/aquasecurity/tracee/pkg/server"
"github.com/aquasecurity/tracee/pkg/server/grpc"
"github.com/aquasecurity/tracee/pkg/server/http"
"github.com/aquasecurity/tracee/pkg/streams"
Expand All @@ -33,6 +35,17 @@
func(ctx context.Context) {
logger.Debugw("Tracee is ready callback")

// Initialize heartbeat first (shared between HTTP and gRPC servers)
// interval defines how often the heartbeat signal should be sent.
heartbeatSignalInterval := time.Duration(1 * time.Second)
// timeout specifies the maximum duration to wait for a heartbeat acknowledgment
heartbeatAckTimeout := time.Duration(2 * time.Second)
heartbeat.Init(ctx, heartbeatSignalInterval, heartbeatAckTimeout)
// Set callback for InvokeHeartbeat (defined in server package)
// The uprobe is attached to this function, so it must always be set
heartbeat.GetInstance().SetCallback(server.InvokeHeartbeat)
heartbeat.GetInstance().Start()

Check warning on line 48 in pkg/cmd/tracee.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/tracee.go#L38-L48

Added lines #L38 - L48 were not covered by tests
// Start HTTP server if configured
if r.HTTP != nil {
// Enable healthz endpoint with combined readiness check
Expand Down
2 changes: 1 addition & 1 deletion pkg/ebpf/probes/probe_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@
SignalSchedProcessFork: NewTraceProbe(RawTracepoint, "sched:sched_process_fork", "sched_process_fork_signal"),
SignalSchedProcessExec: NewTraceProbe(RawTracepoint, "sched:sched_process_exec", "sched_process_exec_signal"),
SignalSchedProcessExit: NewTraceProbe(RawTracepoint, "sched:sched_process_exit", "sched_process_exit_signal"),
SignalHeartbeat: NewFixedUprobe(Uprobe, "heartbeat_capture", binaryPath, UprobeEventSymbol("github.com/aquasecurity/tracee/pkg/server/http.invokeHeartbeat")),
SignalHeartbeat: NewFixedUprobe(Uprobe, "heartbeat_capture", binaryPath, UprobeEventSymbol("github.com/aquasecurity/tracee/pkg/server.InvokeHeartbeat")),

Check warning on line 315 in pkg/ebpf/probes/probe_group.go

View check run for this annotation

Codecov / codecov/patch

pkg/ebpf/probes/probe_group.go#L315

Added line #L315 was not covered by tests
ExecuteFinishedX86: NewTraceProbe(KretProbe, "__x64_sys_execve", "trace_execute_finished"),
ExecuteAtFinishedX86: NewTraceProbe(KretProbe, "__x64_sys_execveat", "trace_execute_finished"),
ExecuteFinishedCompatX86: NewTraceProbe(KretProbe, "__ia32_compat_sys_execve", "trace_execute_finished"),
Expand Down
65 changes: 65 additions & 0 deletions pkg/server/grpc/health.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package grpc

import (
"context"
"time"

"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"

"github.com/aquasecurity/tracee/pkg/ebpf/heartbeat"
)

// HealthService wraps the standard gRPC health server and integrates with Tracee's heartbeat mechanism
type HealthService struct {
server *health.Server
}

// NewHealthService creates a new HealthService instance
func NewHealthService() *HealthService {
return &HealthService{
server: health.NewServer(),
}
}

// Server returns the underlying health server for registration
func (h *HealthService) Server() *health.Server {
return h.server
}

// StartMonitor polls heartbeat status and updates gRPC health accordingly.
// It monitors the heartbeat at regular intervals and updates the health status
// for all registered services based on whether the heartbeat is alive.
func (h *HealthService) StartMonitor(ctx context.Context) {
// Use empty string for overall server health
// This is sufficient for Kubernetes gRPC probes and most health checking scenarios
// Individual service health can be added later if needed
overallService := ""

// Initialize overall health as NOT_SERVING until heartbeat confirms health
h.server.SetServingStatus(overallService, healthpb.HealthCheckResponse_NOT_SERVING)

// Poll at the same interval as the heartbeat ack timeout (2s), since that's
// the boundary at which IsAlive() state actually changes.
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
// Set overall health to NOT_SERVING on shutdown
h.server.SetServingStatus(overallService, healthpb.HealthCheckResponse_NOT_SERVING)
return
case <-ticker.C:
// Poll heartbeat status
instance := heartbeat.GetInstance()
status := healthpb.HealthCheckResponse_NOT_SERVING
if instance != nil && instance.IsAlive() {
status = healthpb.HealthCheckResponse_SERVING
}

// Update overall health status
h.server.SetServingStatus(overallService, status)
}
}
}
98 changes: 98 additions & 0 deletions pkg/server/grpc/health_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package grpc

import (
"context"
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
healthpb "google.golang.org/grpc/health/grpc_health_v1"

"github.com/aquasecurity/tracee/pkg/ebpf/heartbeat"
"github.com/aquasecurity/tracee/pkg/server"
)

func TestHealthService_Check(t *testing.T) {
tempDir, err := os.MkdirTemp("", "tracee-health-tests")
require.NoError(t, err)
defer os.RemoveAll(tempDir)

unixSock := tempDir + "/tracee.sock"
defer os.Remove(unixSock)

ctx, cancel := context.WithCancel(context.Background())
// Don't cancel context until test is done to avoid closing heartbeat
defer cancel()

// Initialize heartbeat for testing
// Use a background context that won't be cancelled to keep heartbeat alive
bgCtx := context.Background()
heartbeat.Init(bgCtx, 1*time.Second, 2*time.Second)
instance := heartbeat.GetInstance()
require.NotNil(t, instance)
instance.SetCallback(server.InvokeHeartbeat)
instance.Start()

// In tests, manually send pulses since uprobe isn't attached
pulseCtx, pulseCancel := context.WithCancel(ctx)
defer pulseCancel()
go func() {
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ticker.C:
safeSendPulse()
case <-pulseCtx.Done():
return
}
}
}()

grpcServer := New("unix", unixSock)
grpcServer.EnableHealthService()
go grpcServer.Start(ctx, nil, nil)

// Wait for server to start
require.Eventually(t, func() bool {
_, err := os.Stat(unixSock)
return err == nil
}, 2*time.Second, 10*time.Millisecond)

// Create health client
conn, err := grpc.NewClient("unix:"+unixSock, grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)
defer conn.Close()

healthClient := healthpb.NewHealthClient(conn)

// Send initial pulse immediately
safeSendPulse()

// Wait for health service monitor to poll and update status (polls every 2s)
require.Eventually(t, func() bool {
resp, err := healthClient.Check(ctx, &healthpb.HealthCheckRequest{})
return err == nil && resp.Status == healthpb.HealthCheckResponse_SERVING
}, 5*time.Second, 100*time.Millisecond, "health service should become SERVING")

// Test overall health (empty service name)
t.Run("overall health check", func(t *testing.T) {
resp, err := healthClient.Check(ctx, &healthpb.HealthCheckRequest{})
require.NoError(t, err)
assert.Equal(t, healthpb.HealthCheckResponse_SERVING, resp.Status)
})
}

// safeSendPulse safely sends a pulse, recovering from panics if the channel is closed or instance is nil
func safeSendPulse() {
defer func() {
recover()
}()
if instance := heartbeat.GetInstance(); instance != nil {
heartbeat.SendPulse()
}
}
27 changes: 22 additions & 5 deletions pkg/server/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"google.golang.org/grpc"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/keepalive"

pb "github.com/aquasecurity/tracee/api/v1beta1"
Expand All @@ -18,18 +19,23 @@ import (
)

type Server struct {
listener net.Listener
protocol string
listenAddr string
server *grpc.Server
listener net.Listener
protocol string
listenAddr string
server *grpc.Server
healthService *HealthService
}

func New(protocol, listenAddr string) *Server {
if protocol == "tcp" {
listenAddr = ":" + listenAddr
}

return &Server{listener: nil, protocol: protocol, listenAddr: listenAddr}
return &Server{
listener: nil,
protocol: protocol,
listenAddr: listenAddr,
}
}

func (s *Server) Start(ctx context.Context, t *tracee.Tracee, e *engine.Engine) {
Expand Down Expand Up @@ -67,6 +73,12 @@ func (s *Server) Start(ctx context.Context, t *tracee.Tracee, e *engine.Engine)
pb.RegisterDiagnosticServiceServer(grpcServer, &DiagnosticService{tracee: t})
pb.RegisterDataSourceServiceServer(grpcServer, &DataSourceService{sigEngine: e})

// Register health service only if enabled
if s.healthService != nil {
healthpb.RegisterHealthServer(grpcServer, s.healthService.Server())
go s.healthService.StartMonitor(ctx)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a doubt, this go routine spawning could race with the line 84 somehow (inside other spawning)?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a doubt, this go routine spawning could race with the line 84 somehow (inside other spawning)?

No race here. The RegisterHealthServer call on line 78 completes synchronously before either goroutine is spawned. After that, the two goroutines operate on independent concerns:

  • StartMonitor only calls health.Server.SetServingStatus(), which is internally synchronized with a mutex in the standard gRPC health server implementation.
  • grpcServer.Serve() starts accepting connections and dispatching RPCs, reading the health status through the same mutex-protected Check/Watch handlers.

So even if Serve starts accepting connections before StartMonitor sets the initial NOT_SERVING status, a health check arriving in that window would get SERVICE_UNKNOWN (the default for unregistered services), which Kubernetes treats as unhealthy — same practical effect as NOT_SERVING.

}

go func() {
logger.Debugw("Starting grpc server", "protocol", s.protocol, "address", s.listenAddr)
if err := grpcServer.Serve(s.listener); err != nil {
Expand Down Expand Up @@ -97,6 +109,11 @@ func (s *Server) Address() string {
return fmt.Sprintf("%s:%s", s.protocol, addr)
}

// EnableHealthService enables the gRPC health checking service
func (s *Server) EnableHealthService() {
s.healthService = NewHealthService()
}

func (s *Server) cleanup() {
s.server.GracefulStop()
}
10 changes: 10 additions & 0 deletions pkg/server/heartbeat.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package server

// InvokeHeartbeat is a no-op function used as a callback for heartbeat.
// It's instrumented by an uprobe to detect liveness.
// This function is shared between HTTP and gRPC servers.
//
//go:noinline
func InvokeHeartbeat() {
// Intentionally left empty

Check warning on line 9 in pkg/server/heartbeat.go

View check run for this annotation

Codecov / codecov/patch

pkg/server/heartbeat.go#L8-L9

Added lines #L8 - L9 were not covered by tests
}
20 changes: 0 additions & 20 deletions pkg/server/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,8 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"

"github.com/aquasecurity/tracee/common/logger"
"github.com/aquasecurity/tracee/pkg/ebpf/heartbeat"
)

// interval defines how often the heartbeat signal should be sent.
const heartbeatSignalInterval = time.Duration(1 * time.Second)

// timeout specifies the maximum duration to wait for a heartbeat acknowledgment
const heartbeatAckTimeout = time.Duration(2 * time.Second)

// Server represents a http server
type Server struct {
hs *http.Server
Expand Down Expand Up @@ -90,14 +83,6 @@ func (s *Server) Start(ctx context.Context) {
srvCancel()
}()

// Initialize heartbeat monitoring
heartbeatCtx, cancelHeartbeat := context.WithCancel(srvCtx)
defer cancelHeartbeat()

heartbeat.Init(heartbeatCtx, heartbeatSignalInterval, heartbeatAckTimeout)
heartbeat.GetInstance().SetCallback(invokeHeartbeat)
heartbeat.GetInstance().Start()

select {
case <-ctx.Done():
logger.Debugw("Context cancelled, shutting down HTTP server")
Expand Down Expand Up @@ -174,8 +159,3 @@ func (s *Server) IsPyroscopeEnabled() bool {
func (s *Server) Address() string {
return s.address
}

//go:noinline
func invokeHeartbeat() {
// Intentionally left empty
}
Loading