Skip to content

Commit

Permalink
Addl changes after e2e testing
Browse files Browse the repository at this point in the history
Signed-off-by: Daneyon Hansen <[email protected]>
  • Loading branch information
danehans committed Mar 4, 2025
1 parent 0458f97 commit c5c1063
Show file tree
Hide file tree
Showing 11 changed files with 234 additions and 95 deletions.
44 changes: 44 additions & 0 deletions install/helm/kgateway/templates/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,47 @@ rules:
- get
- list
- watch
- apiGroups:
- inference.networking.x-k8s.io
resources:
- inferencemodels
verbs:
- get
- list
- watch
- apiGroups:
- inference.networking.x-k8s.io
resources:
- inferencepools
verbs:
- get
- list
- watch
- update
- apiGroups:
- rbac.authorization.k8s.io
# TODO [danehans]: EPP should use Role and RoleBinding resources: https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/224
resources:
- clusterroles
- clusterrolebindings
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
# TODO [danehans]: Unsure why the following rules are needed: https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/224
- apiGroups:
- authentication.k8s.io
resources:
- tokenreviews
verbs:
- create
- apiGroups:
- authorization.k8s.io
resources:
- subjectaccessreviews
verbs:
- create
30 changes: 27 additions & 3 deletions internal/kgateway/controller/inferencepool_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
gwv1 "sigs.k8s.io/gateway-api/apis/v1"

"github.com/kgateway-dev/kgateway/v2/internal/kgateway/deployer"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/wellknown"
)

type inferencePoolReconciler struct {
Expand All @@ -29,11 +30,23 @@ func (r *inferencePoolReconciler) Reconcile(ctx context.Context, req ctrl.Reques
}

if pool.GetDeletionTimestamp() != nil {
// no need to do anything as we have owner refs, so children will be deleted
log.Info("inferencepool deleted, no need for reconciling")
log.Info("Removing endpoint picker for InferencePool", "name", pool.Name, "namespace", pool.Namespace)
if err := r.deployer.CleanupClusterScopedResources(ctx, pool); err != nil {
return ctrl.Result{}, err
}
// Remove the finalizer.
pool.Finalizers = removeString(pool.Finalizers, wellknown.InferencePoolFinalizer)
if err := r.cli.Update(ctx, pool); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}

// Ensure the finalizer is present for the InferencePool.
if err := r.deployer.EnsureFinalizer(ctx, pool); err != nil {
return ctrl.Result{}, err
}

// Use the registered index to list HTTPRoutes that reference this pool.
var routeList gwv1.HTTPRouteList
if err := r.cli.List(ctx, &routeList,
Expand All @@ -58,7 +71,7 @@ func (r *inferencePoolReconciler) Reconcile(ctx context.Context, req ctrl.Reques
// TODO [danehans]: Manage inferencepool status conditions.

// Deploy the endpoint picker resources.
log.Info("Deploying endpoint picker from InferencePool", "name", pool.Name, "namespace", pool.Namespace)
log.Info("Deploying endpoint picker for InferencePool", "name", pool.Name, "namespace", pool.Namespace)
err = r.deployer.DeployObjs(ctx, objs)
if err != nil {
return ctrl.Result{}, err
Expand All @@ -68,3 +81,14 @@ func (r *inferencePoolReconciler) Reconcile(ctx context.Context, req ctrl.Reques

return ctrl.Result{}, nil
}

// removeString returns a new slice containing every element of slice that is
// not equal to s, in the original order. The input slice is left untouched,
// and the result is nil when no element survives the filter.
func removeString(slice []string, s string) []string {
	var kept []string
	for i := 0; i < len(slice); i++ {
		if slice[i] == s {
			// Drop every occurrence of s, not just the first one.
			continue
		}
		kept = append(kept, slice[i])
	}
	return kept
}
12 changes: 9 additions & 3 deletions internal/kgateway/controller/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/utils/krtutil"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/wellknown"
"github.com/kgateway-dev/kgateway/v2/pkg/client/clientset/versioned"
glooschemes "github.com/kgateway-dev/kgateway/v2/pkg/schemes"
kgtwschemes "github.com/kgateway-dev/kgateway/v2/pkg/schemes"
"github.com/kgateway-dev/kgateway/v2/pkg/utils/kubeutils"
"github.com/kgateway-dev/kgateway/v2/pkg/utils/namespaces"
)
Expand Down Expand Up @@ -102,7 +102,7 @@ func NewControllerBuilder(ctx context.Context, cfg StartConfig) (*ControllerBuil
scheme := DefaultScheme()

// Extend the scheme if the TCPRoute CRD exists.
if err := glooschemes.AddGatewayV1A2Scheme(cfg.RestConfig, scheme); err != nil {
if err := kgtwschemes.AddGatewayV1A2Scheme(cfg.RestConfig, scheme); err != nil {
return nil, err
}

Expand Down Expand Up @@ -146,11 +146,17 @@ func NewControllerBuilder(ctx context.Context, cfg StartConfig) (*ControllerBuil
)

// Extend the scheme and add the EPP plugin if the InferencePool CRD exists.
exists, err := glooschemes.AddInferExtV1A1Scheme(cfg.RestConfig, scheme)
exists, err := kgtwschemes.AddInferExtV1A1Scheme(cfg.RestConfig, scheme)
setupLog.Info("checking inference extension CRDs exist", "result", exists)

switch {
case err != nil:
return nil, err
case exists:
setupLog.Info("adding inference extension endpoint picker plugin")
if cfg.ExtraPlugins == nil {
cfg.ExtraPlugins = []extensionsplug.Plugin{}
}
cfg.ExtraPlugins = append(cfg.ExtraPlugins, endpointpicker.NewPlugin(ctx, commoncol))
}

Expand Down
73 changes: 61 additions & 12 deletions internal/kgateway/deployer/deployer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,16 @@ import (
"io"
"io/fs"
"path/filepath"
"slices"

"github.com/rotisserie/eris"
"golang.org/x/exp/slices"
"helm.sh/helm/v3/pkg/action"
"helm.sh/helm/v3/pkg/chart"
"helm.sh/helm/v3/pkg/chart/loader"
"helm.sh/helm/v3/pkg/storage"
"helm.sh/helm/v3/pkg/storage/driver"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -516,19 +517,26 @@ func (d *Deployer) GetEndpointPickerObjs(pool *infextv1a1.InferencePool) ([]clie
return nil, fmt.Errorf("failed to render inference extension objects: %w", err)
}

// Ensure that each rendered object has its namespace set.
// Ensure that each namespaced rendered object has its namespace and ownerRef set.
for _, obj := range objs {
if obj.GetNamespace() == "" {
obj.SetNamespace(pool.Namespace)
gvk := obj.GetObjectKind().GroupVersionKind()
if IsNamespaced(gvk) {
if obj.GetNamespace() == "" {
obj.SetNamespace(pool.Namespace)
}
obj.SetOwnerReferences([]metav1.OwnerReference{{
APIVersion: pool.APIVersion,
Kind: pool.Kind,
Name: pool.Name,
UID: pool.UID,
Controller: ptr.To(true),
}})
} else {
// TODO [danehans]: Not sure why a ns must be set for cluster-scoped objects:
// failed to apply object rbac.authorization.k8s.io/v1, Kind=ClusterRoleBinding
// vllm-llama2-7b-pool-endpoint-picker: Namespace parameter required.
obj.SetNamespace("")
}
// Set owner references so that these objects are tied to the InferencePool.
obj.SetOwnerReferences([]metav1.OwnerReference{{
APIVersion: pool.APIVersion,
Kind: pool.Kind,
Name: pool.Name,
UID: pool.UID,
Controller: ptr.To(true),
}})
}

return objs, nil
Expand All @@ -545,6 +553,47 @@ func (d *Deployer) DeployObjs(ctx context.Context, objs []client.Object) error {
return nil
}

// EnsureFinalizer makes sure wellknown.InferencePoolFinalizer is listed in the
// pool's finalizers. When the finalizer is missing it is appended to
// pool.Finalizers and the pool is written back with d.cli.Update, whose error
// (if any) is returned. When the finalizer is already present no update is
// issued and nil is returned.
func (d *Deployer) EnsureFinalizer(ctx context.Context, pool *infextv1a1.InferencePool) error {
	if !slices.Contains(pool.Finalizers, wellknown.InferencePoolFinalizer) {
		pool.Finalizers = append(pool.Finalizers, wellknown.InferencePoolFinalizer)
		return d.cli.Update(ctx, pool)
	}
	return nil
}

// CleanupClusterScopedResources deletes the ClusterRole and ClusterRoleBinding
// named "<pool.Name>-endpoint-picker" for the given pool.
//
// An object that no longer exists is treated as already cleaned up. Any other
// failure is returned to the caller, so a transient API error cannot be
// mistaken for a successful cleanup and leave the cluster-scoped objects
// behind.
//
// NOTE(review): the name is derived from the pool name only, so pools with the
// same name in different namespaces would map to the same cluster-scoped
// objects — confirm against the Helm template.
func (d *Deployer) CleanupClusterScopedResources(ctx context.Context, pool *infextv1a1.InferencePool) error {
	// The same release name as in the Helm template.
	releaseName := fmt.Sprintf("%s-endpoint-picker", pool.Name)

	// Delete by name rather than Get-then-Delete: this avoids an extra round
	// trip and the race where the object disappears between the two calls.
	cr := &rbacv1.ClusterRole{ObjectMeta: metav1.ObjectMeta{Name: releaseName}}
	if err := client.IgnoreNotFound(d.cli.Delete(ctx, cr)); err != nil {
		return fmt.Errorf("failed to delete ClusterRole %s: %w", releaseName, err)
	}

	crb := &rbacv1.ClusterRoleBinding{ObjectMeta: metav1.ObjectMeta{Name: releaseName}}
	if err := client.IgnoreNotFound(d.cli.Delete(ctx, crb)); err != nil {
		return fmt.Errorf("failed to delete ClusterRoleBinding %s: %w", releaseName, err)
	}

	return nil
}

// IsNamespaced reports whether gvk is treated as a namespaced kind. Every kind
// is considered namespaced except wellknown.ClusterRoleGVK and
// wellknown.ClusterRoleBindingGVK.
func IsNamespaced(gvk schema.GroupVersionKind) bool {
	return gvk != wellknown.ClusterRoleGVK && gvk != wellknown.ClusterRoleBindingGVK
}

func loadFs(filesystem fs.FS) (*chart.Chart, error) {
var bufferedFiles []*loader.BufferedFile
entries, err := fs.ReadDir(filesystem, ".")
Expand Down
46 changes: 28 additions & 18 deletions internal/kgateway/deployer/deployer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1466,6 +1466,13 @@ var _ = Describe("Deployer", func() {
})
Expect(err).NotTo(HaveOccurred())

// Simulate reconciliation so that the pool gets its finalizer added.
err = d.EnsureFinalizer(context.Background(), pool)
Expect(err).NotTo(HaveOccurred())

// Check that the pool itself has the finalizer set.
Expect(pool.GetFinalizers()).To(ContainElement(wellknown.InferencePoolFinalizer))

// Get the endpoint picker objects for the InferencePool.
objs, err := d.GetEndpointPickerObjs(pool)
Expect(err).NotTo(HaveOccurred())
Expand All @@ -1474,48 +1481,51 @@ var _ = Describe("Deployer", func() {

// Find the child objects.
var sa *corev1.ServiceAccount
var role *rbacv1.Role
var rb *rbacv1.RoleBinding
var clusterRole *rbacv1.ClusterRole
var crb *rbacv1.ClusterRoleBinding
var dep *appsv1.Deployment
var svc *corev1.Service
for _, obj := range objs {
switch t := obj.(type) {
case *corev1.ServiceAccount:
sa = t
case *rbacv1.Role:
role = t
case *rbacv1.RoleBinding:
rb = t
case *rbacv1.ClusterRole:
clusterRole = t
case *rbacv1.ClusterRoleBinding:
crb = t
case *appsv1.Deployment:
dep = t
case *corev1.Service:
svc = t
}
}
Expect(sa).NotTo(BeNil(), "expected a ServiceAccount to be rendered")
Expect(role).NotTo(BeNil(), "expected a Role to be rendered")
Expect(rb).NotTo(BeNil(), "expected a RoleBinding to be rendered")
Expect(clusterRole).NotTo(BeNil(), "expected a Role to be rendered")
Expect(crb).NotTo(BeNil(), "expected a RoleBinding to be rendered")
Expect(dep).NotTo(BeNil(), "expected a Deployment to be rendered")
Expect(svc).NotTo(BeNil(), "expected a Service to be rendered")

// Check that owner references to the InferencePool are set on all namespaced rendered objects.
for _, obj := range objs {
ownerRefs := obj.GetOwnerReferences()
Expect(ownerRefs).To(HaveLen(1))
ref := ownerRefs[0]
Expect(ref.Name).To(Equal(pool.Name))
Expect(ref.UID).To(Equal(pool.UID))
Expect(ref.Kind).To(Equal(pool.Kind))
Expect(ref.APIVersion).To(Equal(pool.APIVersion))
Expect(*ref.Controller).To(BeTrue())
gvk := obj.GetObjectKind().GroupVersionKind()
if deployer.IsNamespaced(gvk) {
ownerRefs := obj.GetOwnerReferences()
Expect(ownerRefs).To(HaveLen(1))
ref := ownerRefs[0]
Expect(ref.Name).To(Equal(pool.Name))
Expect(ref.UID).To(Equal(pool.UID))
Expect(ref.Kind).To(Equal(pool.Kind))
Expect(ref.APIVersion).To(Equal(pool.APIVersion))
Expect(*ref.Controller).To(BeTrue())
}
}

// Validate that every rendered object is named "<pool name>-endpoint-picker".
expectedName := fmt.Sprintf("%s-endpoint-picker", pool.Name)
Expect(sa.Name).To(Equal(expectedName))
Expect(role.Name).To(Equal(expectedName))
Expect(rb.Name).To(Equal(expectedName))
Expect(clusterRole.Name).To(Equal(expectedName))
Expect(crb.Name).To(Equal(expectedName))
Expect(dep.Name).To(Equal(expectedName))
Expect(svc.Name).To(Equal(expectedName))

Expand Down
Loading

0 comments on commit c5c1063

Please sign in to comment.