Skip to content

Commit 243dbc7

Browse files
authored
Fix gateway and router issues (#141)
Signed-off-by: Rita Zhang <rita.z.zhang@gmail.com>
1 parent 9261299 commit 243dbc7

9 files changed

Lines changed: 266 additions & 94 deletions

File tree

.github/workflows/e2e-gateway.yml

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -232,23 +232,6 @@ jobs:
232232
sleep 10
233233
done
234234
235-
- name: Configure Istio DestinationRule for EPP
236-
run: |
237-
kubectl apply -f - <<'DREOF'
238-
apiVersion: networking.istio.io/v1beta1
239-
kind: DestinationRule
240-
metadata:
241-
name: llama-gw-e2e-epp
242-
namespace: default
243-
spec:
244-
host: llama-gw-e2e-epp.default.svc.cluster.local
245-
trafficPolicy:
246-
tls:
247-
mode: SIMPLE
248-
insecureSkipVerify: true
249-
DREOF
250-
echo "✅ Istio DestinationRule created for EPP"
251-
252235
- name: Install Body-Based Router (BBR)
253236
run: |
254237
helm install body-based-router \

backend/src/services/kubernetes.ts

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1392,27 +1392,6 @@ class KubernetesService {
13921392
return { available: false };
13931393
}
13941394

1395-
// List InferencePool resources across all namespaces
1396-
let poolCount = 0;
1397-
try {
1398-
const response = await withRetry(
1399-
() => this.customObjectsApi.listClusterCustomObject(
1400-
'inference.networking.k8s.io',
1401-
'v1',
1402-
'inferencepools'
1403-
),
1404-
{ operationName: 'listInferencePools', maxRetries: 1 }
1405-
);
1406-
const items = (response.body as { items?: unknown[] }).items || [];
1407-
poolCount = items.length;
1408-
} catch (error: any) {
1409-
logger.debug({ error: error?.message }, 'Could not list InferencePool resources');
1410-
}
1411-
1412-
if (poolCount === 0) {
1413-
return { available: false };
1414-
}
1415-
14161395
// Try to find a Gateway endpoint
14171396
let endpoint: string | undefined;
14181397
const gatewayCrdExists = await this.checkCRDExists('gateways.gateway.networking.k8s.io');

controller/api/v1alpha1/modeldeployment_types.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -564,6 +564,7 @@ const (
564564
)
565565

566566
const (
567+
HTTPRouteCreated = "kubeairunway.ai/httproute-created"
567568
LabelModelDeployment = "kubeairunway.ai/model-deployment"
568569
LabelManagedBy = "kubeairunway.ai/managed-by"
569570
LabelJobType = "kubeairunway.ai/job-type"

controller/config/rbac/role.yaml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,18 @@ rules:
126126
- get
127127
- patch
128128
- update
129+
- apiGroups:
130+
- networking.istio.io
131+
resources:
132+
- destinationrules
133+
verbs:
134+
- create
135+
- delete
136+
- get
137+
- list
138+
- patch
139+
- update
140+
- watch
129141
- apiGroups:
130142
- rbac.authorization.k8s.io
131143
resources:

controller/internal/controller/gateway_reconciler.go

Lines changed: 201 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,14 @@ import (
2727
appsv1 "k8s.io/api/apps/v1"
2828
corev1 "k8s.io/api/core/v1"
2929
rbacv1 "k8s.io/api/rbac/v1"
30+
apierrors "k8s.io/apimachinery/pkg/api/errors"
3031
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
32+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
33+
"k8s.io/apimachinery/pkg/runtime/schema"
34+
"k8s.io/apimachinery/pkg/types"
3135
ctrl "sigs.k8s.io/controller-runtime"
3236
"sigs.k8s.io/controller-runtime/pkg/client"
37+
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
3338
"sigs.k8s.io/controller-runtime/pkg/log"
3439

3540
kubeairunwayv1alpha1 "github.com/kaito-project/kubeairunway/controller/api/v1alpha1"
@@ -206,6 +211,15 @@ func (r *ModelDeploymentReconciler) reconcileInferencePool(ctx context.Context,
206211
}
207212

208213
log.FromContext(ctx).V(1).Info("InferencePool reconciled", "name", pool.Name, "result", result)
214+
215+
// When a new InferencePool is created, restart the BBR deployment (if present) so it
216+
// discovers the new model. BBR watches ConfigMaps via controller-runtime and rebuilds
217+
// its internal model registry on startup.
218+
if result == controllerutil.OperationResultCreated {
219+
if err := r.restartBBRIfPresent(ctx, md.Namespace); err != nil {
220+
log.FromContext(ctx).Info("Could not restart BBR deployment (non-fatal)", "error", err)
221+
}
222+
}
209223
return nil
210224
}
211225

@@ -363,12 +377,14 @@ kind: EndpointPickerConfig
363377
},
364378
LivenessProbe: &corev1.Probe{
365379
ProbeHandler: corev1.ProbeHandler{GRPC: &corev1.GRPCAction{Port: 9003, Service: strPtr("inference-extension")}},
366-
InitialDelaySeconds: 5,
380+
InitialDelaySeconds: 30,
367381
PeriodSeconds: 10,
382+
FailureThreshold: 5,
368383
},
369384
ReadinessProbe: &corev1.Probe{
370-
ProbeHandler: corev1.ProbeHandler{GRPC: &corev1.GRPCAction{Port: 9003, Service: strPtr("inference-extension")}},
371-
PeriodSeconds: 2,
385+
ProbeHandler: corev1.ProbeHandler{GRPC: &corev1.GRPCAction{Port: 9003, Service: strPtr("inference-extension")}},
386+
InitialDelaySeconds: 10,
387+
PeriodSeconds: 5,
372388
},
373389
VolumeMounts: []corev1.VolumeMount{
374390
{Name: "plugins-config", MountPath: "/config"},
@@ -414,31 +430,73 @@ kind: EndpointPickerConfig
414430
return fmt.Errorf("failed to create/update EPP Service: %w", err)
415431
}
416432

433+
if err := r.reconcileEPPDestinationRule(ctx, md, eppName); err != nil {
434+
return fmt.Errorf("failed to create/update EPP DestinationRule: %w", err)
435+
}
436+
417437
log.FromContext(ctx).V(1).Info("EPP reconciled", "name", eppName, "image", eppImage)
418438
return nil
419439
}
420440

441+
// reconcileEPPDestinationRule creates or updates the Istio DestinationRule for the EPP service,
442+
// but only if Istio is detected (i.e. the DestinationRule CRD is registered in the cluster).
443+
// DestinationRule: tell Istio to use SIMPLE TLS (insecureSkipVerify)
444+
// to skip cert validation.
445+
func (r *ModelDeploymentReconciler) reconcileEPPDestinationRule(ctx context.Context, md *kubeairunwayv1alpha1.ModelDeployment, eppName string) error {
446+
gk := schema.GroupKind{Group: "networking.istio.io", Kind: "DestinationRule"}
447+
if _, err := r.Client.RESTMapper().RESTMapping(gk); err != nil {
448+
log.FromContext(ctx).V(1).Info("Istio not detected, skipping DestinationRule", "eppName", eppName)
449+
return nil
450+
}
451+
452+
dr := &unstructured.Unstructured{}
453+
dr.SetGroupVersionKind(schema.GroupVersionKind{
454+
Group: "networking.istio.io",
455+
Version: "v1beta1",
456+
Kind: "DestinationRule",
457+
})
458+
dr.SetName(eppName)
459+
dr.SetNamespace(md.Namespace)
460+
461+
_, err := ctrl.CreateOrUpdate(ctx, r.Client, dr, func() error {
462+
if err := unstructured.SetNestedField(dr.Object, map[string]interface{}{
463+
"host": fmt.Sprintf("%s.%s.svc.cluster.local", eppName, md.Namespace),
464+
"trafficPolicy": map[string]interface{}{
465+
"tls": map[string]interface{}{
466+
"mode": "SIMPLE",
467+
"insecureSkipVerify": true,
468+
},
469+
},
470+
}, "spec"); err != nil {
471+
return err
472+
}
473+
return ctrl.SetControllerReference(md, dr, r.Scheme)
474+
})
475+
return err
476+
}
477+
421478
func int64Ptr(i int64) *int64 { return &i }
422479
func strPtr(s string) *string { return &s }
423480

424-
// reconcileHTTPRoute creates or updates the HTTPRoute for a ModelDeployment.
481+
// reconcileHTTPRoute creates the HTTPRoute for a ModelDeployment on first reconcile.
482+
// If the HTTPRoute is subsequently deleted by the user the controller will not recreate.
483+
// The deletion is treated as intentional (BYO / opt-out). The ModelDeployment is
484+
// annotated with HTTPRouteCreated after the initial creation so that future
485+
// reconciles will skip recreating a missing route.
425486
func (r *ModelDeploymentReconciler) reconcileHTTPRoute(ctx context.Context, md *kubeairunwayv1alpha1.ModelDeployment, gwConfig *gateway.GatewayConfig, modelName string) error {
426-
route := &gatewayv1.HTTPRoute{
427-
ObjectMeta: metav1.ObjectMeta{
428-
Name: md.Name,
429-
Namespace: md.Namespace,
430-
},
431-
}
432-
433-
group := gatewayv1.Group("inference.networking.k8s.io")
434-
kind := gatewayv1.Kind("InferencePool")
435-
ns := gatewayv1.Namespace(gwConfig.GatewayNamespace)
487+
logger := log.FromContext(ctx)
436488

437-
result, err := ctrl.CreateOrUpdate(ctx, r.Client, route, func() error {
489+
existing := &gatewayv1.HTTPRoute{}
490+
err := r.Get(ctx, client.ObjectKey{Name: md.Name, Namespace: md.Namespace}, existing)
491+
if err == nil {
492+
// HTTPRoute exists — update it in case model name or gateway changed.
493+
group := gatewayv1.Group("inference.networking.k8s.io")
494+
kind := gatewayv1.Kind("InferencePool")
495+
ns := gatewayv1.Namespace(gwConfig.GatewayNamespace)
438496
pathPrefix := gatewayv1.PathMatchPathPrefix
439497
headerExact := gatewayv1.HeaderMatchExact
440498
timeout := gatewayv1.Duration("300s")
441-
route.Spec = gatewayv1.HTTPRouteSpec{
499+
existing.Spec = gatewayv1.HTTPRouteSpec{
442500
CommonRouteSpec: gatewayv1.CommonRouteSpec{
443501
ParentRefs: []gatewayv1.ParentReference{
444502
{
@@ -458,17 +516,11 @@ func (r *ModelDeploymentReconciler) reconcileHTTPRoute(ctx context.Context, md *
458516
Headers: []gatewayv1.HTTPHeaderMatch{
459517
{
460518
Type: &headerExact,
461-
Name: "X-Gateway-Base-Model-Name",
519+
Name: "X-Gateway-Model-Name", // https://github.com/kubernetes-sigs/gateway-api-inference-extension/blob/main/pkg/bbr/README.md
462520
Value: modelName,
463521
},
464522
},
465523
},
466-
{
467-
Path: &gatewayv1.HTTPPathMatch{
468-
Type: &pathPrefix,
469-
Value: strPtr("/"),
470-
},
471-
},
472524
},
473525
BackendRefs: []gatewayv1.HTTPBackendRef{
474526
{
@@ -487,14 +539,97 @@ func (r *ModelDeploymentReconciler) reconcileHTTPRoute(ctx context.Context, md *
487539
},
488540
},
489541
}
490-
return ctrl.SetControllerReference(md, route, r.Scheme)
491-
})
492-
if err != nil {
493-
return fmt.Errorf("failed to create/update HTTPRoute: %w", err)
542+
if updateErr := r.Update(ctx, existing); updateErr != nil {
543+
return fmt.Errorf("failed to update HTTPRoute: %w", updateErr)
544+
}
545+
logger.V(1).Info("HTTPRoute updated", "name", existing.Name)
546+
return nil
494547
}
548+
if apierrors.IsNotFound(err) {
549+
// HTTPRoute is missing. If we created one previously the user deleted it
550+
// intentionally — respect that and do not recreate.
551+
if md.Annotations[kubeairunwayv1alpha1.HTTPRouteCreated] == "true" {
552+
logger.V(1).Info("HTTPRoute was deleted by user, skipping recreation", "name", md.Name)
553+
return nil
554+
}
495555

496-
log.FromContext(ctx).V(1).Info("HTTPRoute reconciled", "name", route.Name, "result", result)
497-
return nil
556+
// First-time creation.
557+
group := gatewayv1.Group("inference.networking.k8s.io")
558+
kind := gatewayv1.Kind("InferencePool")
559+
ns := gatewayv1.Namespace(gwConfig.GatewayNamespace)
560+
pathPrefix := gatewayv1.PathMatchPathPrefix
561+
headerExact := gatewayv1.HeaderMatchExact
562+
timeout := gatewayv1.Duration("300s")
563+
route := &gatewayv1.HTTPRoute{
564+
ObjectMeta: metav1.ObjectMeta{
565+
Name: md.Name,
566+
Namespace: md.Namespace,
567+
},
568+
Spec: gatewayv1.HTTPRouteSpec{
569+
CommonRouteSpec: gatewayv1.CommonRouteSpec{
570+
ParentRefs: []gatewayv1.ParentReference{
571+
{
572+
Name: gatewayv1.ObjectName(gwConfig.GatewayName),
573+
Namespace: &ns,
574+
},
575+
},
576+
},
577+
Rules: []gatewayv1.HTTPRouteRule{
578+
{
579+
Matches: []gatewayv1.HTTPRouteMatch{
580+
{
581+
Path: &gatewayv1.HTTPPathMatch{
582+
Type: &pathPrefix,
583+
Value: strPtr("/"),
584+
},
585+
Headers: []gatewayv1.HTTPHeaderMatch{
586+
{
587+
Type: &headerExact,
588+
Name: "X-Gateway-Model-Name", // https://github.com/kubernetes-sigs/gateway-api-inference-extension/blob/main/pkg/bbr/README.md
589+
Value: modelName,
590+
},
591+
},
592+
},
593+
},
594+
BackendRefs: []gatewayv1.HTTPBackendRef{
595+
{
596+
BackendRef: gatewayv1.BackendRef{
597+
BackendObjectReference: gatewayv1.BackendObjectReference{
598+
Group: &group,
599+
Kind: &kind,
600+
Name: gatewayv1.ObjectName(md.Name),
601+
},
602+
},
603+
},
604+
},
605+
Timeouts: &gatewayv1.HTTPRouteTimeouts{
606+
Request: &timeout,
607+
},
608+
},
609+
},
610+
},
611+
}
612+
if setErr := ctrl.SetControllerReference(md, route, r.Scheme); setErr != nil {
613+
return fmt.Errorf("setting controller reference: %w", setErr)
614+
}
615+
if createErr := r.Create(ctx, route); createErr != nil {
616+
return fmt.Errorf("failed to create HTTPRoute: %w", createErr)
617+
}
618+
logger.Info("HTTPRoute created", "name", route.Name)
619+
620+
// Annotate the ModelDeployment so future reconciles know we created a route.
621+
patch := client.MergeFrom(md.DeepCopy())
622+
if md.Annotations == nil {
623+
md.Annotations = make(map[string]string)
624+
}
625+
md.Annotations[kubeairunwayv1alpha1.HTTPRouteCreated] = "true"
626+
if patchErr := r.Patch(ctx, md, patch); patchErr != nil {
627+
// Non-fatal: worst case we recreate the route once on the next reconcile.
628+
logger.V(1).Info("Could not annotate ModelDeployment after HTTPRoute creation", "error", patchErr)
629+
}
630+
return nil
631+
}
632+
return fmt.Errorf("getting HTTPRoute: %w", err)
498633
}
499634

500635
// resolveGatewayEndpoint reads the Gateway resource's status to find the actual endpoint address.
@@ -709,6 +844,15 @@ func (r *ModelDeploymentReconciler) cleanupGatewayResources(ctx context.Context,
709844
&rbacv1.Role{ObjectMeta: metav1.ObjectMeta{Name: eppName, Namespace: md.Namespace}},
710845
&corev1.ServiceAccount{ObjectMeta: metav1.ObjectMeta{Name: eppName, Namespace: md.Namespace}},
711846
}
847+
848+
// Conditionally delete the DestinationRule if Istio is present
849+
if _, err := r.Client.RESTMapper().RESTMapping(schema.GroupKind{Group: "networking.istio.io", Kind: "DestinationRule"}); err == nil {
850+
dr := &unstructured.Unstructured{}
851+
dr.SetGroupVersionKind(schema.GroupVersionKind{Group: "networking.istio.io", Version: "v1beta1", Kind: "DestinationRule"})
852+
dr.SetName(eppName)
853+
dr.SetNamespace(md.Namespace)
854+
eppResources = append(eppResources, dr)
855+
}
712856
for _, obj := range eppResources {
713857
if err := r.Delete(ctx, obj); client.IgnoreNotFound(err) != nil {
714858
logger.V(1).Info("Could not delete EPP resource", "resource", obj.GetObjectKind(), "error", err)
@@ -717,6 +861,34 @@ func (r *ModelDeploymentReconciler) cleanupGatewayResources(ctx context.Context,
717861

718862
md.Status.Gateway = nil
719863
r.setCondition(md, kubeairunwayv1alpha1.ConditionTypeGatewayReady, metav1.ConditionFalse, "GatewayDisabled", "Gateway resources cleaned up")
864+
865+
// Clear the httproute-created annotation so the controller will recreate the
866+
// HTTPRoute when the deployment recovers to Running. Without this, a transient
867+
// phase change (e.g. crash-loop) would permanently suppress HTTPRoute recreation.
868+
if md.Annotations[kubeairunwayv1alpha1.HTTPRouteCreated] == "true" {
869+
base := md.DeepCopy()
870+
delete(md.Annotations, kubeairunwayv1alpha1.HTTPRouteCreated)
871+
if err := r.Patch(ctx, md, client.MergeFrom(base)); err != nil {
872+
logger.V(1).Info("Could not clear httproute-created annotation during cleanup", "error", err)
873+
}
874+
}
875+
720876
logger.Info("Gateway resources cleaned up", "name", md.Name)
721877
return nil
722878
}
879+
880+
// restartBBRIfPresent triggers a rolling restart of the body-based-router Deployment (if present
881+
// in the given namespace) by updating its restart annotation. This is necessary because BBR builds
882+
// its internal model registry on startup and does not dynamically watch InferencePools.
883+
func (r *ModelDeploymentReconciler) restartBBRIfPresent(ctx context.Context, namespace string) error {
884+
var bbr appsv1.Deployment
885+
if err := r.Get(ctx, client.ObjectKey{Name: "body-based-router", Namespace: namespace}, &bbr); err != nil {
886+
return client.IgnoreNotFound(err)
887+
}
888+
patch := []byte(`{"spec":{"template":{"metadata":{"annotations":{"kubeairunway.ai/restartedAt":"` + time.Now().UTC().Format(time.RFC3339) + `"}}}}}`)
889+
if err := r.Patch(ctx, &bbr, client.RawPatch(types.StrategicMergePatchType, patch)); err != nil {
890+
return fmt.Errorf("patching body-based-router: %w", err)
891+
}
892+
log.FromContext(ctx).Info("Triggered BBR rolling restart to discover new InferencePool", "namespace", namespace)
893+
return nil
894+
}

0 commit comments

Comments
 (0)