@@ -17,38 +17,56 @@ limitations under the License.
1717package main
1818
1919import (
20+ "context"
2021 "flag"
2122 "net"
2223 "net/http"
2324 "os"
2425 "os/signal"
26+ "sync/atomic"
2527 "syscall"
28+ "time"
2629
30+ extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
2731 "google.golang.org/grpc"
32+ healthpb "google.golang.org/grpc/health/grpc_health_v1"
33+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
34+ "k8s.io/apimachinery/pkg/util/uuid"
2835 "k8s.io/client-go/kubernetes"
2936 "k8s.io/client-go/rest"
3037 "k8s.io/client-go/tools/clientcmd"
38+ "k8s.io/client-go/tools/leaderelection"
39+ "k8s.io/client-go/tools/leaderelection/resourcelock"
3140 "k8s.io/klog/v2"
41+ "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned"
3242
33- extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
3443 "github.com/vllm-project/aibrix/pkg/cache"
3544 "github.com/vllm-project/aibrix/pkg/constants"
3645 "github.com/vllm-project/aibrix/pkg/plugins/gateway"
3746 routing "github.com/vllm-project/aibrix/pkg/plugins/gateway/algorithms"
47+ healthserver "github.com/vllm-project/aibrix/pkg/plugins/gateway/health"
3848 "github.com/vllm-project/aibrix/pkg/utils"
39- "google.golang.org/grpc/health"
40- healthPb "google.golang.org/grpc/health/grpc_health_v1"
41- "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned"
4249)
4350
// Command-line configuration for the gateway plugin.
var (
	// grpcAddr is the bind address of the ext-proc gRPC server.
	grpcAddr string
	// metricsAddr is the bind address of the Prometheus metrics endpoint.
	metricsAddr string
	// profilingAddr is the bind address of the pprof profiling endpoint.
	profilingAddr string
	// enableLeaderElection toggles lease-based leader election for HA.
	enableLeaderElection bool
	// leaderElectionID names the coordination.k8s.io Lease resource.
	leaderElectionID string
	// leaderElectionNamespace is the namespace holding the Lease.
	leaderElectionNamespace string
)
4859
4960func main () {
5061 flag .StringVar (& grpcAddr , "grpc-bind-address" , ":50052" , "The address the gRPC server binds to." )
5162 flag .StringVar (& metricsAddr , "metrics-bind-address" , ":8080" , "The address the metric endpoint binds to." )
63+ flag .StringVar (& profilingAddr , "profiling-bind-address" , ":6061" , "The address the profiling endpoint binds to." )
64+ flag .BoolVar (& enableLeaderElection , "enable-leader-election" , false , "Enable leader election for high availability" )
65+ flag .StringVar (& leaderElectionID ,
66+ "leader-election-id" , "gateway-plugin-lock" , "Name of the lease resource for leader election" )
67+ flag .StringVar (& leaderElectionNamespace , "leader-election-namespace" ,
68+ "aibrix-system" , "Namespace for leader election lease (default: same as pod)" )
69+
5270 klog .InitFlags (flag .CommandLine )
5371 defer klog .Flush ()
5472 flag .Parse ()
@@ -110,32 +128,160 @@ func main() {
110128 }
111129 klog .Infof ("Started metrics server on %s" , metricsAddr )
112130
131+ isLeader := & atomic.Bool {}
132+ isLeader .Store (false )
133+
134+ leaderCtx , leaderCancel := context .WithCancel (context .Background ())
135+ defer leaderCancel ()
136+
137+ if enableLeaderElection {
138+ klog .Info ("Leader election enabled" )
139+
140+ // Get pod info for lease
141+ podName := os .Getenv ("POD_NAME" )
142+ if podName == "" {
143+ podName = string (uuid .NewUUID ())
144+ }
145+ if leaderElectionNamespace == "" {
146+ podNamespace := os .Getenv ("POD_NAMESPACE" )
147+ if podNamespace == "" {
148+ // Read from file (in-cluster mode)
149+ nsBytes , err := os .ReadFile ("/var/run/secrets/kubernetes.io/serviceaccount/namespace" )
150+ if err != nil {
151+ klog .Fatalf ("Failed to read namespace from file: %v" , err )
152+ }
153+ podNamespace = string (nsBytes )
154+ }
155+ leaderElectionNamespace = podNamespace
156+ }
157+
158+ lock := & resourcelock.LeaseLock {
159+ LeaseMeta : metav1.ObjectMeta {
160+ Name : leaderElectionID ,
161+ Namespace : leaderElectionNamespace ,
162+ },
163+ Client : k8sClient .CoordinationV1 (),
164+ LockConfig : resourcelock.ResourceLockConfig {
165+ Identity : podName ,
166+ },
167+ }
168+
169+ leConfig := leaderelection.LeaderElectionConfig {
170+ Lock : lock ,
171+ LeaseDuration : 15 * time .Second ,
172+ RenewDeadline : 10 * time .Second ,
173+ RetryPeriod : 2 * time .Second ,
174+ Callbacks : leaderelection.LeaderCallbacks {
175+ OnStartedLeading : func (ctx context.Context ) {
176+ klog .Info ("This instance is now the leader!" )
177+ isLeader .Store (true )
178+ },
179+ OnStoppedLeading : func () {
180+ klog .Info ("This instance is no longer the leader, initiating graceful shutdown..." )
181+ // Cancel the leader context to stop leader-specific operations
182+ leaderCancel ()
183+ // Exit the process to let Kubernetes restart it
184+ os .Exit (0 )
185+ },
186+ OnNewLeader : func (identity string ) {
187+ if identity == podName {
188+ klog .Info ("Still the leader" )
189+ } else {
190+ klog .Infof ("New leader elected: %s" , identity )
191+ }
192+ },
193+ },
194+ ReleaseOnCancel : true ,
195+ }
196+
197+ leaderElector , err := leaderelection .NewLeaderElector (leConfig )
198+ if err != nil {
199+ klog .Fatalf ("Failed to create leader elector: %v" , err )
200+ }
201+
202+ leaderElector , err = leaderelection .NewLeaderElector (leConfig )
203+ if err != nil {
204+ klog .Fatalf ("Failed to create leader elector: %v" , err )
205+ }
206+
207+ go func () {
208+ leaderElector .Run (leaderCtx )
209+ }()
210+ } else {
211+ // Single instance mode, all instances are leaders
212+ isLeader .Store (true )
213+ klog .Info ("Single instance mode enabled, this instance is always the leader" )
214+ }
215+
216+ // Setup gRPC server with custom health server
113217 s := grpc .NewServer ()
114218 extProcPb .RegisterExternalProcessorServer (s , gatewayServer )
115219
116- healthCheck := health .NewServer ()
117- healthPb .RegisterHealthServer (s , healthCheck )
118- healthCheck .SetServingStatus ("gateway-plugin" , healthPb .HealthCheckResponse_SERVING )
220+ newHealthServer := healthserver .NewHealthServer (isLeader , enableLeaderElection )
221+ healthpb .RegisterHealthServer (s , newHealthServer )
119222
120223 klog .Info ("starting gRPC server on " + grpcAddr )
121224
225+ profilingServer := & http.Server {
226+ Addr : profilingAddr ,
227+ }
122228 go func () {
123- if err := http .ListenAndServe ("localhost:6060" , nil ); err != nil {
124- klog .Fatalf ("failed to setup profiling: %v" , err )
229+ if err := profilingServer .ListenAndServe (); err != nil && err != http . ErrServerClosed {
230+ klog .Fatalf ("failed to setup profiling on %s : %v" , profilingAddr , err )
125231 }
126232 }()
127233
128- var gracefulStop = make (chan os.Signal , 1 )
129- signal .Notify (gracefulStop , syscall .SIGINT , syscall .SIGTERM )
130- go func () {
131- sig := <- gracefulStop
132- klog .Warningf ("signal received: %v, initiating graceful shutdown..." , sig )
133- gatewayServer .Shutdown ()
234+ // Create graceful shutdown function
235+ gracefulShutdown := func () {
236+ klog .Info ("Initiating graceful shutdown..." )
237+
134238 s .GracefulStop ()
135- os .Exit (0 )
239+ klog .Info ("gRPC server stopped" )
240+
241+ ctx , cancel := context .WithTimeout (context .Background (), 5 * time .Second )
242+ defer cancel ()
243+ if err := profilingServer .Shutdown (ctx ); err != nil {
244+ klog .Errorf ("Error shutting down profiling server: %v" , err )
245+ }
246+ klog .Info ("Profiling server stopped" )
247+
248+ gatewayServer .Shutdown ()
249+ klog .Info ("Gateway server stopped" )
250+
251+ if err := redisClient .Close (); err != nil {
252+ klog .Warningf ("Error closing Redis client during shutdown: %v" , err )
253+ }
254+ klog .Info ("Redis client closed" )
255+
256+ leaderCancel ()
257+ klog .Info ("Leader context cancelled" )
258+ klog .Info ("Graceful shutdown completed" )
259+ }
260+
261+ go func () {
262+ if err := s .Serve (lis ); err != nil {
263+ klog .Errorf ("gRPC server error: %v" , err )
264+ }
136265 }()
137266
138- if err := s .Serve (lis ); err != nil {
139- panic (err )
267+ signalCh := make (chan os.Signal , 1 )
268+ signal .Notify (signalCh , syscall .SIGINT , syscall .SIGTERM )
269+
270+ if enableLeaderElection {
271+ // In leader election mode: wait for either signal or losing leadership
272+ select {
273+ case sig := <- signalCh :
274+ klog .Warningf ("signal received: %v, initiating graceful shutdown..." , sig )
275+ case <- leaderCtx .Done ():
276+ klog .Info ("Leader context cancelled (lost leadership), initiating shutdown..." )
277+ }
278+ gracefulShutdown ()
279+ os .Exit (0 )
280+ } else {
281+ // In single instance mode: wait for shutdown signal
282+ sig := <- signalCh
283+ klog .Warningf ("signal received: %v, initiating graceful shutdown..." , sig )
284+ gracefulShutdown ()
285+ os .Exit (0 )
140286 }
141287}
0 commit comments