Skip to content

Commit

Permalink
Addl changes after e2e testing
Browse files Browse the repository at this point in the history
Signed-off-by: Daneyon Hansen <[email protected]>
  • Loading branch information
danehans committed Mar 3, 2025
1 parent 0458f97 commit eda00c9
Show file tree
Hide file tree
Showing 10 changed files with 217 additions and 93 deletions.
44 changes: 44 additions & 0 deletions install/helm/kgateway/templates/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,47 @@ rules:
- get
- list
- watch
- apiGroups:
- inference.networking.x-k8s.io
resources:
- inferencemodels
verbs:
- get
- list
- watch
- apiGroups:
- inference.networking.x-k8s.io
resources:
- inferencepools
verbs:
- get
- list
- watch
- update
- apiGroups:
- rbac.authorization.k8s.io
# TODO [danehans]: EPP should use Role and RoleBinding resources: https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/224
resources:
- clusterroles
- clusterrolebindings
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
# TODO [danehans]: Unsure why the following rules are needed: https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/224
- apiGroups:
- authentication.k8s.io
resources:
- tokenreviews
verbs:
- create
- apiGroups:
- authorization.k8s.io
resources:
- subjectaccessreviews
verbs:
- create
29 changes: 26 additions & 3 deletions internal/kgateway/controller/inferencepool_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
gwv1 "sigs.k8s.io/gateway-api/apis/v1"

"github.com/kgateway-dev/kgateway/v2/internal/kgateway/deployer"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/wellknown"
)

type inferencePoolReconciler struct {
Expand All @@ -29,11 +30,22 @@ func (r *inferencePoolReconciler) Reconcile(ctx context.Context, req ctrl.Reques
}

if pool.GetDeletionTimestamp() != nil {
// no need to do anything as we have owner refs, so children will be deleted
log.Info("inferencepool deleted, no need for reconciling")
// Remove the cluster-scoped resources and finalizer.
if err := r.deployer.CleanupClusterScopedResources(ctx, pool); err != nil {
return ctrl.Result{}, err
}
pool.Finalizers = removeString(pool.Finalizers, wellknown.InferencePoolFinalizer)
if err := r.cli.Update(ctx, pool); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}

// Ensure the finalizer is present for the InferencePool.
if err := r.deployer.EnsureFinalizer(ctx, pool); err != nil {
return ctrl.Result{}, err
}

// Use the registered index to list HTTPRoutes that reference this pool.
var routeList gwv1.HTTPRouteList
if err := r.cli.List(ctx, &routeList,
Expand All @@ -58,7 +70,7 @@ func (r *inferencePoolReconciler) Reconcile(ctx context.Context, req ctrl.Reques
// TODO [danehans]: Manage inferencepool status conditions.

// Deploy the endpoint picker resources.
log.Info("Deploying endpoint picker from InferencePool", "name", pool.Name, "namespace", pool.Namespace)
log.Info("Deploying endpoint picker for InferencePool", "name", pool.Name, "namespace", pool.Namespace)
err = r.deployer.DeployObjs(ctx, objs)
if err != nil {
return ctrl.Result{}, err
Expand All @@ -68,3 +80,14 @@ func (r *inferencePoolReconciler) Reconcile(ctx context.Context, req ctrl.Reques

return ctrl.Result{}, nil
}

// removeString is a helper function to remove a string from a slice.
func removeString(slice []string, s string) []string {
var result []string
for _, item := range slice {
if item != s {
result = append(result, item)
}
}
return result
}
12 changes: 9 additions & 3 deletions internal/kgateway/controller/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/utils/krtutil"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/wellknown"
"github.com/kgateway-dev/kgateway/v2/pkg/client/clientset/versioned"
glooschemes "github.com/kgateway-dev/kgateway/v2/pkg/schemes"
kgtwschemes "github.com/kgateway-dev/kgateway/v2/pkg/schemes"
"github.com/kgateway-dev/kgateway/v2/pkg/utils/kubeutils"
"github.com/kgateway-dev/kgateway/v2/pkg/utils/namespaces"
)
Expand Down Expand Up @@ -102,7 +102,7 @@ func NewControllerBuilder(ctx context.Context, cfg StartConfig) (*ControllerBuil
scheme := DefaultScheme()

// Extend the scheme if the TCPRoute CRD exists.
if err := glooschemes.AddGatewayV1A2Scheme(cfg.RestConfig, scheme); err != nil {
if err := kgtwschemes.AddGatewayV1A2Scheme(cfg.RestConfig, scheme); err != nil {
return nil, err
}

Expand Down Expand Up @@ -146,11 +146,17 @@ func NewControllerBuilder(ctx context.Context, cfg StartConfig) (*ControllerBuil
)

// Extend the scheme and add the EPP plugin if the InferencePool CRD exists.
exists, err := glooschemes.AddInferExtV1A1Scheme(cfg.RestConfig, scheme)
exists, err := kgtwschemes.AddInferExtV1A1Scheme(cfg.RestConfig, scheme)
setupLog.Info("checking inference extension CRDs exist", "result", exists)

switch {
case err != nil:
return nil, err
case exists:
setupLog.Info("adding inference extension endpoint picker plugin")
if cfg.ExtraPlugins == nil {
cfg.ExtraPlugins = []extensionsplug.Plugin{}
}
cfg.ExtraPlugins = append(cfg.ExtraPlugins, endpointpicker.NewPlugin(ctx, commoncol))
}

Expand Down
73 changes: 61 additions & 12 deletions internal/kgateway/deployer/deployer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,16 @@ import (
"io"
"io/fs"
"path/filepath"
"slices"

"github.com/rotisserie/eris"
"golang.org/x/exp/slices"
"helm.sh/helm/v3/pkg/action"
"helm.sh/helm/v3/pkg/chart"
"helm.sh/helm/v3/pkg/chart/loader"
"helm.sh/helm/v3/pkg/storage"
"helm.sh/helm/v3/pkg/storage/driver"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -516,19 +517,26 @@ func (d *Deployer) GetEndpointPickerObjs(pool *infextv1a1.InferencePool) ([]clie
return nil, fmt.Errorf("failed to render inference extension objects: %w", err)
}

// Ensure that each rendered object has its namespace set.
// Ensure that each namespaced rendered object has its namespace and ownerRef set.
for _, obj := range objs {
if obj.GetNamespace() == "" {
obj.SetNamespace(pool.Namespace)
gvk := obj.GetObjectKind().GroupVersionKind()
if IsNamespaced(gvk) {
if obj.GetNamespace() == "" {
obj.SetNamespace(pool.Namespace)
}
obj.SetOwnerReferences([]metav1.OwnerReference{{
APIVersion: pool.APIVersion,
Kind: pool.Kind,
Name: pool.Name,
UID: pool.UID,
Controller: ptr.To(true),
}})
} else {
// TODO [danehans]: Not sure why a ns must be set for cluster-scoped objects:
// failed to apply object rbac.authorization.k8s.io/v1, Kind=ClusterRoleBinding
// vllm-llama2-7b-pool-endpoint-picker: Namespace parameter required.
obj.SetNamespace("")
}
// Set owner references so that these objects are tied to the InferencePool.
obj.SetOwnerReferences([]metav1.OwnerReference{{
APIVersion: pool.APIVersion,
Kind: pool.Kind,
Name: pool.Name,
UID: pool.UID,
Controller: ptr.To(true),
}})
}

return objs, nil
Expand All @@ -545,6 +553,47 @@ func (d *Deployer) DeployObjs(ctx context.Context, objs []client.Object) error {
return nil
}

// EnsureFinalizer adds the InferencePool finalizer to the given pool if it’s not already present.
func (d *Deployer) EnsureFinalizer(ctx context.Context, pool *infextv1a1.InferencePool) error {
if slices.Contains(pool.Finalizers, wellknown.InferencePoolFinalizer) {
return nil
}
pool.Finalizers = append(pool.Finalizers, wellknown.InferencePoolFinalizer)
return d.cli.Update(ctx, pool)
}

// CleanupClusterScopedResources deletes the ClusterRole and ClusterRoleBinding for the given pool.
func (d *Deployer) CleanupClusterScopedResources(ctx context.Context, pool *infextv1a1.InferencePool) error {
// The same release name as in the Helm template.
releaseName := fmt.Sprintf("%s-endpoint-picker", pool.Name)

// Delete the ClusterRole.
var cr rbacv1.ClusterRole
if err := d.cli.Get(ctx, client.ObjectKey{Name: releaseName}, &cr); err == nil {
if err := d.cli.Delete(ctx, &cr); err != nil {
return fmt.Errorf("failed to delete ClusterRole %s: %w", releaseName, err)
}
}

// Delete the ClusterRoleBinding.
var crb rbacv1.ClusterRoleBinding
if err := d.cli.Get(ctx, client.ObjectKey{Name: releaseName}, &crb); err == nil {
if err := d.cli.Delete(ctx, &crb); err != nil {
return fmt.Errorf("failed to delete ClusterRoleBinding %s: %w", releaseName, err)
}
}

return nil
}

// IsNamespaced returns true if the resource is namespaced.
func IsNamespaced(gvk schema.GroupVersionKind) bool {
if gvk == wellknown.ClusterRoleGVK || gvk == wellknown.ClusterRoleBindingGVK {
return false
}
return true
}

func loadFs(filesystem fs.FS) (*chart.Chart, error) {
var bufferedFiles []*loader.BufferedFile
entries, err := fs.ReadDir(filesystem, ".")
Expand Down
46 changes: 28 additions & 18 deletions internal/kgateway/deployer/deployer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1466,6 +1466,13 @@ var _ = Describe("Deployer", func() {
})
Expect(err).NotTo(HaveOccurred())

// Simulate reconciliation so that the pool gets its finalizer added.
err = d.EnsureFinalizer(context.Background(), pool)
Expect(err).NotTo(HaveOccurred())

// Check that the pool itself has the finalizer set.
Expect(pool.GetFinalizers()).To(ContainElement(wellknown.InferencePoolFinalizer))

// Get the endpoint picker objects for the InferencePool.
objs, err := d.GetEndpointPickerObjs(pool)
Expect(err).NotTo(HaveOccurred())
Expand All @@ -1474,48 +1481,51 @@ var _ = Describe("Deployer", func() {

// Find the child objects.
var sa *corev1.ServiceAccount
var role *rbacv1.Role
var rb *rbacv1.RoleBinding
var clusterRole *rbacv1.ClusterRole
var crb *rbacv1.ClusterRoleBinding
var dep *appsv1.Deployment
var svc *corev1.Service
for _, obj := range objs {
switch t := obj.(type) {
case *corev1.ServiceAccount:
sa = t
case *rbacv1.Role:
role = t
case *rbacv1.RoleBinding:
rb = t
case *rbacv1.ClusterRole:
clusterRole = t
case *rbacv1.ClusterRoleBinding:
crb = t
case *appsv1.Deployment:
dep = t
case *corev1.Service:
svc = t
}
}
Expect(sa).NotTo(BeNil(), "expected a ServiceAccount to be rendered")
Expect(role).NotTo(BeNil(), "expected a Role to be rendered")
Expect(rb).NotTo(BeNil(), "expected a RoleBinding to be rendered")
Expect(clusterRole).NotTo(BeNil(), "expected a Role to be rendered")
Expect(crb).NotTo(BeNil(), "expected a RoleBinding to be rendered")
Expect(dep).NotTo(BeNil(), "expected a Deployment to be rendered")
Expect(svc).NotTo(BeNil(), "expected a Service to be rendered")

// Check that owner references are set on all rendered objects to the InferencePool.
for _, obj := range objs {
ownerRefs := obj.GetOwnerReferences()
Expect(ownerRefs).To(HaveLen(1))
ref := ownerRefs[0]
Expect(ref.Name).To(Equal(pool.Name))
Expect(ref.UID).To(Equal(pool.UID))
Expect(ref.Kind).To(Equal(pool.Kind))
Expect(ref.APIVersion).To(Equal(pool.APIVersion))
Expect(*ref.Controller).To(BeTrue())
gvk := obj.GetObjectKind().GroupVersionKind()
if deployer.IsNamespaced(gvk) {
ownerRefs := obj.GetOwnerReferences()
Expect(ownerRefs).To(HaveLen(1))
ref := ownerRefs[0]
Expect(ref.Name).To(Equal(pool.Name))
Expect(ref.UID).To(Equal(pool.UID))
Expect(ref.Kind).To(Equal(pool.Kind))
Expect(ref.APIVersion).To(Equal(pool.APIVersion))
Expect(*ref.Controller).To(BeTrue())
}
}

// Validate that the rendered Deployment and Service have the expected names.
// (The template hardcodes the names to "inference-gateway-ext-proc".)
expectedName := fmt.Sprintf("%s-endpoint-picker", pool.Name)
Expect(sa.Name).To(Equal(expectedName))
Expect(role.Name).To(Equal(expectedName))
Expect(rb.Name).To(Equal(expectedName))
Expect(clusterRole.Name).To(Equal(expectedName))
Expect(crb.Name).To(Equal(expectedName))
Expect(dep.Name).To(Equal(expectedName))
Expect(svc.Name).To(Equal(expectedName))

Expand Down
Loading

0 comments on commit eda00c9

Please sign in to comment.