Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ require (
cloud.google.com/go/auth/oauth2adapt v0.2.8 // indirect
cloud.google.com/go/compute/metadata v0.9.0 // indirect
cloud.google.com/go/iam v1.5.3 // indirect
cloud.google.com/go/pubsub v1.50.1 // indirect
github.com/Masterminds/semver/v3 v3.4.0 // indirect
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ cloud.google.com/go/compute/metadata v0.9.0 h1:pDUj4QMoPejqq20dK0Pg2N4yG9zIkYGdB
cloud.google.com/go/compute/metadata v0.9.0/go.mod h1:E0bWwX5wTnLPedCKqk3pJmVgCBSM6qQI1yTBdEb3C10=
cloud.google.com/go/iam v1.5.3 h1:+vMINPiDF2ognBJ97ABAYYwRgsaqxPbQDlMnbHMjolc=
cloud.google.com/go/iam v1.5.3/go.mod h1:MR3v9oLkZCTlaqljW6Eb2d3HGDGK5/bDv93jhfISFvU=
cloud.google.com/go/pubsub v1.50.1 h1:fzbXpPyJnSGvWXF1jabhQeXyxdbCIkXTpjXHy7xviBM=
cloud.google.com/go/pubsub v1.50.1/go.mod h1:6YVJv3MzWJUVdvQXG081sFvS0dWQOdnV+oTo++q/xFk=
cloud.google.com/go/pubsub/v2 v2.5.0 h1:iBO4L8Iidb12B6DGP1oRxfP82FhYEAD9vLoe5Ps8+oc=
cloud.google.com/go/pubsub/v2 v2.5.0/go.mod h1:9P6nc3S1f2qzVxJQDDtWghWea8KYWz4pwqpcCuik2FI=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
Expand Down
34 changes: 34 additions & 0 deletions test/e2e/e2e_pubsub_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package e2e

import (
"context"
"time"

"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"

"github.com/llm-d-incubation/llm-d-async/pkg/async/api"
)

var _ = ginkgo.Describe("GCP Pub/Sub E2E", func() {
var ctx context.Context

ginkgo.BeforeEach(func() {
ctx = context.Background()
drainPubSub(ctx, pubsubClient, resultSubID)
resetMock(adminURL)
})

ginkgo.It("processes a message end-to-end using Pub/Sub", func() {
msg := makeRequestMessage("e2e-pubsub-1", 5*time.Minute)
publishToPubSub(ctx, pubsubClient, requestTopicID, msg)

var result *api.ResultMessage
gomega.Eventually(func() bool {
result = receiveFromPubSub(ctx, pubsubClient, resultSubID)
return result != nil
}, 60*time.Second, 1*time.Second).Should(gomega.BeTrue())

gomega.Expect(result.Id).To(gomega.Equal("e2e-pubsub-1"))
})
})
131 changes: 128 additions & 3 deletions test/e2e/e2e_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,19 @@
"context"
"fmt"
"io"
"net"
"net/http"
"os"
"os/exec"
"path/filepath"
"runtime"
"strings"
"testing"
"time"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
"github.com/onsi/gomega/gexec"
Expand All @@ -22,6 +27,7 @@
"sigs.k8s.io/controller-runtime/pkg/client/config"
k8slog "sigs.k8s.io/controller-runtime/pkg/log"

"cloud.google.com/go/pubsub" // nolint:staticcheck
"github.com/redis/go-redis/v9"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/env"
testutils "sigs.k8s.io/gateway-api-inference-extension/test/utils"
Expand All @@ -33,6 +39,8 @@

// redisManifest is the manifest for the Redis deployment and service.
redisManifest = "./yaml/redis.yaml"
// pubsubManifest is the manifest for the GCP Pub/Sub emulator.
pubsubManifest = "./yaml/pubsub-emulator.yaml"
// igwMockManifest is the manifest for the mock inference gateway.
igwMockManifest = "./yaml/igw-mock.yaml"
// promMockManifest is the manifest for the mock Prometheus server.
Expand All @@ -41,10 +49,13 @@
asyncProcessorManifest = "./yaml/async-processor.yaml"
// asyncProcessorSaturationManifest is the manifest for the saturation-gated async-processor.
asyncProcessorSaturationManifest = "./yaml/async-processor-saturation.yaml"
// asyncProcessorPubsubManifest is the manifest for the pubsub async-processor.
asyncProcessorPubsubManifest = "./yaml/async-processor-pubsub.yaml"
)

var (
redisPort string = env.GetEnvString("E2E_REDIS_PORT", "30379", ginkgo.GinkgoLogr)
pubsubPort string = env.GetEnvString("E2E_PUBSUB_PORT", "30850", ginkgo.GinkgoLogr)
adminPort string = env.GetEnvString("E2E_ADMIN_PORT", "30081", ginkgo.GinkgoLogr)
promMockPort string = env.GetEnvString("E2E_PROM_MOCK_PORT", "30091", ginkgo.GinkgoLogr)

Expand All @@ -58,15 +69,20 @@
nsName = env.GetEnvString("NAMESPACE", "e2e-test", ginkgo.GinkgoLogr)

redisObjects []string
pubsubObjects []string
igwMockObjects []string
promMockObjects []string
asyncProcessorObjects []string
asyncProcessorSaturationObjects []string
asyncProcessorPubsubObjects []string
createdNameSpace bool

rdb *redis.Client
adminURL string
promMockURL string
rdb *redis.Client
pubsubClient *pubsub.Client
adminURL string
promMockURL string

usedPorts = make(map[string]bool)
)

func TestEndToEnd(t *testing.T) {
Expand All @@ -77,18 +93,27 @@
}

var _ = ginkgo.BeforeSuite(func() {
redisPort = getOrPickPort("E2E_REDIS_PORT", "30379")
pubsubPort = getOrPickPort("E2E_PUBSUB_PORT", "30850")
adminPort = getOrPickPort("E2E_ADMIN_PORT", "30081")
promMockPort = getOrPickPort("E2E_PROM_MOCK_PORT", "30091")

setupK8sCluster()
testConfig = testutils.NewTestConfig(nsName, "")
setupK8sClient()
setupNameSpace()
applyManifests()
setupRedisClient()
setupPubSubClient()
})

var _ = ginkgo.AfterSuite(func() {
if rdb != nil {
rdb.Close() //nolint:errcheck
}
if pubsubClient != nil {
pubsubClient.Close() //nolint:errcheck
}

skipCleanup := env.GetEnvString("E2E_SKIP_CLEANUP", "false", ginkgo.GinkgoLogr)
if skipCleanup == "true" {
Expand Down Expand Up @@ -119,6 +144,7 @@
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
}()
clusterConfig := strings.ReplaceAll(kindClusterConfig, "${REDIS_PORT}", redisPort)
clusterConfig = strings.ReplaceAll(clusterConfig, "${PUBSUB_PORT}", pubsubPort)
clusterConfig = strings.ReplaceAll(clusterConfig, "${ADMIN_PORT}", adminPort)
clusterConfig = strings.ReplaceAll(clusterConfig, "${PROM_MOCK_PORT}", promMockPort)
_, err := io.WriteString(stdin, clusterConfig)
Expand Down Expand Up @@ -159,6 +185,14 @@
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
gomega.Eventually(session).WithTimeout(600 * time.Second).Should(gexec.Exit(0))
kindLoadImage("redis:7-alpine")

// Pull and load pubsub emulator image
ginkgo.By("Pulling gcr.io/google.com/cloudsdktool/cloud-sdk:emulators")
command = exec.Command(containerRuntime, "pull", "gcr.io/google.com/cloudsdktool/cloud-sdk:emulators")
session, err = gexec.Start(command, ginkgo.GinkgoWriter, ginkgo.GinkgoWriter)
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
gomega.Eventually(session).WithTimeout(600 * time.Second).Should(gexec.Exit(0))
kindLoadImage("gcr.io/google.com/cloudsdktool/cloud-sdk:emulators")
}

func kindLoadImage(image string) {
Expand Down Expand Up @@ -206,6 +240,9 @@
ginkgo.By("Applying Redis manifest")
redisObjects = testutils.ApplyYAMLFile(testConfig, redisManifest)

ginkgo.By("Applying Pub/Sub manifest")
pubsubObjects = testutils.ApplyYAMLFile(testConfig, pubsubManifest)

ginkgo.By("Applying igw-mock manifest")
igwMockObjects = testutils.ApplyYAMLFile(testConfig, igwMockManifest)

Expand All @@ -225,6 +262,13 @@
"${AP_IMAGE}": apImage,
})
asyncProcessorSaturationObjects = testutils.CreateObjsFromYaml(testConfig, apSatYamls)

ginkgo.By("Applying async-processor-pubsub manifest")
apPubsubYamls := testutils.ReadYaml(asyncProcessorPubsubManifest)
apPubsubYamls = substituteMany(apPubsubYamls, map[string]string{
"${AP_IMAGE}": apImage,
})
asyncProcessorPubsubObjects = testutils.CreateObjsFromYaml(testConfig, apPubsubYamls)
}

func setupRedisClient() {
Expand All @@ -249,6 +293,84 @@
}, 30*time.Second, 1*time.Second).Should(gomega.Succeed())
}

func setupPubSubClient() {
ginkgo.By("Creating Pub/Sub client on localhost:" + pubsubPort)
ctx := context.Background()
// Set the emulator host environment variable for the client
os.Setenv("PUBSUB_EMULATOR_HOST", "localhost:"+pubsubPort) // nolint:errcheck
Comment thread
shimib marked this conversation as resolved.

var err error
pubsubClient, err = pubsub.NewClient(ctx, "test-project")
gomega.Expect(err).NotTo(gomega.HaveOccurred())

// Create request and result topics/subscriptions
ginkgo.By("Creating request-topic and request-sub")
reqTopic, err := pubsubClient.CreateTopic(ctx, "request-topic")
if err != nil && status.Code(err) != codes.AlreadyExists {
gomega.Expect(err).NotTo(gomega.HaveOccurred())
}

// Re-fetch topic if it already exists
if err != nil && status.Code(err) == codes.AlreadyExists {
reqTopic = pubsubClient.Topic("request-topic")
}

_, err = pubsubClient.CreateSubscription(ctx, "request-sub", pubsub.SubscriptionConfig{
Topic: reqTopic,
})
if err != nil && status.Code(err) != codes.AlreadyExists {
gomega.Expect(err).NotTo(gomega.HaveOccurred())
}

ginkgo.By("Creating result-topic and result-sub")
resTopic, err := pubsubClient.CreateTopic(ctx, "result-topic")
if err != nil && status.Code(err) != codes.AlreadyExists {
gomega.Expect(err).NotTo(gomega.HaveOccurred())
}

// Re-fetch topic if it already exists
if err != nil && status.Code(err) == codes.AlreadyExists {
resTopic = pubsubClient.Topic("result-topic")
}

_, err = pubsubClient.CreateSubscription(ctx, "result-sub", pubsub.SubscriptionConfig{
Topic: resTopic,
})
if err != nil && status.Code(err) != codes.AlreadyExists {
gomega.Expect(err).NotTo(gomega.HaveOccurred())
}
}

func getOrPickPort(envVar, defaultPort string) string {
if v := os.Getenv(envVar); v != "" {
usedPorts[v] = true
return v
}
// Check if default port is free AND not already used by us
if !usedPorts[defaultPort] {
l, err := net.Listen("tcp", "localhost:"+defaultPort)
if err == nil {
l.Close()

Check failure on line 353 in test/e2e/e2e_suite_test.go

View workflow job for this annotation

GitHub Actions / lint-and-test

Error return value of `l.Close` is not checked (errcheck)
usedPorts[defaultPort] = true
return defaultPort
}
}
// Default is taken or used, pick a free one
for {
l, err := net.Listen("tcp", "localhost:0")
if err != nil {
panic(fmt.Sprintf("failed to find a free port: %v", err))
}
_, port, _ := net.SplitHostPort(l.Addr().String())
l.Close()

Check failure on line 365 in test/e2e/e2e_suite_test.go

View workflow job for this annotation

GitHub Actions / lint-and-test

Error return value of `l.Close` is not checked (errcheck)
if !usedPorts[port] {
usedPorts[port] = true
fmt.Fprintf(ginkgo.GinkgoWriter, "Default port %s is taken or already used, picked a free one: %s for %s\n", defaultPort, port, envVar)

Check failure on line 368 in test/e2e/e2e_suite_test.go

View workflow job for this annotation

GitHub Actions / lint-and-test

Error return value of `fmt.Fprintf` is not checked (errcheck)
return port
}
}
}

// projectRoot returns the root of the project (two levels up from test/e2e/).
func projectRoot() string {
_, filename, _, _ := runtime.Caller(0)
Expand All @@ -275,6 +397,9 @@
- containerPort: 30379
hostPort: ${REDIS_PORT}
protocol: TCP
- containerPort: 30850
hostPort: ${PUBSUB_PORT}
protocol: TCP
- containerPort: 30081
hostPort: ${ADMIN_PORT}
protocol: TCP
Expand Down
51 changes: 51 additions & 0 deletions test/e2e/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,16 @@ import (
"github.com/onsi/gomega"
"github.com/redis/go-redis/v9"

"cloud.google.com/go/pubsub" // nolint:staticcheck
"github.com/llm-d-incubation/llm-d-async/pkg/async/api"
)

const (
requestQueue = "request-sortedset"
resultQueue = "result-list"

requestTopicID = "request-topic"
resultSubID = "result-sub"
)

var adminClient = &http.Client{Timeout: 10 * time.Second}
Expand Down Expand Up @@ -126,3 +130,50 @@ func makeRequestMessage(id string, deadlineOffset time.Duration) api.RequestMess
Payload: map[string]any{"model": id, "prompt": "test"},
}
}

func publishToPubSub(ctx context.Context, client *pubsub.Client, topicID string, msg api.RequestMessage) {
data, err := json.Marshal(msg)
gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred())

topic := client.Topic(topicID)
res := topic.Publish(ctx, &pubsub.Message{
Data: data,
})
_, err = res.Get(ctx)
gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred())
}

func receiveFromPubSub(ctx context.Context, client *pubsub.Client, subID string) *api.ResultMessage {
sub := client.Subscription(subID)
// We use a short timeout and try to pull one message.
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

var result *api.ResultMessage
err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
var res api.ResultMessage
if err := json.Unmarshal(msg.Data, &res); err == nil {
result = &res
msg.Ack()
cancel() // Stop receiving after one message
} else {
msg.Nack()
}
})

if err != nil && err != context.Canceled && err != context.DeadlineExceeded {
gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred())
}

return result
}

func drainPubSub(ctx context.Context, client *pubsub.Client, subID string) {
sub := client.Subscription(subID)
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()

_ = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
msg.Ack()
})
}
Loading
Loading