Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ require (
cloud.google.com/go/auth/oauth2adapt v0.2.8 // indirect
cloud.google.com/go/compute/metadata v0.9.0 // indirect
cloud.google.com/go/iam v1.5.3 // indirect
cloud.google.com/go/pubsub v1.50.1 // indirect
github.com/Masterminds/semver/v3 v3.4.0 // indirect
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ cloud.google.com/go/compute/metadata v0.9.0 h1:pDUj4QMoPejqq20dK0Pg2N4yG9zIkYGdB
cloud.google.com/go/compute/metadata v0.9.0/go.mod h1:E0bWwX5wTnLPedCKqk3pJmVgCBSM6qQI1yTBdEb3C10=
cloud.google.com/go/iam v1.5.3 h1:+vMINPiDF2ognBJ97ABAYYwRgsaqxPbQDlMnbHMjolc=
cloud.google.com/go/iam v1.5.3/go.mod h1:MR3v9oLkZCTlaqljW6Eb2d3HGDGK5/bDv93jhfISFvU=
cloud.google.com/go/pubsub v1.50.1 h1:fzbXpPyJnSGvWXF1jabhQeXyxdbCIkXTpjXHy7xviBM=
cloud.google.com/go/pubsub v1.50.1/go.mod h1:6YVJv3MzWJUVdvQXG081sFvS0dWQOdnV+oTo++q/xFk=
cloud.google.com/go/pubsub/v2 v2.5.0 h1:iBO4L8Iidb12B6DGP1oRxfP82FhYEAD9vLoe5Ps8+oc=
cloud.google.com/go/pubsub/v2 v2.5.0/go.mod h1:9P6nc3S1f2qzVxJQDDtWghWea8KYWz4pwqpcCuik2FI=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
Expand Down
34 changes: 34 additions & 0 deletions test/e2e/e2e_pubsub_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package e2e

import (
"context"
"time"

"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"

"github.com/llm-d-incubation/llm-d-async/pkg/async/api"
)

var _ = ginkgo.Describe("GCP Pub/Sub E2E", func() {
var ctx context.Context

ginkgo.BeforeEach(func() {
ctx = context.Background()
drainPubSub(ctx, pubsubClient, resultSubID)
resetMock(adminURL)
})

ginkgo.It("processes a message end-to-end using Pub/Sub", func() {
msg := makeRequestMessage("e2e-pubsub-1", 5*time.Minute)
publishToPubSub(ctx, pubsubClient, requestTopicID, msg)

var result *api.ResultMessage
gomega.Eventually(func() bool {
result = receiveFromPubSub(ctx, pubsubClient, resultSubID)
return result != nil
}, 60*time.Second, 1*time.Second).Should(gomega.BeTrue())

gomega.Expect(result.Id).To(gomega.Equal("e2e-pubsub-1"))
})
})
90 changes: 87 additions & 3 deletions test/e2e/e2e_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"net/http"
"os"
"os/exec"
"path/filepath"
"runtime"
Expand All @@ -22,6 +23,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client/config"
k8slog "sigs.k8s.io/controller-runtime/pkg/log"

"cloud.google.com/go/pubsub" // nolint:staticcheck
"github.com/redis/go-redis/v9"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/env"
testutils "sigs.k8s.io/gateway-api-inference-extension/test/utils"
Expand All @@ -33,6 +35,8 @@ const (

// redisManifest is the manifest for the Redis deployment and service.
redisManifest = "./yaml/redis.yaml"
// pubsubManifest is the manifest for the GCP Pub/Sub emulator.
pubsubManifest = "./yaml/pubsub-emulator.yaml"
// igwMockManifest is the manifest for the mock inference gateway.
igwMockManifest = "./yaml/igw-mock.yaml"
// promMockManifest is the manifest for the mock Prometheus server.
Expand All @@ -41,10 +45,13 @@ const (
asyncProcessorManifest = "./yaml/async-processor.yaml"
// asyncProcessorSaturationManifest is the manifest for the saturation-gated async-processor.
asyncProcessorSaturationManifest = "./yaml/async-processor-saturation.yaml"
// asyncProcessorPubsubManifest is the manifest for the pubsub async-processor.
asyncProcessorPubsubManifest = "./yaml/async-processor-pubsub.yaml"
)

var (
redisPort string = env.GetEnvString("E2E_REDIS_PORT", "30379", ginkgo.GinkgoLogr)
pubsubPort string = env.GetEnvString("E2E_PUBSUB_PORT", "30850", ginkgo.GinkgoLogr)
adminPort string = env.GetEnvString("E2E_ADMIN_PORT", "30081", ginkgo.GinkgoLogr)
promMockPort string = env.GetEnvString("E2E_PROM_MOCK_PORT", "30091", ginkgo.GinkgoLogr)

Expand All @@ -58,15 +65,18 @@ var (
nsName = env.GetEnvString("NAMESPACE", "e2e-test", ginkgo.GinkgoLogr)

redisObjects []string
pubsubObjects []string
igwMockObjects []string
promMockObjects []string
asyncProcessorObjects []string
asyncProcessorSaturationObjects []string
asyncProcessorPubsubObjects []string
createdNameSpace bool

rdb *redis.Client
adminURL string
promMockURL string
rdb *redis.Client
pubsubClient *pubsub.Client
adminURL string
promMockURL string
)

func TestEndToEnd(t *testing.T) {
Expand All @@ -83,12 +93,16 @@ var _ = ginkgo.BeforeSuite(func() {
setupNameSpace()
applyManifests()
setupRedisClient()
setupPubSubClient()
})

var _ = ginkgo.AfterSuite(func() {
if rdb != nil {
rdb.Close() //nolint:errcheck
}
if pubsubClient != nil {
pubsubClient.Close() //nolint:errcheck
}

skipCleanup := env.GetEnvString("E2E_SKIP_CLEANUP", "false", ginkgo.GinkgoLogr)
if skipCleanup == "true" {
Expand Down Expand Up @@ -119,6 +133,7 @@ func setupK8sCluster() {
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
}()
clusterConfig := strings.ReplaceAll(kindClusterConfig, "${REDIS_PORT}", redisPort)
clusterConfig = strings.ReplaceAll(clusterConfig, "${PUBSUB_PORT}", pubsubPort)
clusterConfig = strings.ReplaceAll(clusterConfig, "${ADMIN_PORT}", adminPort)
clusterConfig = strings.ReplaceAll(clusterConfig, "${PROM_MOCK_PORT}", promMockPort)
_, err := io.WriteString(stdin, clusterConfig)
Expand Down Expand Up @@ -159,6 +174,14 @@ func setupK8sCluster() {
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
gomega.Eventually(session).WithTimeout(600 * time.Second).Should(gexec.Exit(0))
kindLoadImage("redis:7-alpine")

// Pull and load pubsub emulator image
ginkgo.By("Pulling gcr.io/google.com/cloudsdktool/cloud-sdk:emulators")
command = exec.Command(containerRuntime, "pull", "gcr.io/google.com/cloudsdktool/cloud-sdk:emulators")
session, err = gexec.Start(command, ginkgo.GinkgoWriter, ginkgo.GinkgoWriter)
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
gomega.Eventually(session).WithTimeout(600 * time.Second).Should(gexec.Exit(0))
kindLoadImage("gcr.io/google.com/cloudsdktool/cloud-sdk:emulators")
}

func kindLoadImage(image string) {
Expand Down Expand Up @@ -206,6 +229,9 @@ func applyManifests() {
ginkgo.By("Applying Redis manifest")
redisObjects = testutils.ApplyYAMLFile(testConfig, redisManifest)

ginkgo.By("Applying Pub/Sub manifest")
pubsubObjects = testutils.ApplyYAMLFile(testConfig, pubsubManifest)

ginkgo.By("Applying igw-mock manifest")
igwMockObjects = testutils.ApplyYAMLFile(testConfig, igwMockManifest)

Expand All @@ -225,6 +251,13 @@ func applyManifests() {
"${AP_IMAGE}": apImage,
})
asyncProcessorSaturationObjects = testutils.CreateObjsFromYaml(testConfig, apSatYamls)

ginkgo.By("Applying async-processor-pubsub manifest")
apPubsubYamls := testutils.ReadYaml(asyncProcessorPubsubManifest)
apPubsubYamls = substituteMany(apPubsubYamls, map[string]string{
"${AP_IMAGE}": apImage,
})
asyncProcessorPubsubObjects = testutils.CreateObjsFromYaml(testConfig, apPubsubYamls)
}

func setupRedisClient() {
Expand All @@ -249,6 +282,54 @@ func setupRedisClient() {
}, 30*time.Second, 1*time.Second).Should(gomega.Succeed())
}

func setupPubSubClient() {
ginkgo.By("Creating Pub/Sub client on localhost:" + pubsubPort)
ctx := context.Background()
// Set the emulator host environment variable for the client
os.Setenv("PUBSUB_EMULATOR_HOST", "localhost:"+pubsubPort) // nolint:errcheck
Comment thread
shimib marked this conversation as resolved.

var err error
pubsubClient, err = pubsub.NewClient(ctx, "test-project")
gomega.Expect(err).NotTo(gomega.HaveOccurred())

// Create request and result topics/subscriptions
ginkgo.By("Creating request-topic and request-sub")
reqTopic, err := pubsubClient.CreateTopic(ctx, "request-topic")
if err != nil && !strings.Contains(err.Error(), "AlreadyExists") {
Comment thread
shimib marked this conversation as resolved.
Outdated
gomega.Expect(err).NotTo(gomega.HaveOccurred())
}

// Re-fetch topic if it already exists
if err != nil && strings.Contains(err.Error(), "AlreadyExists") {
reqTopic = pubsubClient.Topic("request-topic")
}

_, err = pubsubClient.CreateSubscription(ctx, "request-sub", pubsub.SubscriptionConfig{
Topic: reqTopic,
})
if err != nil && !strings.Contains(err.Error(), "AlreadyExists") {
gomega.Expect(err).NotTo(gomega.HaveOccurred())
}

ginkgo.By("Creating result-topic and result-sub")
resTopic, err := pubsubClient.CreateTopic(ctx, "result-topic")
if err != nil && !strings.Contains(err.Error(), "AlreadyExists") {
gomega.Expect(err).NotTo(gomega.HaveOccurred())
}

// Re-fetch topic if it already exists
if err != nil && strings.Contains(err.Error(), "AlreadyExists") {
resTopic = pubsubClient.Topic("result-topic")
}

_, err = pubsubClient.CreateSubscription(ctx, "result-sub", pubsub.SubscriptionConfig{
Topic: resTopic,
})
if err != nil && !strings.Contains(err.Error(), "AlreadyExists") {
gomega.Expect(err).NotTo(gomega.HaveOccurred())
}
}

// projectRoot returns the root of the project (two levels up from test/e2e/).
func projectRoot() string {
_, filename, _, _ := runtime.Caller(0)
Expand All @@ -275,6 +356,9 @@ nodes:
- containerPort: 30379
hostPort: ${REDIS_PORT}
protocol: TCP
- containerPort: 30850
hostPort: ${PUBSUB_PORT}
protocol: TCP
- containerPort: 30081
hostPort: ${ADMIN_PORT}
protocol: TCP
Expand Down
51 changes: 51 additions & 0 deletions test/e2e/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,16 @@ import (
"github.com/onsi/gomega"
"github.com/redis/go-redis/v9"

"cloud.google.com/go/pubsub" // nolint:staticcheck
"github.com/llm-d-incubation/llm-d-async/pkg/async/api"
)

const (
requestQueue = "request-sortedset"
resultQueue = "result-list"

requestTopicID = "request-topic"
resultSubID = "result-sub"
)

var adminClient = &http.Client{Timeout: 10 * time.Second}
Expand Down Expand Up @@ -126,3 +130,50 @@ func makeRequestMessage(id string, deadlineOffset time.Duration) api.RequestMess
Payload: map[string]any{"model": id, "prompt": "test"},
}
}

func publishToPubSub(ctx context.Context, client *pubsub.Client, topicID string, msg api.RequestMessage) {
data, err := json.Marshal(msg)
gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred())

topic := client.Topic(topicID)
res := topic.Publish(ctx, &pubsub.Message{
Data: data,
})
_, err = res.Get(ctx)
gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred())
}

func receiveFromPubSub(ctx context.Context, client *pubsub.Client, subID string) *api.ResultMessage {
sub := client.Subscription(subID)
// We use a short timeout and try to pull one message.
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

var result *api.ResultMessage
err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
var res api.ResultMessage
if err := json.Unmarshal(msg.Data, &res); err == nil {
result = &res
msg.Ack()
cancel() // Stop receiving after one message
} else {
msg.Nack()
}
})

if err != nil && err != context.Canceled && err != context.DeadlineExceeded {
gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred())
}

return result
}

func drainPubSub(ctx context.Context, client *pubsub.Client, subID string) {
sub := client.Subscription(subID)
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()

_ = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
msg.Ack()
})
}
50 changes: 50 additions & 0 deletions test/e2e/yaml/async-processor-pubsub.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
apiVersion: v1
kind: ServiceAccount
metadata:
name: async-processor-pubsub
namespace: e2e-test
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: async-processor-pubsub-auth-delegator
subjects:
- kind: ServiceAccount
name: async-processor-pubsub
namespace: e2e-test
roleRef:
kind: ClusterRole
name: system:auth-delegator
apiGroup: rbac.authorization.k8s.io
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: async-processor-pubsub
namespace: e2e-test
spec:
replicas: 1
selector:
matchLabels:
app: async-processor-pubsub
template:
metadata:
labels:
app: async-processor-pubsub
spec:
serviceAccountName: async-processor-pubsub
containers:
- name: async-processor
image: ${AP_IMAGE}
imagePullPolicy: Never
env:
- name: PUBSUB_EMULATOR_HOST
value: pubsub-emulator.e2e-test.svc.cluster.local:8085
args:
- "--message-queue-impl=gcp-pubsub"
- "--pubsub.igw-base-url=http://igw-mock.e2e-test.svc.cluster.local:80"
- "--pubsub.project-id=test-project"
- "--pubsub.request-subscriber-id=request-sub"
- "--pubsub.result-topic-id=result-topic"
- "--metrics-endpoint-auth=false"
- "--concurrency=1"
Loading
Loading