Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions pkg/apis/serving/v1alpha1/llm_inference_service_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ const (
)

const (
GatewaysReady apis.ConditionType = "GatewaysReady"
HTTPRoutesReady apis.ConditionType = "HTTPRoutesReady"
InferencePoolsReady apis.ConditionType = "InferencePoolsReady"
GatewaysReady apis.ConditionType = "GatewaysReady"
HTTPRoutesReady apis.ConditionType = "HTTPRoutesReady"
InferencePoolReady apis.ConditionType = "InferencePoolReady"
)

var llmInferenceServiceCondSet = apis.NewLivingConditionSet(
Expand Down Expand Up @@ -141,19 +141,19 @@ func (in *LLMInferenceService) MarkHTTPRoutesNotReady(reason, messageFormat stri
in.GetConditionSet().Manage(in.GetStatus()).MarkFalse(HTTPRoutesReady, reason, messageFormat, messageA...)
}

func (in *LLMInferenceService) MarkInferencePoolsReady() {
in.GetConditionSet().Manage(in.GetStatus()).MarkTrue(InferencePoolsReady)
func (in *LLMInferenceService) MarkInferencePoolReady() {
in.GetConditionSet().Manage(in.GetStatus()).MarkTrue(InferencePoolReady)
}

func (in *LLMInferenceService) MarkInferencePoolsNotReady(reason, messageFormat string, messageA ...interface{}) {
in.GetConditionSet().Manage(in.GetStatus()).MarkFalse(InferencePoolsReady, reason, messageFormat, messageA...)
func (in *LLMInferenceService) MarkInferencePoolNotReady(reason, messageFormat string, messageA ...interface{}) {
in.GetConditionSet().Manage(in.GetStatus()).MarkFalse(InferencePoolReady, reason, messageFormat, messageA...)
}

func (in *LLMInferenceService) DetermineRouterReadiness() {
subConditions := []*apis.Condition{
in.GetStatus().GetCondition(GatewaysReady),
in.GetStatus().GetCondition(HTTPRoutesReady),
in.GetStatus().GetCondition(InferencePoolsReady),
in.GetStatus().GetCondition(InferencePoolReady),
in.GetStatus().GetCondition(SchedulerWorkloadReady),
}

Expand Down
165 changes: 31 additions & 134 deletions pkg/controller/llmisvc/controller_int_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package llmisvc_test

import (
"context"
"fmt"
"time"

"k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -231,10 +232,13 @@ var _ = Describe("LLMInferenceService Controller", func() {
return envTest.Client.Get(ctx, client.ObjectKey{Name: svcName + "-inference-pool", Namespace: llmSvc.GetNamespace()}, &ip)
}).WithContext(ctx).Should(Succeed())

Eventually(LLMInferenceServiceIsReady(llmSvc)).WithContext(ctx).Should(Succeed())
Eventually(LLMInferenceServiceIsReady(llmSvc, func(g Gomega, current *v1alpha1.LLMInferenceService) {
g.Expect(current.Status).To(HaveCondition(string(v1alpha1.HTTPRoutesReady), "True"))
g.Expect(current.Status).To(HaveCondition(string(v1alpha1.InferencePoolReady), "True"))
})).WithContext(ctx).Should(Succeed())
})

It("should reference external InferencePool", func(ctx SpecContext) {
It("should use referenced external InferencePool", func(ctx SpecContext) {
// given
svcName := "test-llm-create-http-route-inf-pool-ref"
nsName := kmeta.ChildName(svcName, "-test")
Expand Down Expand Up @@ -310,105 +314,12 @@ var _ = Describe("LLMInferenceService Controller", func() {
Expect(expectedHTTPRoute).To(HaveBackendRefs(BackendRefInferencePool(infPoolName)))
Expect(expectedHTTPRoute).To(Not(HaveBackendRefs(BackendRefService(svcName + "-kserve-workload-svc"))))

ensureRouterManagedResourcesAreReady(ctx, envTest.Client, llmSvc)

Eventually(LLMInferenceServiceIsReady(llmSvc)).WithContext(ctx).Should(Succeed())
})

It("should evaluate InferencePool readiness conditions", func(ctx SpecContext) {
// given
svcName := "test-llm-infpool-conditions"
nsName := kmeta.ChildName(svcName, "-test")
namespace := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: nsName,
},
}
Expect(envTest.Client.Create(ctx, namespace)).To(Succeed())
Expect(envTest.Client.Create(ctx, IstioShadowService(svcName, nsName))).To(Succeed())
defer func() {
envTest.DeleteAll(namespace)
}()

modelURL, err := apis.ParseURL("hf://facebook/opt-125m")
Expect(err).ToNot(HaveOccurred())

// Create the Gateway
ingressGateway := DefaultGateway(nsName)
Expect(envTest.Client.Create(ctx, ingressGateway)).To(Succeed())
ensureGatewayReady(ctx, envTest.Client, ingressGateway)
defer func() {
Expect(envTest.Delete(ctx, ingressGateway)).To(Succeed())
}()

// Create the inference pool
infPoolName := kmeta.ChildName(svcName, "-my-inf-pool")
infPool := InferencePool(infPoolName,
InNamespace[*igwapi.InferencePool](nsName),
WithSelector("app", "workload"),
WithTargetPort(8000),
WithExtensionRef("", "Service", kmeta.ChildName(svcName, "-epp-service")),
)
Expect(envTest.Create(ctx, infPool)).To(Succeed())
defer func() {
Expect(envTest.Delete(ctx, infPool)).To(Succeed())
}()

// Create the llmd inference service
llmSvc := &v1alpha1.LLMInferenceService{
ObjectMeta: metav1.ObjectMeta{
Name: svcName,
Namespace: nsName,
},
Spec: v1alpha1.LLMInferenceServiceSpec{
Model: v1alpha1.LLMModelSpec{
URI: *modelURL,
},
WorkloadSpec: v1alpha1.WorkloadSpec{},
Router: &v1alpha1.RouterSpec{
Scheduler: &v1alpha1.SchedulerSpec{
Pool: &v1alpha1.InferencePoolSpec{
Ref: &corev1.LocalObjectReference{
Name: infPoolName,
},
},
},
},
},
}

// when
Expect(envTest.Create(ctx, llmSvc)).To(Succeed())
defer func() {
Expect(envTest.Delete(ctx, llmSvc)).To(Succeed())
}()

// then - verify InferencePoolsReady condition is False because the pool is not ready
Eventually(func(g Gomega, ctx context.Context) {
current := &v1alpha1.LLMInferenceService{}
g.Expect(envTest.Get(ctx, client.ObjectKeyFromObject(llmSvc), current)).To(Succeed())
g.Expect(current.Status).To(HaveCondition(string(v1alpha1.InferencePoolsReady), "False"))
g.Expect(current.Status).To(HaveCondition(string(v1alpha1.RouterReady), "False"))
}).WithContext(ctx).Should(Succeed())

// when - we make the inference pool ready
ensureInferencePoolReady(ctx, envTest.Client, infPool)

// then - verify InferencePoolsReady condition becomes True
Eventually(func(g Gomega, ctx context.Context) error {
current := &v1alpha1.LLMInferenceService{}
g.Expect(envTest.Get(ctx, client.ObjectKeyFromObject(llmSvc), current)).To(Succeed())

// Check that InferencePoolsReady condition exists and is True
poolsCondition := current.Status.GetCondition(v1alpha1.InferencePoolsReady)
g.Expect(poolsCondition).ToNot(BeNil(), "InferencePoolsReady condition should be set")
g.Expect(poolsCondition.IsTrue()).To(BeTrue(), "InferencePoolsReady condition should be True")

return nil
}).WithContext(ctx).Should(Succeed(), "InferencePoolsReady condition should be set to True")
ensureRouterManagedResourcesAreReady(ctx, envTest.Client, llmSvc)

Eventually(LLMInferenceServiceIsReady(llmSvc, func(g Gomega, current *v1alpha1.LLMInferenceService) {
g.Expect(current.Status).To(HaveCondition(string(v1alpha1.InferencePoolsReady), "True"))
g.Expect(current.Status).To(HaveCondition(string(v1alpha1.HTTPRoutesReady), "True"))
g.Expect(current.Status).To(HaveCondition(string(v1alpha1.InferencePoolReady), "True"))
})).WithContext(ctx).Should(Succeed())
})

Expand Down Expand Up @@ -974,49 +885,18 @@ func ensureInferencePoolReady(ctx context.Context, c client.Client, pool *igwapi
return
}

// Get the current InferencePool
createdPool := &igwapi.InferencePool{}
Expect(c.Get(ctx, client.ObjectKeyFromObject(pool), createdPool)).To(Succeed())

// Set the status conditions to simulate the controller making the InferencePool ready
createdPool.Status.Parents = []igwapi.PoolStatus{
{
GatewayRef: corev1.ObjectReference{
Name: "kserve-ingress-gateway", // Example gateway
},
Conditions: []metav1.Condition{
{
Type: "Accepted",
Status: metav1.ConditionTrue,
Reason: "Accepted",
Message: "InferencePool accepted",
LastTransitionTime: metav1.Now(),
},
},
},
}

// Update the status
WithInferencePoolReadyStatus()(createdPool)
Expect(c.Status().Update(ctx, createdPool)).To(Succeed())

// Verify the InferencePool is now ready
updatedPool := &igwapi.InferencePool{}
Eventually(func(g Gomega, ctx context.Context) bool {
updatedPool := &igwapi.InferencePool{}
updatedPool = &igwapi.InferencePool{}
g.Expect(c.Get(ctx, client.ObjectKeyFromObject(pool), updatedPool)).To(Succeed())
isReady := false
for _, parent := range updatedPool.Status.Parents {
for _, cond := range parent.Conditions {
if cond.Type == "Accepted" && cond.Status == metav1.ConditionTrue {
isReady = true
break
}
}
if isReady {
break
}
}
return isReady
}).WithContext(ctx).Should(BeTrue())
return llmisvc.IsInferencePoolReady(updatedPool)
}).WithContext(ctx).Should(BeTrue(), fmt.Sprintf("Expected InferencePool to be ready, got: %#v", updatedPool.Status))
}

// Only runs in non-cluster mode
Expand Down Expand Up @@ -1063,6 +943,23 @@ func ensureRouterManagedResourcesAreReady(ctx context.Context, c client.Client,
// Ensure at least one HTTPRoute was found and made ready
g.Expect(httpRoutes.Items).To(gomega.HaveLen(1), "Expected exactly one managed HTTPRoute")

infPoolsListOpts := &client.ListOptions{
Namespace: llmSvc.Namespace,
LabelSelector: labels.SelectorFromSet(llmisvc.SchedulerLabels(llmSvc)),
}

infPools := &igwapi.InferencePoolList{}
err = c.List(ctx, infPools, infPoolsListOpts)
if err != nil && !errors.IsNotFound(err) {
g.Expect(err).NotTo(gomega.HaveOccurred())
}
logf.FromContext(ctx).Info("Marking InferencePool resources ready", "inferencepools", infPools)
for _, pool := range infPools.Items {
updatedPool := pool.DeepCopy()
WithInferencePoolReadyStatus()(updatedPool)
g.Expect(c.Status().Update(ctx, updatedPool)).To(gomega.Succeed())
}

ensureSchedulerDeploymentReady(ctx, c, llmSvc)
}).WithContext(ctx).Should(gomega.Succeed())
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/controller/llmisvc/fixture/gwapi_builders.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,9 +591,10 @@ func WithInferencePoolReadyStatus() InferencePoolOption {
{
Conditions: []metav1.Condition{
{
Type: string(igwapi.InferencePoolConditionAccepted),
Status: metav1.ConditionTrue,
Reason: string(igwapi.InferencePoolReasonAccepted),
Type: string(igwapi.InferencePoolConditionAccepted),
Status: metav1.ConditionTrue,
Reason: string(igwapi.InferencePoolReasonAccepted),
LastTransitionTime: metav1.Now(),
},
},
},
Expand Down
88 changes: 36 additions & 52 deletions pkg/controller/llmisvc/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ func (r *LLMInferenceServiceReconciler) reconcileRouter(ctx context.Context, llm
}

// Evaluate the subconditions
if err := r.EvaluateInferencePoolConditions(ctx, llmSvc); err != nil {
return fmt.Errorf("failed to evaluate Inference Pool conditions: %w", err)
}

if err := r.EvaluateGatewayConditions(ctx, llmSvc); err != nil {
return fmt.Errorf("failed to evaluate gateway conditions: %w", err)
}
Expand All @@ -75,10 +79,6 @@ func (r *LLMInferenceServiceReconciler) reconcileRouter(ctx context.Context, llm
return fmt.Errorf("failed to evaluate HTTPRoute conditions: %w", err)
}

if err := r.EvaluateInferencePoolConditions(ctx, llmSvc); err != nil {
return fmt.Errorf("failed to evaluate Inference Pool conditions: %w", err)
}

return nil
}

Expand Down Expand Up @@ -378,72 +378,56 @@ func (r *LLMInferenceServiceReconciler) EvaluateHTTPRouteConditions(ctx context.
}

// EvaluateInferencePoolConditions evaluates the readiness of all Inference Pools in the LLMInferenceService
// and updates the InferencePoolsReady condition accordingly
// and updates the InferencePoolReady condition accordingly
func (r *LLMInferenceServiceReconciler) EvaluateInferencePoolConditions(ctx context.Context, llmSvc *v1alpha1.LLMInferenceService) error {
logger := log.FromContext(ctx).WithName("EvaluateInferencePoolConditions")

// If no router or scheduler configuration, mark Inference Pools as ready (no Inference Pools to evaluate)
if llmSvc.Spec.Router == nil || llmSvc.Spec.Router.Scheduler == nil || llmSvc.Spec.Router.Scheduler.Pool == nil {
logger.V(2).Info("No Router or Inference Pool configuration found, marking InferencePoolsReady as True")
llmSvc.MarkInferencePoolsReady()
if llmSvc.Spec.Router == nil || llmSvc.Spec.Router.Scheduler == nil {
logger.V(2).Info("Scheduler is disabled, marking InferencePoolReady as True")
llmSvc.MarkInferencePoolReady()
return nil
}

// Collect all Inference Pools (both referenced and embedded)
var allPools []*igwapi.InferencePool
curr := &igwapi.InferencePool{}

// Get the referenced inference pool
referencedPool := &igwapi.InferencePool{}
if llmSvc.Spec.Router.Scheduler.Pool.Ref != nil {
if llmSvc.Spec.Router.Scheduler.Pool != nil && llmSvc.Spec.Router.Scheduler.Pool.Ref != nil && llmSvc.Spec.Router.Scheduler.Pool.Ref.Name != "" {
poolRef := llmSvc.Spec.Router.Scheduler.Pool.Ref

err := r.Client.Get(ctx, types.NamespacedName{
Namespace: llmSvc.Namespace,
Name: poolRef.Name,
}, referencedPool)
err := r.Client.Get(ctx, types.NamespacedName{Namespace: llmSvc.Namespace, Name: poolRef.Name}, curr)
if err != nil {
msg := fmt.Sprintf("Failed to fetch referenced Inference Pool: %v", err)
llmSvc.MarkInferencePoolsNotReady("InferencePoolFetchError", msg)
return fmt.Errorf(msg, err)
err := fmt.Errorf("failed to fetch referenced Inference Pool %s/%s: %w", llmSvc.Namespace, poolRef.Name, err)
llmSvc.MarkInferencePoolNotReady("InferencePoolFetchError", err.Error())
return err
}
}
allPools = append(allPools, referencedPool)

// Check if the LLMInferenceService has an embedded Inference Pool spec
if llmSvc.Spec.Router.Scheduler.Pool.Spec != nil {
embeddedPool := &igwapi.InferencePool{}
err := r.Client.Get(ctx, types.NamespacedName{
Name: llmSvc.Name + "-pool",
Namespace: llmSvc.Namespace,
}, embeddedPool)
} else {
expected := r.expectedSchedulerInferencePool(ctx, llmSvc)
err := r.Client.Get(ctx, types.NamespacedName{Namespace: expected.Namespace, Name: expected.Name}, curr)
if err != nil {
msg := fmt.Sprintf("Failed to fetch embedded Inference Pool: %v", err)
llmSvc.MarkInferencePoolsNotReady("InferencePoolFetchError", msg)
return fmt.Errorf(msg, err)
err := fmt.Errorf("failed to fetch embedded Inference Pool %s/%s: %w", llmSvc.Namespace, llmSvc.Name, err)
llmSvc.MarkInferencePoolNotReady("InferencePoolFetchError", err.Error())
return err
}
allPools = append(allPools, embeddedPool)
}

// If the list is empty at this point, mark as ready since there is nothing to evaluate
if len(allPools) == 0 {
logger.V(2).Info("No Inference Pools found, marking InferencePoolsReady as True")
llmSvc.MarkInferencePoolsReady()
return nil
}

// Determine which inference pools are not ready
notReadyPools := EvaluateInferencePoolReadiness(ctx, allPools)
if len(notReadyPools) > 0 {
poolNames := make([]string, len(notReadyPools))
for i, route := range notReadyPools {
poolNames[i] = fmt.Sprintf("%s/%s", route.Namespace, route.Name)
if !IsInferencePoolReady(curr) {
topLevelCondition, _ := nonReadyInferencePoolTopLevelCondition(curr)
if topLevelCondition != nil {
llmSvc.MarkInferencePoolNotReady("InferencePoolNotReady", fmt.Sprintf(
"%s/%s: %v=%#v (reason %q, message %q)",
curr.Namespace,
curr.Name,
topLevelCondition.Type,
topLevelCondition.Status,
topLevelCondition.Reason,
topLevelCondition.Message,
))
} else {
llmSvc.MarkInferencePoolNotReady("InferencePoolNotReady", fmt.Sprintf("The inference pool %s/%s is not ready", curr.Namespace, curr.Name))
}
llmSvc.MarkInferencePoolsNotReady("InferencePoolsNotReady", "The following Inference Pools are not ready: %v", poolNames)
logger.V(2).Info("Some Inference Pools are not ready", "pools", notReadyPools)
return nil
}

llmSvc.MarkInferencePoolsReady()
logger.V(2).Info("All Inference Pools are ready", "pools", allPools)
llmSvc.MarkInferencePoolReady()
logger.V(2).Info("Inference Pool is ready", "pool", curr)
return nil
}
Loading
Loading