Skip to content

feat: add Pub/Sub readiness health check#262

Open
shimib wants to merge 1 commit into
llm-d-incubation:mainfrom
shimib:feat/pubsub-readiness-probe
Open

feat: add Pub/Sub readiness health check#262
shimib wants to merge 1 commit into
llm-d-incubation:mainfrom
shimib:feat/pubsub-readiness-probe

Conversation

@shimib

@shimib shimib commented Jun 18, 2026

Copy link
Copy Markdown
Collaborator

Summary

Fixes #246.

/readyz only runs a backend health check when the active Flow implements the optional pipeline.HealthChecker interface (cmd/main.go). Only the Redis flows did (rdb.Ping), so for gcp-pubsub / gcp-pubsub-gated the type assertion failed, checker stayed nil, and /readyz reflected only the in-process ready flag — a pod with an unreachable Pub/Sub backend still reported ready.

This PR makes PubSubMQFlow implement pipeline.HealthChecker so /readyz verifies real broker/subscription connectivity. The cmd/main.go wiring picks it up automatically.

How it works

HealthCheck probes each configured request subscription via SubscriptionAdminClient.GetSubscription, reusing Subscriber().String() for resource-name normalization (handles both short IDs and full resource paths).

IAM consideration

GetSubscription requires pubsub.subscriptions.get, which is not included in roles/pubsub.subscriber (the consume-only role). To avoid falsely marking correctly-configured consumer pods as not-ready, a PermissionDenied response is treated as healthy — it still proves the broker round-tripped (reachable). Genuine connectivity/config failures (Unavailable, DeadlineExceeded, NotFound, connection errors) mark the pod not-ready, which still fixes the original bug.

Tests

Introduces the pstest in-memory fake (first use in the repo) covering:

Notes for reviewers

  • Scope is subscription connectivity only, not the result topic used for publishing. Happy to add a GetTopic check on resultTopicID if preferred.
  • Start() still uses the package-global pubSubClient; only the new HealthCheck uses the struct client field (kept minimal). Can migrate Start for consistency if desired.
  • go.mod: google.golang.org/grpc promoted to a direct dependency (now imported in tests); pstest transitive deps added via go mod tidy.

Verification

  • go build ./...
  • make test
  • golangci-lint run ./pkg/pubsub/... (0 issues)

/readyz only runs a backend check when the active Flow implements
pipeline.HealthChecker. Only the Redis flows did, so for gcp-pubsub /
gcp-pubsub-gated the type assertion in cmd/main.go failed, the checker
stayed nil, and /readyz reflected only the in-process ready flag — a pod
with an unreachable Pub/Sub backend still reported ready (llm-d-incubation#246).

Implement HealthCheck on PubSubMQFlow. It probes each configured request
subscription via SubscriptionAdminClient.GetSubscription, reusing
Subscriber().String() for resource-name normalization. A PermissionDenied
response is treated as healthy because it still proves the broker is
reachable: the consume-only role (roles/pubsub.subscriber) lacks
pubsub.subscriptions.get, so a correctly-configured consumer can confirm
connectivity without introspecting the subscription. Genuine failures
(Unavailable, NotFound, etc.) mark the pod not-ready.

The cmd/main.go wiring picks up the HealthChecker automatically. A client
field is stored on the flow so the new path is injectable in tests.

Add pstest-based unit tests covering healthy, missing-subscription, and
broker-unreachable (the llm-d-incubation#246 regression guard) cases.

Fixes llm-d-incubation#246

Signed-off-by: Shimi Bandiel <shimib@google.com>
Comment thread pkg/pubsub/pubsubimpl.go
// its result as the fully-qualified name for the admin lookup.
name := r.client.Subscriber(cd.subscriberID).String()
_, err := r.client.SubscriptionAdminClient.GetSubscription(ctx,
&pubsubpb.GetSubscriptionRequest{Subscription: name})

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use .Exists()?
I don't think Exists() requires additional IAM permissions.

You can use sub.Exists() in the requestWorker() loop to cache the current state of the subscription to a shared data structure and then read that data structure in HealthCheck().

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feature]: readiness probe for pub/sub

2 participants