Skip to content

Commit d69dcca

Browse files
joeltg and claude committed
fix: tierpriority catch-all, helm dead flags, SS gating cleanup, pubsub buffering
P1 — tierpriority strands unknown labels

popNext only polled buckets in the configured ClassOrder × TierOrder (plus empty-string fallbacks). A message with tier=urgent or class=gold (typo or schema drift) landed in a bucket nothing consults — work stranded until the input channel closed. Added popUnconfigured: a final catch-all pass that drains any bucket whose (tier, class) key isn't covered by the configured order, at lowest priority. The policy is now total over its input domain.

P1 — helm chart emits removed flags

ap-deployments.yaml still emitted --redis.ss.gate-type and --redis.ss.gate-params (flags removed when the SS gate impl was dropped). Any operator setting ap.redis.gateType would crashloop on "flag provided but not defined". Removed the block, the matching values.yaml defaults, the dead gateParamsJson helper, and the e2e deploy values references.

P1 — SS gating landing-state inconsistency

WithGateFactory on RedisSortedSetFlow was stored-but-unread, main.go logged "with per-queue gating", and README told users to switch to SS for per-queue gating — all false (SS gates aren't wired). Removed WithGateFactory + SortedSetOption + gateFactory field, dropped the arg from main.go, fixed the log line, and corrected README to direct operators at gcp-pubsub-gated for actual per-queue gating.

P2 — pubsub callback could block prefetch under saturation

The per-subscription channel was unbuffered, so under sustained saturation a stalled merge goroutine propagated straight back to the receive callback, halting Pub/Sub prefetch. Worse, ctx cancellation mid-stall deadlocked the callback (Receive() never exits). Buffered the channel to the same prefetch depth as Pub/Sub's MaxOutstandingMessages (extracted prefetchBufferDepth helper) so the callback is non-blocking up to the prefetch window. Wrapped the send in select-with-ctx so shutdown unblocks and any subscription-gate releases fire cleanly on cancel.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: joeltg <joel@reflection.ai>
1 parent 54c26fd commit d69dcca

9 files changed

Lines changed: 106 additions & 53 deletions

File tree

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,7 @@ The configuration file when using the `redis.queues-config-file` flag should hav
280280
]
281281
```
282282
283-
<u>Note:</u> The ephemeral Redis Channels implementation does not support per-queue dispatch gates. Use the [Redis Sorted Set](#redis-sorted-set-persisted) implementation for per-queue gating.
283+
<u>Note:</u> Per-queue / per-pool dispatch gates are currently only wired on the `gcp-pubsub-gated` flow. The Redis Channels and Redis Sorted Set flows do not yet apply gate chains.
284284
285285
**Configuration Fields:**
286286

charts/async-processor/templates/_helpers.tpl

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -57,18 +57,6 @@ Create the name of the service account to use
5757
{{- default (include "async-processor.fullname" .) .Values.serviceAccount.name }}
5858
{{- end }}
5959

60-
{{/*
61-
Render gate params as JSON with all values as strings.
62-
The gate params parser expects map[string]string, so numeric values must be quoted.
63-
*/}}
64-
{{- define "async-processor.gateParamsJson" -}}
65-
{{- $out := dict -}}
66-
{{- range $k, $v := .Values.ap.redis.gateParams -}}
67-
{{- $_ := set $out $k ($v | toString) -}}
68-
{{- end -}}
69-
{{- $out | toJson -}}
70-
{{- end }}
71-
7260
{{/*
7361
Resolve the Redis secret name.
7462
If redis.url is set, the chart creates a Secret named <fullname>-redis.

charts/async-processor/templates/ap-deployments.yaml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,6 @@ spec:
3737
- "{{ .Values.ap.redis.pollIntervalMs | default 1000 }}"
3838
- --redis.ss.batch-size
3939
- "{{ .Values.ap.redis.batchSize | default 10 }}"
40-
{{- if .Values.ap.redis.gateType }}
41-
- --redis.ss.gate-type={{ .Values.ap.redis.gateType }}
42-
- --redis.ss.gate-params={{ include "async-processor.gateParamsJson" . }}
43-
{{- end }}
4440
{{- else }}
4541
- --message-queue-impl=redis-pubsub
4642
- --redis.igw-base-url={{ .Values.ap.igwBaseURL }}

charts/async-processor/values.yaml

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,12 @@ ap:
3838
resultQueueName: "result-list"
3939
pollIntervalMs: 1000
4040
batchSize: 10
41-
gateType: ""
42-
gateParams: {}
41+
# Note: per-pool / per-subscription gate chains are configured via
42+
# the topics-config-file (see README), not chart values. Chart-level
43+
# support for the new gate types is not yet wired.
4344
# PodMonitor for model server metrics relabeling.
4445
# Creates a PodMonitor that scrapes vLLM metrics and relabels
4546
# the inference_pool pod label into scraped metrics.
46-
# Required when using prometheus-budget or prometheus-saturation gates
47-
# without llm-d's flow control plugin enabled.
4847
modelServerMonitor:
4948
enabled: false
5049
selector:

cmd/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,13 +122,13 @@ func main() {
122122
}
123123
impl = flow
124124
case "redis-sortedset":
125-
flow, err := redis.NewRedisSortedSetFlow(redis.WithGateFactory(gateFactory))
125+
flow, err := redis.NewRedisSortedSetFlow()
126126
if err != nil {
127127
setupLog.Error(err, "Failed to create Redis sorted-set flow")
128128
os.Exit(1)
129129
}
130130
impl = flow
131-
setupLog.Info("Using Redis sorted-set flow with per-queue gating")
131+
setupLog.Info("Using Redis sorted-set flow")
132132
case "gcp-pubsub":
133133
impl = pubsub.NewGCPPubSubMQFlow()
134134
case "gcp-pubsub-gated":

docs/guides/e2e-deploy/async-processor-values.yaml

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,6 @@ ap:
1919
resultQueueName: "result-list"
2020
pollIntervalMs: 1000
2121
batchSize: 10
22-
gateType: "prometheus-budget"
23-
gateParams:
24-
pool: "optimized-baseline"
25-
max_concurrency: "100"
26-
baseline: "0.05"
2722
modelServerMonitor:
2823
enabled: true
2924
selector:

pkg/async/inference/mergepolicy/tierpriority/policy.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@
1616
// pipeline knows nothing about them. Operators configure the label key
1717
// and value-ordering, so different vocabularies (or more than two
1818
// classes) drop in without code changes.
19+
//
20+
// Messages whose tier or class label values aren't in the configured
21+
// order (operator typos, schema drift) aren't lost: they drain at
22+
// lowest priority via a catch-all pass after the configured priority
23+
// passes complete. The policy is total over its input domain.
1924
package tierpriority
2025

2126
import (
@@ -106,6 +111,32 @@ type Policy struct {
106111
cfg Config
107112
}
108113

114+
// isConfiguredKey reports whether (tier, class) is one of the buckets
115+
// popNext drains in its configured passes. Used by popUnconfigured to
116+
// identify catch-all candidates. Empty-string values count as
117+
// configured (the configured passes consult them explicitly).
118+
func (p *Policy) isConfiguredKey(k bucketKey) bool {
119+
tierOK := k.tier == ""
120+
if !tierOK {
121+
for _, t := range p.cfg.TierOrder {
122+
if t == k.tier {
123+
tierOK = true
124+
break
125+
}
126+
}
127+
}
128+
classOK := k.class == ""
129+
if !classOK {
130+
for _, c := range p.cfg.ClassOrder {
131+
if c == k.class {
132+
classOK = true
133+
break
134+
}
135+
}
136+
}
137+
return tierOK && classOK
138+
}
139+
109140
var _ pipeline.RequestMergePolicy = (*Policy)(nil)
110141

111142
// MergeRequestChannels groups incoming RequestChannels by pool, spawns
@@ -248,13 +279,40 @@ func (ps *poolState) popNext() *pipeline.EmbelishedRequestMessage {
248279
if m := ps.popBucket(bucketKey{"", ""}); m != nil {
249280
return m
250281
}
282+
// Catch-all: any bucket whose (tier, class) isn't covered by
283+
// the configured ClassOrder / TierOrder still contains real
284+
// work. Drain it at lowest priority — keeps the policy total
285+
// over its input domain so typos or schema drift (e.g.
286+
// `tier=urgent` against a config that only declares
287+
// interactive/async/batch) don't strand messages.
288+
if m := ps.popUnconfigured(); m != nil {
289+
return m
290+
}
251291
if ps.closed {
252292
return nil
253293
}
254294
ps.cond.Wait()
255295
}
256296
}
257297

298+
// popUnconfigured pops one message from any bucket whose (tier, class)
299+
// key is not in the configured ClassOrder / TierOrder. The configured
300+
// passes above already drained their buckets; whatever remains is, by
301+
// definition, unconfigured. Iteration order is map-iteration (random)
302+
// — these are operator-misconfigured messages, so we don't promise any
303+
// particular interleaving among them.
304+
func (ps *poolState) popUnconfigured() *pipeline.EmbelishedRequestMessage {
305+
for key := range ps.buckets {
306+
if ps.policy.isConfiguredKey(key) {
307+
continue
308+
}
309+
if m := ps.popBucket(key); m != nil {
310+
return m
311+
}
312+
}
313+
return nil
314+
}
315+
258316
func (ps *poolState) popBucket(key bucketKey) *pipeline.EmbelishedRequestMessage {
259317
b := ps.buckets[key]
260318
if b == nil {

pkg/pubsub/pubsubimpl.go

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -398,7 +398,16 @@ func NewGCPPubSubMQFlow(opts ...PubSubOption) *PubSubMQFlow {
398398
panic(fmt.Sprintf("failed to build subscription gates for %q: %v", cfg.SubscriberID, err))
399399
}
400400

401-
ch := make(chan *pipeline.EmbelishedRequestMessage)
401+
// Buffer the per-subscription channel to the prefetch depth so
402+
// the receive callback's `ch <- emb` is non-blocking up to the
403+
// prefetch capacity. Without this, a transient stall in the
404+
// merge goroutine (e.g. pool output buffer full while workers
405+
// drain a saturated backend) propagates straight back to the
406+
// callback and halts Pub/Sub prefetch. Buffering decouples the
407+
// two for burst absorption; sustained saturation past the
408+
// buffer still applies backpressure to Pub/Sub, which is the
409+
// correct signal at that point.
410+
ch := make(chan *pipeline.EmbelishedRequestMessage, prefetchBufferDepth())
402411
p.requestChannels = append(p.requestChannels, RequestChannelData{
403412
requestChannel: pipeline.RequestChannel{
404413
Channel: ch,
@@ -596,10 +605,7 @@ func (r *PubSubMQFlow) requestWorker(ctx context.Context, pubSubClient *pubsub.C
596605
//
597606
// MaxOutstandingBytes = -1 removes the bytes-based limit (default
598607
// 1GB) so only message count gates prefetch.
599-
prefetchDepth := *batchSize * 5
600-
if prefetchDepth < 1000 {
601-
prefetchDepth = 1000
602-
}
608+
prefetchDepth := prefetchBufferDepth()
603609
sub.ReceiveSettings.MaxOutstandingMessages = prefetchDepth
604610
sub.ReceiveSettings.MaxOutstandingBytes = -1
605611
sub.ReceiveSettings.NumGoroutines = 1
@@ -736,8 +742,19 @@ func (r *PubSubMQFlow) requestWorker(ctx context.Context, pubSubClient *pubsub.C
736742
}
737743
}
738744

739-
ch <- emb
740-
stats.dispatched.Add(1)
745+
// Forward to the merge policy. Under sustained saturation the
746+
// buffered channel can still fill; select-with-ctx ensures
747+
// shutdown unblocks rather than deadlocking the callback (and
748+
// hence Receive(), which would never exit). On ctx cancel we
749+
// fire any subscription-gate releases attached to emb and
750+
// nack so Pub/Sub redelivers.
751+
select {
752+
case ch <- emb:
753+
stats.dispatched.Add(1)
754+
case <-ctx.Done():
755+
emb.FireReleases()
756+
resultsChannel <- ackAction{ack: false}
757+
}
741758
})
742759
if err != nil {
743760
logger.V(logutil.DEFAULT).Error(err, "Fail to receive messages from request subscription")
@@ -864,6 +881,19 @@ func synthesizePool(s TopicConfig) pipeline.Pool {
864881
return pool
865882
}
866883

884+
// prefetchBufferDepth is the shared sizing for both Pub/Sub's
885+
// MaxOutstandingMessages (transport-side prefetch) and the
886+
// per-subscription channel buffer (Go-side handoff to the merge
887+
// goroutine). Keeping them equal means the channel can hold the full
888+
// prefetch window without backpressuring into the callback.
889+
func prefetchBufferDepth() int {
890+
depth := *batchSize * 5
891+
if depth < 1000 {
892+
depth = 1000
893+
}
894+
return depth
895+
}
896+
867897
// bytesTrimLeftSpace returns b with leading ASCII whitespace removed.
868898
// Tiny helper to peek at the first non-whitespace byte without pulling
869899
// in the unicode/strings packages for one use site.

pkg/redis/sortedset_impl.go

Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -56,22 +56,13 @@ type RedisSortedSetFlow struct {
5656
resultChannel chan api.ResultMessage
5757
pollInterval time.Duration
5858
batchSize int
59-
gateFactory pipeline.GateFactory
6059
}
6160

62-
// SortedSetOption is a functional option for configuring RedisSortedSetFlow.
63-
type SortedSetOption func(*RedisSortedSetFlow)
64-
65-
// WithGateFactory sets a GateFactory for subscription/pool gate
66-
// instantiation. Currently retained for forward compatibility; the
67-
// sortedset flow does not yet wire subscription or pool gate chains.
68-
func WithGateFactory(factory pipeline.GateFactory) SortedSetOption {
69-
return func(r *RedisSortedSetFlow) {
70-
r.gateFactory = factory
71-
}
72-
}
73-
74-
func NewRedisSortedSetFlow(opts ...SortedSetOption) (*RedisSortedSetFlow, error) {
61+
// Note: the sortedset flow does not currently wire subscription or
62+
// pool gate chains. Per-queue / per-pool gating is supported only on
63+
// the GCP Pub/Sub flow today (gcp-pubsub-gated). When SS gates are
64+
// re-introduced, a GateFactory will thread through here.
65+
func NewRedisSortedSetFlow() (*RedisSortedSetFlow, error) {
7566
configs, err := loadQueueConfigs()
7667
if err != nil {
7768
return nil, err
@@ -89,10 +80,6 @@ func NewRedisSortedSetFlow(opts ...SortedSetOption) (*RedisSortedSetFlow, error)
8980
batchSize: *ssBatchSize,
9081
}
9182

92-
for _, opt := range opts {
93-
opt(r)
94-
}
95-
9683
for _, cfg := range configs {
9784
headers, err := util.ExpandEnvMapValues(cfg.HTTPHeaders)
9885
if err != nil {

0 commit comments

Comments
 (0)