From 06dcc860c3e392702415a80cc3aa1f1738bf63ce Mon Sep 17 00:00:00 2001 From: CYJiang Date: Thu, 27 Nov 2025 19:26:03 +0800 Subject: [PATCH 1/3] feat: add leader election support for HA in gateway plugin Signed-off-by: CYJiang --- cmd/plugins/main.go | 181 ++++++++++++++++-- .../gateway-plugin/gateway-plugin.yaml | 6 + .../templates/gateway-plugin/deployment.yaml | 4 + dist/chart/templates/gateway-plugin/rbac.yaml | 12 ++ dist/chart/values.yaml | 18 +- pkg/plugins/gateway/health/health.go | 158 +++++++++++++++ 6 files changed, 351 insertions(+), 28 deletions(-) create mode 100644 pkg/plugins/gateway/health/health.go diff --git a/cmd/plugins/main.go b/cmd/plugins/main.go index d67cb3048..eaa16ff41 100644 --- a/cmd/plugins/main.go +++ b/cmd/plugins/main.go @@ -17,38 +17,56 @@ limitations under the License. package main import ( + "context" "flag" "net" "net/http" "os" "os/signal" + "sync/atomic" "syscall" + "time" + extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" "google.golang.org/grpc" + healthpb "google.golang.org/grpc/health/grpc_health_v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/tools/leaderelection" + "k8s.io/client-go/tools/leaderelection/resourcelock" "k8s.io/klog/v2" + "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned" - extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" "github.com/vllm-project/aibrix/pkg/cache" "github.com/vllm-project/aibrix/pkg/constants" "github.com/vllm-project/aibrix/pkg/plugins/gateway" routing "github.com/vllm-project/aibrix/pkg/plugins/gateway/algorithms" + healthserver "github.com/vllm-project/aibrix/pkg/plugins/gateway/health" "github.com/vllm-project/aibrix/pkg/utils" - "google.golang.org/grpc/health" - healthPb "google.golang.org/grpc/health/grpc_health_v1" - "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned" ) var ( - grpcAddr string - metricsAddr string + grpcAddr string + metricsAddr string + profilingAddr string + enableLeaderElection bool + leaderElectionID string + leaderElectionNamespace string ) func main() { flag.StringVar(&grpcAddr, "grpc-bind-address", ":50052", "The address the gRPC server binds to.") flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.") + flag.StringVar(&profilingAddr, "profiling-bind-address", ":6061", "The address the profiling endpoint binds to.") + flag.BoolVar(&enableLeaderElection, "enable-leader-election", false, "Enable leader election for high availability") + flag.StringVar(&leaderElectionID, + "leader-election-id", "gateway-plugin-lock", "Name of the lease resource for leader election") + flag.StringVar(&leaderElectionNamespace, "leader-election-namespace", + "aibrix-system", "Namespace for leader election lease (default: same as pod)") + klog.InitFlags(flag.CommandLine) defer klog.Flush() flag.Parse() @@ -110,32 +128,155 @@ func main() { } klog.Infof("Started metrics server on %s", metricsAddr) + isLeader := &atomic.Bool{} + isLeader.Store(false) + + leaderCtx, leaderCancel := context.WithCancel(context.Background()) + defer leaderCancel() + + if enableLeaderElection { + klog.Info("Leader election enabled") + + // Get pod info for lease + podName := os.Getenv("POD_NAME") + if podName == "" { + podName = string(uuid.NewUUID()) + } + if leaderElectionNamespace == "" { + podNamespace := os.Getenv("POD_NAMESPACE") + if podNamespace == "" { + // Read from file (in-cluster mode) + nsBytes, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace") + if err != nil { + klog.Fatalf("Failed to read namespace from file: %v", err) + } + podNamespace = string(nsBytes) + } + leaderElectionNamespace = podNamespace + } + + lock := &resourcelock.LeaseLock{ + LeaseMeta: metav1.ObjectMeta{ + Name: leaderElectionID, + Namespace: leaderElectionNamespace, + }, + Client: k8sClient.CoordinationV1(), + LockConfig: resourcelock.ResourceLockConfig{ + Identity: podName, + }, + } + + leConfig := leaderelection.LeaderElectionConfig{ + Lock: lock, + LeaseDuration: 15 * time.Second, + RenewDeadline: 10 * time.Second, + RetryPeriod: 2 * time.Second, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: func(ctx context.Context) { + klog.Info("This instance is now the leader!") + isLeader.Store(true) + }, + OnStoppedLeading: func() { + klog.Info("This instance is no longer the leader, initiating graceful shutdown...") + // Cancel the leader context to stop leader-specific operations + leaderCancel() + // Exit the process to let Kubernetes restart it + os.Exit(0) + }, + OnNewLeader: func(identity string) { + if identity == podName { + klog.Info("Still the leader") + } else { + klog.Infof("New leader elected: %s", identity) + } + }, + }, + ReleaseOnCancel: true, + } + + leaderElector, err := leaderelection.NewLeaderElector(leConfig) + if err != nil { + klog.Fatalf("Failed to create leader elector: %v", err) + } + + go func() { + leaderElector.Run(leaderCtx) + }() + } else { + // Single instance mode, all instances are leaders + isLeader.Store(true) + klog.Info("Single instance mode enabled, this instance is always the leader") + } + + // Setup gRPC server with custom health server s := grpc.NewServer() extProcPb.RegisterExternalProcessorServer(s, gatewayServer) - healthCheck := health.NewServer() - healthPb.RegisterHealthServer(s, healthCheck) - healthCheck.SetServingStatus("gateway-plugin", healthPb.HealthCheckResponse_SERVING) + newHealthServer := healthserver.NewHealthServer(isLeader, enableLeaderElection) + healthpb.RegisterHealthServer(s, newHealthServer) klog.Info("starting gRPC server on " + grpcAddr) + profilingServer := &http.Server{ + Addr: profilingAddr, + } go func() { - if err := http.ListenAndServe("localhost:6060", nil); err != nil { - klog.Fatalf("failed to setup profiling: %v", err) + if err := profilingServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { + klog.Fatalf("failed to setup profiling on %s: %v", profilingAddr, err) } }() - var gracefulStop = make(chan os.Signal, 1) - signal.Notify(gracefulStop, syscall.SIGINT, syscall.SIGTERM) - go func() { - sig := <-gracefulStop - klog.Warningf("signal received: %v, initiating graceful shutdown...", sig) - gatewayServer.Shutdown() + // Create graceful shutdown function + gracefulShutdown := func() { + klog.Info("Initiating graceful shutdown...") + s.GracefulStop() - os.Exit(0) + klog.Info("gRPC server stopped") + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := profilingServer.Shutdown(ctx); err != nil { + klog.Errorf("Error shutting down profiling server: %v", err) + } + klog.Info("Profiling server stopped") + + gatewayServer.Shutdown() + klog.Info("Gateway server stopped") + + if err := redisClient.Close(); err != nil { + klog.Warningf("Error closing Redis client during shutdown: %v", err) + } + klog.Info("Redis client closed") + + leaderCancel() + klog.Info("Leader context cancelled") + klog.Info("Graceful shutdown completed") + } + + go func() { + if err := s.Serve(lis); err != nil { + klog.Errorf("gRPC server error: %v", err) + } }() - if err := s.Serve(lis); err != nil { - panic(err) + signalCh := make(chan os.Signal, 1) + signal.Notify(signalCh, syscall.SIGINT, syscall.SIGTERM) + + if enableLeaderElection { + // In leader election mode: wait for either signal or losing leadership + select { + case sig := <-signalCh: + klog.Warningf("signal received: %v, initiating graceful shutdown...", sig) + case <-leaderCtx.Done(): + klog.Info("Leader context cancelled (lost leadership), initiating shutdown...") + } + gracefulShutdown() + os.Exit(0) + } else { + // In single instance mode: wait for shutdown signal + sig := <-signalCh + klog.Warningf("signal received: %v, initiating graceful shutdown...", sig) + gracefulShutdown() + os.Exit(0) } } diff --git a/config/gateway/gateway-plugin/gateway-plugin.yaml b/config/gateway/gateway-plugin/gateway-plugin.yaml index 518de8c23..2e1222e76 100644 --- a/config/gateway/gateway-plugin/gateway-plugin.yaml +++ b/config/gateway/gateway-plugin/gateway-plugin.yaml @@ -73,6 +73,10 @@ spec: command: ['sh', '-c', 'until echo "ping" | nc aibrix-redis-master 6379 -w 1 | grep -c PONG; do echo waiting for service aibrix-redis-master; sleep 2; done'] containers: - name: gateway-plugin + args: + - --enable-leader-election=false + #- --leader-election-id=gateway-plugin-lock + #- --leader-election-namespace=aibrix-system image: gateway-plugins:latest imagePullPolicy: IfNotPresent ports: @@ -120,11 +124,13 @@ spec: livenessProbe: grpc: port: 50052 + service: liveness initialDelaySeconds: 5 periodSeconds: 10 readinessProbe: grpc: port: 50052 + service: readiness initialDelaySeconds: 5 periodSeconds: 10 serviceAccountName: aibrix-gateway-plugins diff --git a/dist/chart/templates/gateway-plugin/deployment.yaml b/dist/chart/templates/gateway-plugin/deployment.yaml index 30883d747..57bd1fc72 100644 --- a/dist/chart/templates/gateway-plugin/deployment.yaml +++ b/dist/chart/templates/gateway-plugin/deployment.yaml @@ -73,6 +73,10 @@ spec: - name: gateway-plugin image: {{ .Values.gatewayPlugin.container.image.repository }}:{{ .Values.gatewayPlugin.container.image.tag }} imagePullPolicy: {{ .Values.gatewayPlugin.container.image.imagePullPolicy | default "IfNotPresent" }} + #args: + # - --enable-leader-election=false + # - --leader-election-id=gateway-plugin-lock + # - --leader-election-namespace=aibrix-system ports: - name: gateway containerPort: 50052 diff --git a/dist/chart/templates/gateway-plugin/rbac.yaml b/dist/chart/templates/gateway-plugin/rbac.yaml index 2a9a732b9..e9d374329 100644 --- a/dist/chart/templates/gateway-plugin/rbac.yaml +++ b/dist/chart/templates/gateway-plugin/rbac.yaml @@ -28,6 +28,18 @@ rules: - patch - update - watch +- apiGroups: + - coordination.k8s.io + resources: + - leases + verbs: + - create + - delete + - get + - list + - patch + - update + - watch - apiGroups: - model.aibrix.ai resources: diff --git a/dist/chart/values.yaml b/dist/chart/values.yaml index 2283802a4..bb83c3ceb 100644 --- a/dist/chart/values.yaml +++ b/dist/chart/values.yaml @@ -64,17 +64,19 @@ gatewayPlugin: liveness: grpc: port: 50052 - initialDelaySeconds: 5 - periodSeconds: 10 - timeoutSeconds: 3 - failureThreshold: 3 + service: liveness + initialDelaySeconds: 60 + periodSeconds: 15 + timeoutSeconds: 10 + failureThreshold: 8 readiness: grpc: port: 50052 - initialDelaySeconds: 5 - periodSeconds: 10 - timeoutSeconds: 3 - failureThreshold: 3 + service: readiness + initialDelaySeconds: 60 + periodSeconds: 15 + timeoutSeconds: 10 + failureThreshold: 8 envs: AIBRIX_POD_METRIC_REFRESH_INTERVAL_MS: "50" AIBRIX_PREFIX_CACHE_TOKENIZER_TYPE: "character" diff --git a/pkg/plugins/gateway/health/health.go b/pkg/plugins/gateway/health/health.go new file mode 100644 index 000000000..6488f09e4 --- /dev/null +++ b/pkg/plugins/gateway/health/health.go @@ -0,0 +1,158 @@ +/* +Copyright 2025 The Aibrix Team. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package health + +import ( + "context" + "sync/atomic" + "time" + + "google.golang.org/grpc/codes" + healthpb "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/status" + "k8s.io/klog/v2" +) + +const ( + LivenessCheckService = "liveness" + ReadinessCheckService = "readiness" +) + +// SimpleHealthServer implements health check with Leader Election support +type SimpleHealthServer struct { + // Leader election related + isLeader *atomic.Bool + leaderElectionEnabled bool +} + +func NewHealthServer(isLeader *atomic.Bool, leaderElectionEnabled bool) *SimpleHealthServer { + return &SimpleHealthServer{ + isLeader: isLeader, + leaderElectionEnabled: leaderElectionEnabled, + } +} + +func (s *SimpleHealthServer) Check(ctx context.Context, in *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) { + klog.V(4).Infof("Health check request for service: %s, leader election enabled: %t, current leader: %t", + in.Service, s.leaderElectionEnabled, s.isLeader.Load()) + + // If leader election is not enabled, return SERVING for all services (compatibility) + if !s.leaderElectionEnabled { + klog.V(6).Infof("Leader election disabled, returning SERVING for service: %s", in.Service) + return &healthpb.HealthCheckResponse{ + Status: healthpb.HealthCheckResponse_SERVING, + }, nil + } + + // when leader election is enabled + switch in.Service { + case LivenessCheckService: + // Liveness: any running instance returns SERVING + // This prevents non-leader pods from being restarted due to readiness failure + klog.V(6).Infof("Liveness check for service: %s, returning SERVING", in.Service) + return &healthpb.HealthCheckResponse{ + Status: healthpb.HealthCheckResponse_SERVING, + }, nil + case ReadinessCheckService, "": + // Readiness and empty service name: only leader returns SERVING + // This ensures only leader is added to Kubernetes Service Endpoints + if s.isLeader.Load() { + klog.V(6).Infof("Readiness check for service: %s, current instance is leader, returning SERVING", in.Service) + return &healthpb.HealthCheckResponse{ + Status: healthpb.HealthCheckResponse_SERVING, + }, nil + } + klog.V(6).Infof("Readiness check for service: %s, current instance is not leader, returning NOT_SERVING", in.Service) + return &healthpb.HealthCheckResponse{ + Status: healthpb.HealthCheckResponse_NOT_SERVING, + }, nil + default: + // Other services (e.g., gateway): only leader returns SERVING + if s.isLeader.Load() { + klog.V(6).Infof("Health check for service: %s, current instance is leader, returning SERVING", in.Service) + return &healthpb.HealthCheckResponse{ + Status: healthpb.HealthCheckResponse_SERVING, + }, nil + } + klog.V(6).Infof("Health check for service: %s, current instance is not leader, returning NOT_SERVING", in.Service) + return &healthpb.HealthCheckResponse{ + Status: healthpb.HealthCheckResponse_NOT_SERVING, + }, nil + } +} + +func (s *SimpleHealthServer) Watch(in *healthpb.HealthCheckRequest, stream healthpb.Health_WatchServer) error { + klog.V(6).Infof("Health watch request for service: %s", in.Service) + + // Simple implementation: send current status periodically + update := make(chan healthpb.HealthCheckResponse_ServingStatus, 1) + + // Send initial status + initialResp, err := s.Check(context.Background(), in) + if err != nil { + klog.Errorf("Failed to get initial status for service %s: %v", in.Service, err) + return err + } + update <- initialResp.Status + klog.V(6).Infof("Sent initial status %s for service: %s", initialResp.Status.String(), in.Service) + + // Update status periodically + go func() { + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + resp, err := s.Check(context.Background(), in) + if err != nil { + klog.Errorf("Failed to get status for service %s: %v", in.Service, err) + return + } + select { + case update <- resp.Status: + klog.V(6).Infof("Updated status %s for service: %s", resp.Status.String(), in.Service) + default: + // Channel full, skip + klog.V(6).Infof("Status channel full, skipping update for service: %s", in.Service) + } + case <-stream.Context().Done(): + klog.V(6).Infof("Stream context done for service: %s", in.Service) + return + } + } + }() + + var lastSentStatus healthpb.HealthCheckResponse_ServingStatus = -1 + for { + select { + case servingStatus := <-update: + if lastSentStatus != servingStatus { + err := stream.Send(&healthpb.HealthCheckResponse{Status: servingStatus}) + if err != nil { + klog.Errorf("Failed to send health status for service %s: %v", in.Service, err) + return status.Error(codes.Canceled, "Stream has ended.") + } + klog.V(6).Infof("Sent status %s for service: %s", servingStatus.String(), in.Service) + lastSentStatus = servingStatus + } + case <-stream.Context().Done(): + klog.V(6).Infof("Stream context done for service: %s", in.Service) + return status.Error(codes.Canceled, "Stream has ended.") + } + } +} From 7c2db8f30da292889d62e15b34e6fdbd58734e6e Mon Sep 17 00:00:00 2001 From: CYJiang Date: Fri, 5 Dec 2025 13:34:04 +0800 Subject: [PATCH 2/3] add unit test Signed-off-by: CYJiang --- pkg/plugins/gateway/health/health_test.go | 65 +++++++++++++++++++++++ 1 file changed, 65 insertions(+) create mode 100644 pkg/plugins/gateway/health/health_test.go diff --git a/pkg/plugins/gateway/health/health_test.go b/pkg/plugins/gateway/health/health_test.go new file mode 100644 index 000000000..10f9fe24d --- /dev/null +++ b/pkg/plugins/gateway/health/health_test.go @@ -0,0 +1,65 @@ +/* +Copyright 2025 The Aibrix Team. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package health + +import ( + "context" + "sync/atomic" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/health/grpc_health_v1" +) + +func TestSimpleHealthServer_Check(t *testing.T) { + tests := []struct { + name string + leaderElectionEnabled bool + isLeader bool + service string + expectStatus grpc_health_v1.HealthCheckResponse_ServingStatus + }{ + // Case 1: Leader election disabled → always SERVING + {"leader-election-disabled", false, false, "gateway", grpc_health_v1.HealthCheckResponse_SERVING}, + {"leader-election-disabled-liveness", false, true, "liveness", grpc_health_v1.HealthCheckResponse_SERVING}, + + // Case 2: Leader election enabled + is leader + {"leader-enabled-is-leader-readiness", true, true, "readiness", grpc_health_v1.HealthCheckResponse_SERVING}, + {"leader-enabled-is-leader-liveness", true, true, "liveness", grpc_health_v1.HealthCheckResponse_SERVING}, + {"leader-enabled-is-leader-empty", true, true, "", grpc_health_v1.HealthCheckResponse_SERVING}, + {"leader-enabled-is-leader-gateway", true, true, "gateway", grpc_health_v1.HealthCheckResponse_SERVING}, + + // Case 3: Leader election enabled + not leader + {"leader-enabled-not-leader-readiness", true, false, "readiness", grpc_health_v1.HealthCheckResponse_NOT_SERVING}, + {"leader-enabled-not-leader-empty", true, false, "", grpc_health_v1.HealthCheckResponse_NOT_SERVING}, + {"leader-enabled-not-leader-gateway", true, false, "gateway", grpc_health_v1.HealthCheckResponse_NOT_SERVING}, + {"leader-enabled-not-leader-liveness", true, false, "liveness", grpc_health_v1.HealthCheckResponse_SERVING}, // liveness always serves + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var isLeader atomic.Bool + isLeader.Store(tt.isLeader) + + server := NewHealthServer(&isLeader, tt.leaderElectionEnabled) + resp, err := server.Check(context.Background(), &grpc_health_v1.HealthCheckRequest{Service: tt.service}) + require.NoError(t, err) + assert.Equal(t, tt.expectStatus, resp.Status) + }) + } +} From 029bac934a873c132febd643ea6f0301e4f23639 Mon Sep 17 00:00:00 2001 From: CYJiang Date: Fri, 5 Dec 2025 13:40:01 +0800 Subject: [PATCH 3/3] add miss gateway plugin rbac config Signed-off-by: CYJiang --- config/rbac/gateway/role_gateway.yaml | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/config/rbac/gateway/role_gateway.yaml b/config/rbac/gateway/role_gateway.yaml index 08e76e5b3..74fed21c3 100644 --- a/config/rbac/gateway/role_gateway.yaml +++ b/config/rbac/gateway/role_gateway.yaml @@ -28,6 +28,18 @@ rules: - patch - update - watch +- apiGroups: + - coordination.k8s.io + resources: + - leases + verbs: + - create + - delete + - get + - list + - patch + - update + - watch - apiGroups: - gateway.networking.k8s.io resources: