
Add pool topology, unified gate interface, labels, and tier-priority dispatch#186

Draft
joeltg wants to merge 14 commits into
llm-d-incubation:mainfrom
joeltg:feat/tier-priority

Conversation

@joeltg

@joeltg joeltg commented May 12, 2026


Motivation

We want to support tier-priority dispatch across a shared inference fleet: strict priority across tiers (interactive, async, batch), per-team reservation quotas that are floors not ceilings so idle capacity isn't stranded, and interactive overflow that fails fast under saturation rather than waiting through redelivery delays. This PR implements the framework primitives that make this possible, and the policy itself.

A fundamental issue is that the existing gate interface — a float "dispatch budget" consulted at pull time — works against this:

  • When the backend saturates, the budget drops to zero, the streaming pull connection to Pub/Sub is torn down, and pre-fetched messages return to the broker.
  • When capacity recovers and the gate restarts the pull, there's a 1–2 second connection-reestablishment gap during which workers are idle.
  • Under sustained load this oscillates: a sawtooth where throughput collapses on every saturation cycle while waiting for the new pull connection. In our setup, we saw drops from ~4,900 tok/s down to ~400 tok/s at 200 concurrency.

Instead, we want to pull aggressively and always, and gate admission at the worker rather than at the pull loop. Messages wait in the in-process prefetch buffer at zero cost and dispatch the instant capacity opens, with no nack-and-redeliver round-trip.

To support this architecture, we implement the following scoped refactoring:

  1. Explicit pool topology
  2. Unified gate interface, with two gate evaluation sites (per-subscription and per-pool)
  3. Gate-assigned labels for request classification
  4. Tier-priority RequestMergePolicy and admission gates

Pool topology

A named Pool declares its gateway URL, worker count, and gate chain. Subscriptions reference a pool by ID. The merge policy now emits one buffered channel per pool; each pool gets its own dedicated worker pool so backpressure on one pool's downstream endpoint stalls only that pool's prefetch — not the entire pod's.

An example FlowConfig schema illustrating this (and the rest of the changes):

pools:
  - id: my-pool
    gateway_url: http://igw.svc:8000
    request_path: /v1/chat/completions
    model_name_override: llama-3.1-8b-instant
    workers: 32
    gates:
      - type: local-max-concurrency
        params: { max_concurrency: "64" }

subscriptions:
  - subscriber_id: inference-requests-teamA-async
    pool: my-pool
    labels: { team: teamA, tier: async }
    gates:
      - type: deadline-drop

Unified gate interface

The DispatchGate.Budget float interface is replaced by Gate.Apply, which returns a Verdict:

var Continue = Verdict{}
func Drop(r *ResultMessage) Verdict  // ack; publish r if non-nil (e.g. 429 fail-fast)
func Refuse() Verdict                // nack; transport redelivers

Gates run at two distinct sites with different blocking rules:

  • Subscription gates run in the Pub/Sub receive callback, before the merge policy. They must not block (blocking stalls prefetch). Their primary job is classification: they read and mutate msg.Labels to stamp decisions like class=reserved or class=overflow that the merge policy and pool gates act on downstream. Gate-written labels are the key mechanism for runtime classification — the subscription config supplies static identity labels (team, tier, model); gates supply dynamic classification labels whose values depend on system state at dispatch time.

  • Pool gates run in the per-pool worker pool, after the merge policy routes the message, before HTTP dispatch. They may block — because the message is already prefetched and in memory, parking the worker until capacity opens dispatches it with zero extra latency and no nack round-trip.

At both sites, gates are configured as an array, evaluated in order, and the chain short-circuits on the first non-Continue verdict.

Labels

Subscriptions declare static labels (team, tier, model) in their config; the Flow stamps these onto every pulled message. Labels are entirely operator-controlled: transport attributes from producers round-trip via body.Metadata / result.Metadata instead, which is a separate namespace. Labels are a plain map[string]string with last-write-wins semantics: gates may overwrite any key, and each gate in the chain sees the mutations made by earlier ones. This is what makes runtime classification composable — a deadline gate can run first and drop the message; a reservation classifier runs next and stamps class; the merge policy buckets on that value.
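
A small sketch of the label semantics described above. The helper names `Get`, `Has`, `Set`, `Merge`, and `Clone` appear in this PR's commit list, but the signatures below are assumptions, not the actual `pipeline.Labels` API.

```go
package main

import "fmt"

// Labels is a plain string map with last-write-wins semantics.
type Labels map[string]string

func (l Labels) Get(k string) string { return l[k] }

func (l Labels) Has(k string) bool {
	_, ok := l[k]
	return ok
}

// Set overwrites any existing value. The receiver must be non-nil.
func (l Labels) Set(k, v string) { l[k] = v }

// Merge copies every entry of o into l; keys in o win on conflict.
func (l Labels) Merge(o Labels) {
	for k, v := range o {
		l[k] = v
	}
}

// Clone returns an independent copy so per-message mutation
// never leaks back into the subscription's static labels.
func (l Labels) Clone() Labels {
	out := make(Labels, len(l))
	for k, v := range l {
		out[k] = v
	}
	return out
}

func main() {
	static := Labels{"team": "teamA", "tier": "async"}
	msg := static.Clone()        // stamped onto a pulled message
	msg.Set("class", "reserved") // written by one gate
	msg.Set("class", "overflow") // overwritten by a later gate
	fmt.Println(msg.Get("class"), static.Has("class"))
}
```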

Tier-priority RMP and admission gate

Two pieces that compose to implement the priority-tier dispatch:

  • The tier-priority RMP buckets messages by (pool, tier, class) and dispatches in strict priority: reserved before overflow, then across tiers within each class (e.g. interactive > async > batch), with round-robin across (team, model) source channels at the leaves. Here tier is a static subscription label set by the operator; class is a gate-written label (reserved or overflow) stamped at subscription-gate time based on whether the request is within the team's reservation cap, which is the reason the gate-labels pipeline exists. The RMP stamps the engine-side priority header and is pure ordering: it never consults a gate.

  • The tier-priority-admission pool gate reads a pool saturation signal and returns a three-way verdict based on labels: block if class=reserved (don't nack work that legitimately deserves a slot; park and dispatch the instant capacity opens); Drop with 429 if tier=interactive, class=overflow (interactive's SLA is sub-second; fail fast so the producer can fall back); Refuse for all other overflow (nacking async/batch overflow is correct — blocking it would tie up workers and head-of-line-block reserved messages behind it in the per-pool FIFO channel). The saturation signal is pluggable: a Prometheus-backed source for self-hosted IGW pools, a redis-counter source for external providers with hard concurrency limits.
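
The three-way verdict above can be sketched as a pure decision function. The `Action` type and `decide` are hypothetical names for illustration; the real gate returns a `Verdict` and blocks in place. Treating any class other than `reserved` as overflow is also an assumption, since the description does not say how unlabeled messages are handled.

```go
package main

import "fmt"

// Action is a hypothetical stand-in for what the admission gate does next.
type Action string

const (
	Admit   Action = "admit"    // capacity available: dispatch now
	Block   Action = "block"    // park the worker until capacity opens
	Drop429 Action = "drop-429" // ack and publish a 429-shaped result
	Nack    Action = "refuse"   // nack so the transport redelivers
)

// decide maps (saturation, tier, class) to an action.
func decide(saturated bool, tier, class string) Action {
	switch {
	case !saturated:
		return Admit
	case class == "reserved":
		return Block // reserved work deserves its slot; wait for it
	case tier == "interactive":
		return Drop429 // fail fast so the producer can fall back
	default:
		return Nack // async/batch overflow must not hold a worker
	}
}

func main() {
	for _, c := range []struct{ tier, class string }{
		{"batch", "reserved"},
		{"interactive", "overflow"},
		{"async", "overflow"},
	} {
		fmt.Printf("%s/%s -> %s\n", c.tier, c.class, decide(true, c.tier, c.class))
	}
}
```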

                              ┌──────────┐
                              │ Producer │
                              └────┬─────┘
                                   │ publish
                                   ▼
                              ┌─────────┐
                              │ Pub/Sub │  request topic
                              │  topic  │
                              └────┬────┘
                                   │ pull
  ╔════════════════════════════════▼═════════════════════════════════╗
  ║                       async-processor pod                        ║
  ║                                                                  ║
  ║         ┌─────────────────────────────────────────┐              ║
  ║         │ Flow receive callback                   │              ║
  ║         │ ├ register tracker, spawn ack goroutine │              ║
  ║         │ ├ subscription gate chain (non-blocking)│              ║
  ║         │ │   • deadline-drop                     │              ║
  ║         │ │   • reservation-classifier            │              ║
  ║         │ └ → per-subscription channel            │              ║
  ║         └────────────────────┬────────────────────┘              ║
  ║                              ▼                                   ║
  ║         ┌─────────────────────────────────────────┐              ║
  ║         │ Merge policy (tier-priority RMP)        │              ║
  ║         │ ├ bucket by (pool, tier, class)         │              ║
  ║         │ ├ reserved before overflow (strict)     │              ║
  ║         │ ├ within each class: tier order (strict)│              ║
  ║         │ ├ round-robin across (team,model) at    │              ║
  ║         │ │   the leaves                          │              ║
  ║         │ └ → per-pool channel                    │              ║
  ║         └────────────────────┬────────────────────┘              ║
  ║                              ▼                                   ║
  ║         ┌─────────────────────────────────────────┐              ║
  ║         │ Per-pool worker (1 of K)                │              ║
  ║         │ ├ pool gate chain (may block)           │              ║
  ║         │ │   • tier-priority-admission           │              ║
  ║         │ │   • local-rate-limit (optional)       │              ║
  ║         │ └ HTTP dispatch                         │              ║
  ║         └────────────────────┬────────────────────┘              ║
  ║                              │                                   ║
  ╚══════════════════════════════│═══════════════════════════════════╝
                                 │ HTTP
                                 ▼
                            ┌─────────┐
                            │   IGW   │
                            └────┬────┘
                                 ▼
                            ┌─────────┐
                            │ Engine  │  (SGLang / vLLM / external)
                            └────┬────┘
                                 │ response
                                 ▼
                            ┌─────────┐
                            │ Pub/Sub │  result topic
                            │  topic  │
                            └─────────┘

  ack/nack flows back to the request topic from the ack goroutine
  spawned in the receive callback (above): every Verdict outcome routes
  through resultsChannel → ack goroutine → msg.Ack() / msg.Nack().

@github-actions


Unsigned commits detected! Please sign your commits.

For instructions on how to set up GPG/SSH signing and verify your commits, please see GitHub Documentation.

@joeltg joeltg force-pushed the feat/tier-priority branch 2 times, most recently from 873472d to 644d434 Compare May 13, 2026 00:51
joeltg and others added 10 commits May 13, 2026 01:03
…dispatch

- pipeline: add Labels type with Get/Has/Set/Merge/Clone helpers
- pipeline: add Pool and GateConfig structs for pool topology
- pipeline: add PoolDispatch (per-pool channel map) as RMP output
- pipeline: add RegisterMergePolicy/NewMergePolicy/MergePolicyDeps registry
- pipeline: add Release/AttachRelease/FireReleases to EmbelishedRequestMessage
- pipeline: add Labels/PoolID/HTTPHeaders/ModelNameOverride to RequestChannel
- pipeline: add RetryReason to RetryMessage, Pools() to Flow interface
- api: add Labels to InternalRouting and ResultMessage
- pkg/util: add ExpandEnvMapValues helper
- pkg/async: register random-robin via init(), return PoolDispatch
- pkg/pubsub: add FlowConfig/TopicConfig pool fields, Pools(), label seeding
- pkg/redis: add Pools() to both flow impls, new channel type, label seeding
- pkg/asyncworker: add FireReleases defer, Labels/ModelNameOverride support
- cmd/main: use NewMergePolicy registry, spawn per-pool worker pools

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: joeltg <joel@reflection.ai>
…flowcontrol

- pipeline: add Gate/Verdict/GateFunc/AlwaysContinue/ApplyChain/Release to gate.go
- pipeline: add gate_test.go with full chain/snapshot/error tests
- pipeline: remove DispatchGate/AttributeGate/DispatchGateFunc/ConstOpenGate/GateFactory(old); update GateFactory to return Gate
- flowcontrol: delete prometheus/metric/composite/dispatch gate files (10 files)
- flowcontrol: rewrite gate_factory.go; add deadline/local-rate-limit/local-max-concurrency/constant-decision gates
- pkg/redis: delete dispatch_gate.go, quota_gate.go (superseded by Verdict-based Gate)
- pkg/pubsub: migrate gate eval to Gate.Apply + Verdict handling
- pkg/asyncworker: migrate to Gate.Apply + Verdict handling with FireReleases defer
- tests: update worker test for new error payload format

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: joeltg <joel@reflection.ai>
…b flow

- TopicConfig: add Gates []GateConfig for subscription-level gate config
- PubSubMQFlow: build one pool gate chain per pool (shared by all subscriptions
  routing to that pool) so LocalMaxConcurrencyGate caps the pool, not the sub
- countingGate: wraps pool gate chain with per-subscription dispatchInFlight stats
- RequestChannelData: add subscriptionGates field for pull-time gate chain
- requestWorker: evaluate subscription gate ApplyChain before pool gate, with
  full Verdict/Release handling; pass subscriptionGates param
- Add buildPoolGateChain, buildSubscriptionGateChain, poolGateTypesOf helpers

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: joeltg <joel@reflection.ai>
Remove docs/dispatch-budget.md and docs/guides/e2e-deploy.md, which
documented the float-budget DispatchGate system (prometheus-budget,
prometheus-saturation) that was removed in the gate interface refactor.

Rewrite the README gate system section to describe the Verdict/Gate
interface and the two evaluation sites (subscription gates: fast,
label-mutating; pool gates: may block). Replace the old flat
gate_type-per-subscription config examples with the FlowConfig pools +
subscriptions format. Update the merge policy description to reflect
per-pool channels and the registration model.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: joeltg <joel@reflection.ai>
Contributes the policy-layer pieces that complete the priority-tier RFC
implementation:

**Tier-priority RMP** (`pkg/async/inference/mergepolicy/tierpriority/`)

Label-driven N-pass merge policy. Reads `tier` and `class` labels (key
names configurable), buckets per `(pool, tier, class)`, runs a strict-
priority loop across tiers with round-robin across `(team, model)` source
channels within each tier, and routes each dispatched message to its
pool's output channel. Stamps the engine-side priority header
(`x-priority`) based on `(tier, class)`. Pure routing and ordering —
the RMP never consults a gate and never reads a Verdict. Registers via
`pipeline.RegisterMergePolicy` under `"tier-priority"`.

**`tier-priority-admission` pool gate** (`pkg/async/inference/flowcontrol/`)

The runtime counterpart to the tier-priority RMP. Three-way verdict at
pool saturation, keyed on message labels:

  - class=reserved (any tier) → Block until capacity or ctx done.
    The message is already prefetched into the pod; blocking dispatches
    the moment capacity opens with no transport redelivery round-trip.
  - tier=interactive, class=overflow → Drop with 429-shaped Result.
    Interactive's sub-second SLA: better to fail fast so the producer
    can fall back than wait through nack-redeliver.
  - other overflow (async, batch) → Refuse (nack-redeliver).
    Blocking overflow would tie up workers and head-of-line-block
    reserved messages behind it in the per-pool FIFO channel; Refuse
    keeps workers cycling so the RMP's priority ordering is preserved
    at consumption.

**Pluggable `PoolLoadSource`** — the saturation signal is swappable:

  - `source: prometheus` — `PromQLLoadSource` runs a configurable PromQL
    query on a background goroutine (atomic store; zero dispatch-path
    latency). Fail-open on stale/error. Right for self-hosted IGW-fronted
    pools where the EPP or engine emits saturation metrics.
  - `source: redis-counter` — `RedisCounterLoadSource` uses a redis-backed
    atomic INCR/DECR counter (same Lua-script pattern as the
    reservation-classifier gate contributed separately). Strict accounting;
    no overshoot. Right for external pools (e.g. provider APIs with hard
    concurrency limits).

`GateFactory` gains `WithRedisClient` and `WithBackgroundContext` options.
`cmd/main.go` initialises the signal-handler context before the factory so
PromQL refresh goroutines are stopped on shutdown.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: joeltg <joel@reflection.ai>
Labels are operator-controlled pipeline state, not producer-visible
results. Nothing in the transport publish path used ResultMessage.Labels,
and the field comment described an intent (stamping transport-native
attributes on result publish) that was never implemented and that we
don't want. Remove the field from api.ResultMessage and the three worker
call sites that set it.

Also fix the RFC reference and clarify 'class' in the PR description.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>

refactor: remove Pool.Labels

Labels flow from subscriptions downward through gates — they are
per-subscription identity (team, tier, model) that gates may classify
further. A pool is a dispatch target, not a label source; pool-level
labels conflate the two and add a confusing third merge layer.

Subscriptions already declare any pool-wide label once via their own
Labels field. The pool-ID auto-injection (labels["pool"] = Pool.ID)
is preserved as it is framework-injected, not operator-declared.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: joeltg <joel@reflection.ai>
Cross-reference review against the private development fork surfaced
several drift points from the original cherry-pick. This commit aligns
upstream with the validated state.

Critical fixes:
- pubsubimpl: pool gates were being applied at the receive callback
  AND again at worker dispatch (double application). Removed the
  callback-site evaluation so pool gates run only in the per-pool
  worker, matching the PR description's architecture.
- pubsubimpl: progressStats had data races (plain int64 fields
  written from multiple goroutines, no reader). Converted to
  atomic.Int64 with dispatchInFlightPeak and added the missing
  logProgress goroutine started in Start().

Functionality restored:
- Added reservation_classifier_gate, the redis-backed multi-key
  in-flight cap that replaces the removed quota_gate functionality.
  Wired through gate_factory and cmd/main.go (shared redis client
  threaded into the factory via WithRedisClient).
- Added Pool.Labels for pool-level static labels merged under
  subscription labels in pubsubimpl's channelLabels assembly.

Tests:
- Ported pipeline/labels_test.go, releases_test.go.
- Ported behavioral tests for constant_decision, deadline, and
  local_max_concurrency gates.
- Deleted broken integration tests referencing removed types
  (pipeline.AttributeGate, gate.Budget, redis-quota).
- Deleted e2e tests + helm values for removed gate types
  (saturation, budget, redis-gate, quota, composite).

Cleanup:
- Removed the var _ = splitAndTrimCSV dead-code cloak (now used
  by reservation-classifier).
- Clarified tier-priority policy package doc: class is the dominant
  axis, tier the tiebreaker — reserved/batch outranks
  overflow/interactive by design.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: joeltg <joel@reflection.ai>
…ntract

P0 — Pub/Sub ack leak on subscription-gate Terminate
The ack-handling goroutine was spawned after `ch <- emb`, but
subscription-gate Drop/Refuse paths return before that point. The
resultsChannel send sat unread, the resultChannels map entry leaked,
inFlight was never decremented, and the Pub/Sub message stayed
outstanding until ack-deadline expiry.

Fixed by spawning the ack-handling goroutine immediately after creating
the resultTracker (before any gate evaluation). Every terminal path —
subscription-gate Terminate, normal dispatch + worker terminal, or
ctx cancel — now drives ack/nack uniformly via resultsChannel.

P1 — ApplyChain error semantics
ApplyChain was converting any gate error into Refuse, which broke
gates that intentionally return (Continue, err) for fail-open
observability (reservation-classifier on redis outage, tier-priority
admission on stale Prometheus). Those errs were being turned into
nacks and redelivery loops instead of fail-open continues.

New contract: the gate's Verdict is authoritative for control flow;
errors are informational, joined via errors.Join, and returned alongside
the final Verdict for caller-side logging. A gate that wants to nack on
error returns Refuse explicitly.

Updated callers (countingGate, pubsubimpl subscription-gate handler,
asyncworker.Worker) to log err and honor the verdict. Updated the
matching test in pipeline/gate_test.go and added one for the explicit
Refuse + err case.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: joeltg <joel@reflection.ai>
…ub buffering

P1 — tierpriority strands unknown labels
popNext only polled buckets in the configured ClassOrder × TierOrder
(plus empty-string fallbacks). A message with tier=urgent or class=gold
(typo or schema drift) landed in a bucket nothing consults — work
stranded until the input channel closed.

Added popUnconfigured: a final catch-all pass that drains any bucket
whose (tier, class) key isn't covered by the configured order, at
lowest priority. The policy is now total over its input domain.

P1 — helm chart emits removed flags
ap-deployments.yaml still emitted --redis.ss.gate-type and
--redis.ss.gate-params (flags removed when the SS gate impl was
dropped). Any operator setting ap.redis.gateType would crashloop on
"flag provided but not defined". Removed the block, the matching
values.yaml defaults, the dead gateParamsJson helper, and the e2e
deploy values references.

P1 — SS gating landing-state inconsistency
WithGateFactory on RedisSortedSetFlow was stored-but-unread, main.go
logged "with per-queue gating", and README told users to switch to SS
for per-queue gating — all false (SS gates aren't wired). Removed
WithGateFactory + SortedSetOption + gateFactory field, dropped the
arg from main.go, fixed the log line, and corrected README to direct
operators at gcp-pubsub-gated for actual per-queue gating.

P2 — pubsub callback could block prefetch under saturation
The per-subscription channel was unbuffered, so under sustained
saturation a stalled merge goroutine propagated straight back to the
receive callback, halting Pub/Sub prefetch. Worse, ctx cancellation
mid-stall deadlocked the callback (Receive() never exits).

Buffered the channel to the same prefetch depth as Pub/Sub's
MaxOutstandingMessages (extracted prefetchBufferDepth helper) so the
callback is non-blocking up to the prefetch window. Wrapped the send
in select-with-ctx so shutdown unblocks and any subscription-gate
releases fire cleanly on cancel.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: joeltg <joel@reflection.ai>
…de-on-quota-gate

Upstream PR llm-d-incubation#183 ("Adding a classifier mode to quota gate") adds a
classifier path to the pre-refactor quota_gate / composite_gate /
AttributeGate / DispatchGate machinery that this branch removed in
favor of the unified Gate/Verdict model. The PR also stamps a typed
Classification field on InternalRouting alongside the framework's
generic Labels map.

This branch already implements the same functional capability via the
reservation-classifier gate (operator-configurable redis-backed
multi-key in-flight cap that stamps a `class` label), which composes
naturally with the tier-priority RMP and admission gate. There is no
useful payload in PR llm-d-incubation#183's code beyond what we already have, and its
shape conflicts with the redesigned interfaces.

Resolution: -X ours wholesale, plus re-applying our deletions of the
files PR llm-d-incubation#183 modified that this branch had already removed
(composite_gate{,_test}.go, quota_gate{,_test}.go,
test/integration/gate_factory_redis_test.go,
test/integration/sortedset_quota_gate_test.go).

Signed-off-by: joeltg <joel@reflection.ai>
@joeltg joeltg force-pushed the feat/tier-priority branch from 644d434 to 31021ce Compare May 13, 2026 01:05
joeltg and others added 4 commits May 13, 2026 23:30
…ck; closed-channel exit; fail-closed on unknown gate types

Four review findings, addressed together:

1. Pub/Sub redelivery race with parked workers.
   Pool gates intentionally block (tier-priority-admission reserved-branch,
   local-max-concurrency, local-rate-limit). With MaxExtension capped at
   10m, a message parked behind saturation could be redelivered to another
   consumer while the first worker eventually dispatched it too.

   Added EmbelishedRequestMessage.TransportDeadline. The Pub/Sub callback
   stamps it from MaxExtension (minus a 10%-of-extension safety margin,
   clamped to [1s, 30s]) via pubSubTransportDeadline. The worker derives a
   gateCtx with that deadline and passes it to msg.Gate.Apply. On
   Verdict.Redeliver with gateCtx.Err() != nil, the worker calls
   nackImmediately — bypassing retryMessageWithReason's backoff so the
   broker's redelivery owns the next attempt rather than this worker
   holding the message past its lease.

   --pubsub.max-extension makes the budget operator-tunable.

2. "Publish result before ack" not enforced.
   publishPubSub now waits on result.Get and returns the error.
   resultWorker only acks the originating request after a successful
   publish; on publish failure it nacks. resultWorker takes a publishFunc
   so the contract can be tested with a stub.

3. Workers spin on closed dispatch channels.
   Worker's receive now uses msg, ok := <-requestChannel and returns
   on !ok. Otherwise after the merge policy closes its output channel,
   the worker would loop forever on zero-value messages.

4. Unknown gate types failed open.
   GateFactory.CreateGate returned AlwaysContinue for unknown gate names,
   so a YAML typo in tier-priority-admission silently disabled admission
   control. Now returns an error. Empty gate type stays open as the
   explicit constant case.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Per-input readers were draining request channels into b.queues without
any cap, while the dispatcher could block on the bounded pool output
channel downstream. Under pool saturation, the receive callback could
keep "dispatching" into the RMP while messages sat hidden in buckets
past pubsub.max-extension — Pub/Sub then redelivered (because the
TransportDeadline check upstream of the RMP didn't see the queued backlog),
causing duplicate delivery and unbounded memory growth.

Added a per-subscription queued slot count (poolState.queued + maxQueued)
protected by the existing mu/cond. reader() now calls acquireInputSlot
before each receive; the dispatcher calls releaseInputSlot when it pops
and forwards a message. When subscription N has PerSubscriptionBuffer
messages held in the RMP, that subscription's reader blocks before
receiving, propagating backpressure to the Flow callback's `ch <- emb`
and (transitively) to Pub/Sub prefetch flow control — which is where
TransportDeadline now actually triggers a nack on saturated pools.

Default PerSubscriptionBuffer = 2. Pub/Sub is the durable backlog; the
RMP only needs a small lookahead per independently-filtered source.
Tunable per-deployment via the per_subscription_buffer merge-policy
config key.
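
A sketch of the bounded-slot mechanism for a single subscription, assuming a mutex plus condition variable as the commit describes. The `slots` type and the `acquire` and `release` method names are hypothetical; the PR names these `acquireInputSlot` and `releaseInputSlot` on its own state.

```go
package main

import (
	"fmt"
	"sync"
)

// slots bounds how many messages from one source may sit in the merge
// policy at once. A reader calls acquire before each receive; the
// dispatcher calls release after it forwards a message.
type slots struct {
	mu        sync.Mutex
	cond      *sync.Cond
	queued    int
	maxQueued int
}

func newSlots(max int) *slots {
	s := &slots{maxQueued: max}
	s.cond = sync.NewCond(&s.mu)
	return s
}

// acquire blocks while the source already holds maxQueued messages.
func (s *slots) acquire() {
	s.mu.Lock()
	for s.queued >= s.maxQueued {
		s.cond.Wait()
	}
	s.queued++
	s.mu.Unlock()
}

// release frees one slot and wakes any blocked reader.
func (s *slots) release() {
	s.mu.Lock()
	s.queued--
	s.mu.Unlock()
	s.cond.Broadcast()
}

func main() {
	s := newSlots(2) // matches the default PerSubscriptionBuffer of 2
	s.acquire()
	s.acquire()

	done := make(chan struct{})
	go func() {
		s.acquire() // third read waits until the dispatcher releases
		close(done)
	}()

	s.release()
	<-done
	fmt.Println("third message admitted after one release")
}
```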

Tests cover: bound applies per reader; one subscription's bound doesn't
block another subscription's read; config parser accepts/rejects values.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
When a subscription gate returned Drop(result), the callback forwarded
the result to resultChannel and then independently sent ackAction{ack:
true} to resultsChannel — bypassing resultWorker's publish-confirmation
contract. If publishing failed, the Pub/Sub request was already acked
and the caller would never see a response.

Extracted the Terminate-verdict handling into
handleSubscriptionGateTerminate. When v.Result != nil, the handler
forwards the result to resultChannel and intentionally does NOT send an
ack here. resultWorker is the sole ack source for that path — it
publishes the result and only then sends ack/nack to the tracked
resultsChannel based on publish success.

Silent drops (v.Result == nil, !v.Redeliver) continue to direct-ack
because no publish is involved. Refuse paths nack with the existing
delay/no-delay logic.

Tests cover both branches: Drop(result) forwards to resultChannel
without acking; silent Drop(nil) acks directly.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Two related changes for redis transports:

1. Closed-channel safety. drainBatch and the retry/result workers in
   both redis-pubsub and redis-sortedset implementations could
   busy-loop on a closed channel, receiving zero values forever.
   Added ok-checks to every channel receive in those workers. Also
   added a select-with-ctx around the redis-pubsub requestWorker's
   msgChannel send and a nil-rmsg guard, so a transient subscription
   hiccup doesn't deadlock the loop.

2. Sorted-set gate parity. The sortedset Flow had RequestChannel.Gate
   plumbing but no way to actually populate it from config — the
   WithGateFactory option was retained "for forward compatibility" but
   never read. Wire it through:

   - queueConfig gains GateType / GateParams (single pool gate, parity
     with pubsub legacy) and Gates []GateConfig (queue gate chain,
     parity with pubsub subscription gates).
   - --redis.ss.gate-type / --redis.ss.gate-params flags for single-
     queue mode (parity with pubsub flags).
   - synthesizeSortedSetPool + buildRedisGateChain + gateFromChain
     helpers build the pool gate chain at startup, attached to
     RequestChannel.Gate so the worker runs it before dispatch.
   - Queue-gate chain runs in processMessages between ZPopMin and the
     msgChannel send. handleQueueGateTerminate routes Refuse →
     retryChannel (or requeue if no retry channel), Drop(result) →
     resultChannel, silent Drop → discard.
   - Pools() now returns the full pool shape (RequestPath, HTTPHeaders,
     ModelNameOverride, Labels) so per-pool worker spawning has the
     complete dispatch context.

cmd/main.go now wires the gateFactory into the sortedset flow.
README documents the new flags and per-queue gate/Gates fields.

Tests cover: drainBatch closed-channel exit; queue-config drives both
pool and subscription gate factories with correct ordering; single-
queue flag mode parses gate-type/gate-params; queue-gate Drop(result)
forwards to resultChannel and does not dispatch; queue-gate Refuse
forwards to retryChannel with gate_refused reason.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

@lioraron lioraron left a comment

Contributor


Review: 8 inline comments covering correctness, safety, and test coverage.

Comment thread cmd/main.go
"gatewayURL", pool.GatewayURL,
"workers", workers)
for w := 0; w < workers; w++ {
go asyncworker.Worker(ctx, impl.Characteristics(), inferenceClient, ch, impl.RetryChannel(), impl.ResultChannel(), requestTimeout)


Worker goroutines fire-and-forget: Workers are spawned with bare go asyncworker.Worker(...) and nothing waits for them to finish — the old WaitGroup was removed. After <-ctx.Done() returns (line 224), main() exits immediately, so in-flight requests are abandoned mid-dispatch. This can lose results or leave partial state in Redis.

Add a sync.WaitGroup (or similar) so the shutdown path waits for all workers to drain before exiting.

// Start runs the refresh loop until ctx is canceled. Idempotent — a
// second call returns immediately because Run-once is the contract;
// the gate factory wires this when first used.
func (s *PromQLLoadSource) Start(ctx context.Context) {

Contributor

Start is not idempotent despite the doc claiming it is: Each call launches a new goroutine with a new ticker. If called twice you get duplicate concurrent refresh loops hammering Prometheus.

Add a sync.Once guard (or an atomic flag) to match the documented contract.

select {
case <-ctx.Done():
return pipeline.Refuse(), ctx.Err()
case <-time.After(wait):

Contributor

time.After timer leak: time.After(wait) inside this for loop allocates a new timer on every iteration. On Go versions before 1.23, each of those timers cannot be GC'd until it fires, so under sustained load with many goroutines blocked here, memory use grows.

Replace with time.NewTimer + timer.Stop() (a defer is only appropriate if the timer is created once, outside the loop), or reset the same timer across iterations.

cacheTTL time.Duration
redisClients map[string]*goredis.Client
rdb *redis.Client
ctx context.Context

Contributor

context.Context stored in struct: This is a well-documented Go anti-pattern (style guide). The context here drives background goroutines (PromQL refresh loops).

Consider passing it through a Start(ctx) method or a dedicated Run(ctx) lifecycle method instead of baking it into the struct.

Comment thread pipeline/registry.go
mergePolicyMu.Lock()
defer mergePolicyMu.Unlock()
if _, exists := mergePolicyRegistry[name]; exists {
panic(fmt.Sprintf("pipeline: merge policy %q already registered", name))

Contributor

panic on duplicate registration: Combined with init()-based registration, a misconfigured import crashes the process at startup with no recovery path. Return an error instead — callers (e.g. NewMergePolicy) already have error paths.
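
A sketch of the error-returning variant. The `MergePolicyFactory` signature below is an assumption made so the example compiles (the PR's real factory type may differ), and the policy name used in `main` is illustrative.

```go
package main

import (
	"fmt"
	"sync"
)

// MergePolicyFactory is a hypothetical signature, not the PR's definition.
type MergePolicyFactory func() any

var (
	mergePolicyMu       sync.Mutex
	mergePolicyRegistry = map[string]MergePolicyFactory{}
)

// RegisterMergePolicy records factory under name and reports a duplicate
// name as an error instead of panicking.
func RegisterMergePolicy(name string, factory MergePolicyFactory) error {
	mergePolicyMu.Lock()
	defer mergePolicyMu.Unlock()
	if _, exists := mergePolicyRegistry[name]; exists {
		return fmt.Errorf("pipeline: merge policy %q already registered", name)
	}
	mergePolicyRegistry[name] = factory
	return nil
}

func main() {
	factory := func() any { return nil }
	fmt.Println(RegisterMergePolicy("tier-priority", factory))
	fmt.Println(RegisterMergePolicy("tier-priority", factory))
}
```

With init()-based registration the caller still has to decide what to do with the error, for example collecting it and surfacing it when the policy is first constructed.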

Comment thread pipeline/labels.go
}

// Set assigns value to key. Panics if l is nil.
func (l Labels) Set(key, value string) {

Contributor

Set panics on nil receiver: Get and Has are explicitly nil-safe, but Set (and Merge) panic on nil. Since external gate authors call msg.Labels.Set(...), this is a footgun if Labels was never initialized.

Either auto-initialize in Set/Merge (return a new map), or add a prominent doc warning that callers must check for nil.
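
One possible shape for the auto-initializing option, assuming `Labels` is a `map[string]string` (the PR's definition is not shown here). Because a value receiver cannot replace a nil map for the caller, this sketch switches `Set` to a pointer receiver; returning a new map from `Set` would be the other route.

```go
package main

import "fmt"

// Labels is assumed to be a plain string map.
type Labels map[string]string

// Get is nil-safe: indexing a nil map yields the zero value.
func (l Labels) Get(key string) string { return l[key] }

// Has is nil-safe for the same reason.
func (l Labels) Has(key string) bool {
	_, ok := l[key]
	return ok
}

// Set assigns value to key, allocating the map first when *l is nil.
func (l *Labels) Set(key, value string) {
	if *l == nil {
		*l = make(Labels)
	}
	(*l)[key] = value
}

// message is a hypothetical carrier with an uninitialized Labels field.
type message struct {
	Labels Labels
}

func main() {
	msg := &message{}               // Labels is nil here
	msg.Labels.Set("tier", "batch") // no panic: Set allocates on demand
	fmt.Println(msg.Labels.Get("tier"), msg.Labels.Has("team"))
}
```

The trade-off is that `Set` now requires an addressable `Labels` value, and mixing value and pointer receivers on one type is something linters tend to flag.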

Comment thread pipeline/gate.go
}

// Continue is the zero-value Verdict: forward to the next stage.
var Continue = Verdict{}

Contributor

Mutable package-level sentinel: var Continue = Verdict{} can be accidentally mutated by any gate (e.g. pipeline.Continue.Terminate = true), corrupting it globally for all callers.

Make it a function func Continue() Verdict { return Verdict{} }, or keep the var but make it unexported with a public accessor that returns a copy.

for {
select {
case <-ctx.Done():
return pipeline.Refuse(), ctx.Err()

Contributor

No test coverage: This file has no _test.go counterpart. Token-bucket logic with timing (refill rates, burst capacity, edge cases around zero/negative inputs) is easy to get wrong and should have unit tests before merge.

@lioraron

Contributor

Thanks @joeltg for this extensive work!

This PR covers a lot of ground — gate interface migration, labels, pool topology, merge policy registry, flow control gates, and tier-priority dispatch. Each of these is a meaningful feature with its own design surface, and bundling them makes review harder and riskier (e.g. the 8 inline comments I left are spread across unrelated subsystems).

Would it be possible to split this into smaller, independently mergeable PRs? A reasonable split might be:

  1. Gate interface migration: Verdict-based Gate replacing DispatchGate.Budget(), ApplyChain, AlwaysContinue
  2. Labels + merge policy registry: Labels type, RegisterMergePolicy, MergePolicyFactory
  3. Pool topology + per-pool workers: Pool type, per-pool worker spawning, MergeRequestChannels
  4. Flow control gates + tier-priority dispatch: LocalRateLimitGate, PromQLLoadSource, tier-priority admission, reservation classifier

I'd also suggest opening a corresponding issue for each PR that includes at least:

  • Problem description: what's missing or broken today, and why it matters
  • High-level design: the approach, key types/interfaces, and how it interacts with existing code

This makes each change independently reviewable, gives reviewers the context to evaluate design decisions, and creates a traceable record of why each piece exists.

Comment thread pipeline/gate.go
Comment on lines +32 to +34
Terminate bool
Redeliver bool
Result *api.ResultMessage

@evacchi evacchi May 20, 2026

Collaborator

I would go for an alternative design here instead of the double bool. The two-bool encoding allows a fourth state (Terminate=false, Redeliver=true) and forces every consumer to reason about the combination. I'd advise going with something like:

type VerdictKind int

const (
    VerdictContinue VerdictKind = iota
    VerdictDrop
    VerdictRefuse
)

type Verdict struct {
    Kind   VerdictKind
    Result *api.ResultMessage
}
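
To illustrate how consumers might use this encoding, here is a sketch with constructors and a single switch. `ResultMessage` stands in for `api.ResultMessage`, and `route` is a hypothetical helper. The destinations follow the commit message (Drop forwards to the result channel, Refuse to the retry channel), but the function itself is not from the PR.

```go
package main

import "fmt"

// ResultMessage is a hypothetical stand-in for api.ResultMessage.
type ResultMessage struct{ Payload string }

type VerdictKind int

const (
	VerdictContinue VerdictKind = iota
	VerdictDrop
	VerdictRefuse
)

type Verdict struct {
	Kind   VerdictKind
	Result *ResultMessage
}

// Constructors keep call sites short. The zero value is Continue.
func Continue() Verdict             { return Verdict{} }
func Drop(r *ResultMessage) Verdict { return Verdict{Kind: VerdictDrop, Result: r} }
func Refuse() Verdict               { return Verdict{Kind: VerdictRefuse} }

// route names where a message goes for a given verdict. With one Kind
// field there is no fourth, undefined combination to handle.
func route(v Verdict) string {
	switch v.Kind {
	case VerdictContinue:
		return "dispatch"
	case VerdictDrop:
		return "result"
	case VerdictRefuse:
		return "retry"
	default:
		return "unknown"
	}
}

func main() {
	fmt.Println(route(Continue()))
	fmt.Println(route(Drop(&ResultMessage{Payload: "rejected"})))
	fmt.Println(route(Refuse()))
}
```

Making `Continue` a function here also sidesteps the mutable package-level sentinel raised in the earlier comment on this file.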

@jtechapps jtechapps mentioned this pull request Jun 4, 2026
8 tasks
@github-actions


This PR is marked as stale after 21d of inactivity. After an additional 14d of inactivity (7d to become rotten, then 7d more), it will be closed. To prevent this PR from being closed, add a comment or remove the lifecycle/stale label.
