Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions test/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,60 @@ var _ = ginkgo.Describe("Run end to end tests", ginkgo.Ordered, func() {
testutils.DeleteObjects(testConfig, modelServers)
})
})

	// Verifies that the endpoint picker tracks model-server scale changes:
	// all traffic goes to the single decode pod, spreads to a newly added
	// replica after scale-up, and converges back to the surviving pod after
	// scale-down.
	ginkgo.When("Scaling up and down the model servers", func() {
		ginkgo.It("should distribute inference requests across all model servers", func() {
			// Start with a single decode server and no prefill servers.
			modelServers := createModelServers(false, false, 1, 0, 0)

			// EPP using the minimal max-score-picker scheduling profile.
			epp := createEndPointPicker(scaleConfig)

			prefillPods, decodePods := getModelServerPods(podSelector, prefillSelector, decodeSelector)
			gomega.Expect(prefillPods).Should(gomega.BeEmpty())
			gomega.Expect(decodePods).Should(gomega.HaveLen(1))

			// With only one pod available, every request must land on it.
			var nsHdr, podHdr string
			for range 5 {
				nsHdr, podHdr = runCompletion(simplePrompt, modelName)
				gomega.Expect(nsHdr).Should(gomega.Equal(nsName))
				gomega.Expect(podHdr).Should(gomega.Equal(decodePods[0]))
			}

			// Add a second decode replica.
			scaleDeployment(modelServers, 1)

			scaledUpPrefillPods, scaledUpDecodePods := getModelServerPods(podSelector, prefillSelector, decodeSelector)
			gomega.Expect(scaledUpPrefillPods).Should(gomega.BeEmpty())
			gomega.Expect(scaledUpDecodePods).Should(gomega.HaveLen(2))

			// Run inference multiple times until one is scheduled on the new pod.
			// podHdr still holds the pod that served the pre-scale requests;
			// the loop bound caps how long we probe for the second pod.
			var scaledNsHdr, scaledPodHdr string
			for range 30 {
				scaledNsHdr, scaledPodHdr = runCompletion(extraPrompt, modelName)
				gomega.Expect(scaledNsHdr).Should(gomega.Equal(nsName))
				gomega.Expect(scaledPodHdr).Should(gomega.BeElementOf(scaledUpDecodePods))
				if scaledPodHdr != podHdr {
					break
				}
			}
			// If the loop exhausted without reaching the new pod, this fails.
			gomega.Expect(scaledPodHdr).ShouldNot(gomega.Equal(podHdr))

			// Remove one replica again.
			scaleDeployment(modelServers, -1)

			scaledDownPrefillPods, scaledDownDecodePods := getModelServerPods(podSelector, prefillSelector, decodeSelector)
			gomega.Expect(scaledDownPrefillPods).Should(gomega.BeEmpty())
			gomega.Expect(scaledDownDecodePods).Should(gomega.HaveLen(1))
			// The remaining pod must be one of the two that existed before scale-down.
			gomega.Expect(scaledDownDecodePods[0]).Should(gomega.BeElementOf(scaledUpDecodePods))

			// Run multiple times and ensure that they are scheduled on the remaining pod.
			for range 5 {
				nsHdr, podHdr = runCompletion(simplePrompt, modelName)
				gomega.Expect(nsHdr).Should(gomega.Equal(nsName))
				gomega.Expect(podHdr).Should(gomega.Equal(scaledDownDecodePods[0]))
			}

			// Clean up test resources.
			testutils.DeleteObjects(testConfig, epp)
			testutils.DeleteObjects(testConfig, modelServers)
		})
	})
})

// createModelServers creates the model server resources used for testing from the given filePaths.
Expand Down Expand Up @@ -341,3 +395,15 @@ schedulingProfiles:
- pluginRef: precise-prefix-cache-scorer
weight: 10
`

// scaleConfig is the EndpointPickerConfig used by the scale-up/down test:
// a single default scheduling profile with only the max-score-picker, so
// routing is not pinned by prefix-cache scoring and can reach new pods.
const scaleConfig = `apiVersion: inference.networking.x-k8s.io/v1alpha1
kind: EndpointPickerConfig
plugins:
- type: max-score-picker
- type: single-profile-handler
schedulingProfiles:
- name: default
  plugins:
  - pluginRef: max-score-picker
`
38 changes: 36 additions & 2 deletions test/e2e/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,44 @@ import (
"github.com/onsi/gomega/gexec"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apilabels "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/config"
)

const (
	// deploymentKind is the lower-case kind prefix expected in the
	// "kind/name" strings handled by the deployment helpers below.
	deploymentKind = "deployment"
)

// scaleDeployment adjusts the replica count of every "deployment/<name>"
// entry in objects by increment (negative values scale down), then blocks
// until all pods of those deployments report ready. Entries of any other
// kind, or malformed entries without a "/", are ignored.
func scaleDeployment(objects []string, increment int) {
	k8sCfg := config.GetConfigOrDie()
	// Named clientset (not "client") to avoid shadowing the imported
	// controller-runtime "client" package.
	clientset, err := kubernetes.NewForConfig(k8sCfg)
	gomega.Expect(err).NotTo(gomega.HaveOccurred())

	direction := "up"
	absIncrement := increment
	if increment < 0 {
		direction = "down"
		absIncrement = -increment
	}

	for _, kindAndName := range objects {
		kind, name, found := strings.Cut(kindAndName, "/")
		if !found || !strings.EqualFold(kind, deploymentKind) {
			// Not a "deployment/<name>" entry; skip rather than panic on split[1].
			continue
		}
		ginkgo.By(fmt.Sprintf("Scaling the deployment %s %s by %d", name, direction, absIncrement))
		scale, err := clientset.AppsV1().Deployments(nsName).GetScale(testConfig.Context, name, v1.GetOptions{})
		gomega.Expect(err).NotTo(gomega.HaveOccurred())

		scale.Spec.Replicas += int32(increment)
		_, err = clientset.AppsV1().Deployments(nsName).UpdateScale(testConfig.Context, name, scale, v1.UpdateOptions{})
		gomega.Expect(err).NotTo(gomega.HaveOccurred())
	}
	// Wait for the rollout to settle before the caller probes the pods.
	podsInDeploymentsReady(objects)
}

// getModelServerPods Returns the list of Prefill and Decode vLLM pods separately
func getModelServerPods(podLabels, prefillLabels, decodeLabels map[string]string) ([]string, []string) {
pods := getPods(podLabels)
Expand Down Expand Up @@ -73,11 +106,12 @@ func podsInDeploymentsReady(objects []string) {
var deployment appsv1.Deployment
helper := func(deploymentName string) bool {
err := testConfig.K8sClient.Get(testConfig.Context, types.NamespacedName{Namespace: nsName, Name: deploymentName}, &deployment)
return err == nil && deployment.Status.Replicas == deployment.Status.ReadyReplicas
return err == nil && *deployment.Spec.Replicas == deployment.Status.Replicas &&
deployment.Status.Replicas == deployment.Status.ReadyReplicas
}
for _, kindAndName := range objects {
split := strings.Split(kindAndName, "/")
if strings.ToLower(split[0]) == "deployment" {
if strings.ToLower(split[0]) == deploymentKind {
ginkgo.By(fmt.Sprintf("Waiting for pods of %s to be ready", split[1]))
gomega.Eventually(helper, readyTimeout, interval).WithArguments(split[1]).Should(gomega.BeTrue())
}
Expand Down