Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 161 additions & 20 deletions cmd/plugins/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,38 +17,56 @@ limitations under the License.
package main

import (
"context"
"flag"
"net"
"net/http"
"os"
"os/signal"
"sync/atomic"
"syscall"
"time"

extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
"google.golang.org/grpc"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/klog/v2"
"sigs.k8s.io/gateway-api/pkg/client/clientset/versioned"

extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
"github.com/vllm-project/aibrix/pkg/cache"
"github.com/vllm-project/aibrix/pkg/constants"
"github.com/vllm-project/aibrix/pkg/plugins/gateway"
routing "github.com/vllm-project/aibrix/pkg/plugins/gateway/algorithms"
healthserver "github.com/vllm-project/aibrix/pkg/plugins/gateway/health"
"github.com/vllm-project/aibrix/pkg/utils"
"google.golang.org/grpc/health"
healthPb "google.golang.org/grpc/health/grpc_health_v1"
"sigs.k8s.io/gateway-api/pkg/client/clientset/versioned"
)

var (
grpcAddr string
metricsAddr string
grpcAddr string
metricsAddr string
profilingAddr string
enableLeaderElection bool
leaderElectionID string
leaderElectionNamespace string
)

func main() {
flag.StringVar(&grpcAddr, "grpc-bind-address", ":50052", "The address the gRPC server binds to.")
flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
flag.StringVar(&profilingAddr, "profiling-bind-address", ":6061", "The address the profiling endpoint binds to.")
flag.BoolVar(&enableLeaderElection, "enable-leader-election", false, "Enable leader election for high availability")
flag.StringVar(&leaderElectionID,
"leader-election-id", "gateway-plugin-lock", "Name of the lease resource for leader election")
flag.StringVar(&leaderElectionNamespace, "leader-election-namespace",
"aibrix-system", "Namespace for leader election lease (default: same as pod)")

klog.InitFlags(flag.CommandLine)
defer klog.Flush()
flag.Parse()
Expand Down Expand Up @@ -110,32 +128,155 @@ func main() {
}
klog.Infof("Started metrics server on %s", metricsAddr)

isLeader := &atomic.Bool{}
isLeader.Store(false)

leaderCtx, leaderCancel := context.WithCancel(context.Background())
defer leaderCancel()

if enableLeaderElection {
klog.Info("Leader election enabled")

// Get pod info for lease
podName := os.Getenv("POD_NAME")
if podName == "" {
podName = string(uuid.NewUUID())
}
if leaderElectionNamespace == "" {
podNamespace := os.Getenv("POD_NAMESPACE")
if podNamespace == "" {
// Read from file (in-cluster mode)
nsBytes, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
if err != nil {
klog.Fatalf("Failed to read namespace from file: %v", err)
}
podNamespace = string(nsBytes)
}
leaderElectionNamespace = podNamespace
}

lock := &resourcelock.LeaseLock{
LeaseMeta: metav1.ObjectMeta{
Name: leaderElectionID,
Namespace: leaderElectionNamespace,
},
Client: k8sClient.CoordinationV1(),
LockConfig: resourcelock.ResourceLockConfig{
Identity: podName,
},
}

leConfig := leaderelection.LeaderElectionConfig{
Lock: lock,
LeaseDuration: 15 * time.Second,
RenewDeadline: 10 * time.Second,
RetryPeriod: 2 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
klog.Info("This instance is now the leader!")
isLeader.Store(true)
},
OnStoppedLeading: func() {
klog.Info("This instance is no longer the leader, initiating graceful shutdown...")
// Cancel the leader context to stop leader-specific operations
leaderCancel()
// Exit the process to let Kubernetes restart it
os.Exit(0)
},
OnNewLeader: func(identity string) {
if identity == podName {
klog.Info("Still the leader")
} else {
klog.Infof("New leader elected: %s", identity)
}
},
},
ReleaseOnCancel: true,
}

leaderElector, err := leaderelection.NewLeaderElector(leConfig)
if err != nil {
klog.Fatalf("Failed to create leader elector: %v", err)
}

go func() {
leaderElector.Run(leaderCtx)
}()
} else {
// Single instance mode, all instances are leaders
isLeader.Store(true)
klog.Info("Single instance mode enabled, this instance is always the leader")
}

// Setup gRPC server with custom health server
s := grpc.NewServer()
extProcPb.RegisterExternalProcessorServer(s, gatewayServer)

healthCheck := health.NewServer()
healthPb.RegisterHealthServer(s, healthCheck)
healthCheck.SetServingStatus("gateway-plugin", healthPb.HealthCheckResponse_SERVING)
newHealthServer := healthserver.NewHealthServer(isLeader, enableLeaderElection)
healthpb.RegisterHealthServer(s, newHealthServer)

klog.Info("starting gRPC server on " + grpcAddr)

profilingServer := &http.Server{
Addr: profilingAddr,
}
go func() {
if err := http.ListenAndServe("localhost:6060", nil); err != nil {
klog.Fatalf("failed to setup profiling: %v", err)
if err := profilingServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
klog.Fatalf("failed to setup profiling on %s: %v", profilingAddr, err)
}
}()

var gracefulStop = make(chan os.Signal, 1)
signal.Notify(gracefulStop, syscall.SIGINT, syscall.SIGTERM)
go func() {
sig := <-gracefulStop
klog.Warningf("signal received: %v, initiating graceful shutdown...", sig)
gatewayServer.Shutdown()
// Create graceful shutdown function
gracefulShutdown := func() {
klog.Info("Initiating graceful shutdown...")

s.GracefulStop()
os.Exit(0)
klog.Info("gRPC server stopped")

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := profilingServer.Shutdown(ctx); err != nil {
klog.Errorf("Error shutting down profiling server: %v", err)
}
klog.Info("Profiling server stopped")

gatewayServer.Shutdown()
klog.Info("Gateway server stopped")

if err := redisClient.Close(); err != nil {
klog.Warningf("Error closing Redis client during shutdown: %v", err)
}
klog.Info("Redis client closed")

leaderCancel()
klog.Info("Leader context cancelled")
klog.Info("Graceful shutdown completed")
}

go func() {
if err := s.Serve(lis); err != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am missing something, grpc server is started for leader and non-leader, what is preventing non-leader to serve the user request?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the gRPC server is started on all instances (both leader and followers). However, this implementation follows a multi-replica active-standby (single-active) architecture:

  • Only the leader (active instance) serves user traffic — it’s the sole provider of business logic.
  • Followers are purely standby replicas — they remain running and ready to take over, but do not handle any client requests.

This behavior is enforced by Kubernetes through readiness probes and service endpoint:

The health server returns:

  • SERVING for readiness only on the leader.
  • NOT_SERVING for readiness on all followers.

Kubernetes marks followers as "not ready", so they are excluded from the service’s endpoint list.
As a result, the Service routes traffic exclusively to the active (leader) pod.

klog.Errorf("gRPC server error: %v", err)
}
}()

if err := s.Serve(lis); err != nil {
panic(err)
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, syscall.SIGINT, syscall.SIGTERM)

if enableLeaderElection {
// In leader election mode: wait for either signal or losing leadership
select {
case sig := <-signalCh:
klog.Warningf("signal received: %v, initiating graceful shutdown...", sig)
case <-leaderCtx.Done():
klog.Info("Leader context cancelled (lost leadership), initiating shutdown...")
}
gracefulShutdown()
os.Exit(0)
} else {
// In single instance mode: wait for shutdown signal
sig := <-signalCh
klog.Warningf("signal received: %v, initiating graceful shutdown...", sig)
gracefulShutdown()
os.Exit(0)
}
}
6 changes: 6 additions & 0 deletions config/gateway/gateway-plugin/gateway-plugin.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ spec:
command: ['sh', '-c', 'until echo "ping" | nc aibrix-redis-master 6379 -w 1 | grep -c PONG; do echo waiting for service aibrix-redis-master; sleep 2; done']
containers:
- name: gateway-plugin
args:
- --enable-leader-election=false
#- --leader-election-id=gateway-plugin-lock
#- --leader-election-namespace=aibrix-system
image: gateway-plugins:latest
imagePullPolicy: IfNotPresent
ports:
Expand Down Expand Up @@ -120,11 +124,13 @@ spec:
livenessProbe:
grpc:
port: 50052
service: liveness
initialDelaySeconds: 5
periodSeconds: 10
readinessProbe:
grpc:
port: 50052
service: readiness
initialDelaySeconds: 5
periodSeconds: 10
serviceAccountName: aibrix-gateway-plugins
Expand Down
12 changes: 12 additions & 0 deletions config/rbac/gateway/role_gateway.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,18 @@ rules:
- patch
- update
- watch
- apiGroups:
- coordination.k8s.io
resources:
- leases
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- gateway.networking.k8s.io
resources:
Expand Down
4 changes: 4 additions & 0 deletions dist/chart/templates/gateway-plugin/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ spec:
- name: gateway-plugin
image: {{ .Values.gatewayPlugin.container.image.repository }}:{{ .Values.gatewayPlugin.container.image.tag }}
imagePullPolicy: {{ .Values.gatewayPlugin.container.image.imagePullPolicy | default "IfNotPresent" }}
#args:
# - --enable-leader-election=false
# - --leader-election-id=gateway-plugin-lock
# - --leader-election-namespace=aibrix-system
ports:
- name: gateway
containerPort: 50052
Expand Down
12 changes: 12 additions & 0 deletions dist/chart/templates/gateway-plugin/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,18 @@ rules:
- patch
- update
- watch
- apiGroups:
- coordination.k8s.io
resources:
- leases
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- model.aibrix.ai
resources:
Expand Down
18 changes: 10 additions & 8 deletions dist/chart/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,19 @@ gatewayPlugin:
liveness:
grpc:
port: 50052
initialDelaySeconds: 5
periodSeconds: 10
timeoutSeconds: 3
failureThreshold: 3
service: liveness
initialDelaySeconds: 60
periodSeconds: 15
timeoutSeconds: 10
failureThreshold: 8
readiness:
grpc:
port: 50052
initialDelaySeconds: 5
periodSeconds: 10
timeoutSeconds: 3
failureThreshold: 3
service: readiness
initialDelaySeconds: 60
periodSeconds: 15
timeoutSeconds: 10
failureThreshold: 8
envs:
AIBRIX_POD_METRIC_REFRESH_INTERVAL_MS: "50"
AIBRIX_PREFIX_CACHE_TOKENIZER_TYPE: "character"
Expand Down
Loading
Loading