The Problem: High-performance accelerators often suffer from low utilization in strictly online serving scenarios. Alternatively, users may want to mix latency-insensitive workloads into slack capacity without impacting primary online serving.
The Value: This component enables efficient processing of requests where latency is not the primary constraint (i.e., the required SLO is on the order of minutes or longer).
By using an asynchronous, queue-based approach, users can perform tasks such as product classification, bulk summarization, summarizing forum discussion threads, or near-realtime sentiment analysis over large volumes of social media posts, all without blocking real-time traffic.
Architecture Summary: The Async Processor is a composable component that provides services for managing these requests. It functions as an asynchronous worker that pulls jobs from a message queue and dispatches them to an inference gateway, decoupling job submission from immediate execution.
• Latency Insensitivity: Suitable for workloads where immediate response is not required.
• Capacity Optimization: Useful for filling "slack" capacity in your inference pool.
The architecture adheres to the following core principles:

- Bring Your Own Queue (BYOQ): All aspects of prioritization, routing, retries, and scaling are decoupled from the message queue implementation.
- Composability: The end user does not interact directly with the processor via an API. Instead, the processor interacts solely with the message queues, making it highly composable with offline batch processing and asynchronous workflows.
- Resilience by Design: If real-time traffic spikes or errors occur, the system triggers intelligent retries for jobs, ensuring they eventually complete without manual intervention.
## Async Processor (AP) - User Guide
To deploy the Async Processor into your K8S cluster, follow these steps:

- Create an `.env` file with `export` statement overrides. E.g.:

  ```
  IMAGE_TAG_BASE=<if needed to override for a private registry>
  DEPLOY_LLM_D=false
  DEPLOY_REDIS=false
  DEPLOY_PROMETHEUS=false
  AP_IMAGE_PULL_POLICY=Always
  ```

- Run:

  ```
  make deploy-ap-on-k8s
  ```

- To test a request (only for the Redis implementation):
- Subscribing to the result channel (in a different terminal window):

  ```
  export REDIS_IP=....
  kubectl run -i -t subscriberbox --rm --image=redis --restart=Never -- /usr/local/bin/redis-cli -h $REDIS_IP SUBSCRIBE result-queue
  ```

- Publishing a request:

  ```
  export REDIS_IP=....
  kubectl run --rm -i -t publishmsgbox --image=redis --restart=Never -- /usr/local/bin/redis-cli -h $REDIS_IP PUBLISH request-queue '{"id" : "testmsg", "payload":{ "model":"food-review-1", "prompt":"Hi, good morning "}, "deadline" :"23472348233323" }'
  ```
- `concurrency`: The number of concurrent workers. Default is 8.
- `request-merge-policy`: Currently only the random-robin policy is supported.
- `message-queue-impl`: Implementation of the queueing system. Options are `gcp-pubsub` for GCP PubSub, `gcp-pubsub-gated` for GCP PubSub with per-topic gating, `redis-sortedset` for Redis Sorted Set (persisted and sorted), and `redis-pubsub` for the ephemeral Redis-based implementation.
- `prometheus-url`: Prometheus server URL for metric-based gates (e.g., `http://localhost:9090`). For Google Managed Prometheus (GMP), point this to a local proxy or GMP frontend that handles authentication; direct GMP URLs are not supported, as the Async Processor does not perform GMP authentication. This flag is required when using metric-based per-queue gates (e.g., `prometheus-saturation`).

Additional parameters may be specified for concrete message queue implementations.
The Async Processor supports dispatch gates to control batch processing based on system capacity. Gates can be configured per-queue (via configuration files).
For more fine-grained control, configure gates per queue in your configuration file. Each queue can have its own gate type and parameters.
Gate Types:
- `constant`: Always returns budget 1.0 (fully open); no throttling.
- `redis`: Queries Redis for the dispatch budget (managed by an external system).
- `prometheus-saturation`: Queries Prometheus for the pool saturation metric. Returns `1.0 - saturation` if saturation is below the threshold, 0.0 otherwise.
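For intuition, the `prometheus-saturation` budget rule can be sketched in Python. This is an illustration of the documented behavior, not the component's actual code; the defaults mirror the documented `threshold` (0.8) and `fallback` (0.0) gate parameters.

```python
def saturation_budget(saturation, threshold=0.8, fallback=0.0):
    """Dispatch budget of a prometheus-saturation gate.

    Returns 1.0 - saturation while the pool is below the threshold and
    0.0 once saturation reaches it. When the metric source returned an
    error or empty data, the fallback saturation value is used instead.
    """
    if saturation is None:  # metric unavailable
        saturation = fallback
    return 0.0 if saturation >= threshold else 1.0 - saturation

# A pool at 30% saturation leaves a 0.7 dispatch budget; at or above
# the 0.8 threshold the gate closes entirely.
print(saturation_budget(0.3))   # 0.7
print(saturation_budget(0.85))  # 0.0
```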
Example Configuration with Per-Queue Gates:
```json
[
  {
    "queue_name": "critical_queue",
    "inference_objective": "critical-task",
    "request_path_url": "/v1/inference",
    "gate_type": "constant"
  },
  {
    "queue_name": "batch_queue",
    "inference_objective": "batch-task",
    "request_path_url": "/v1/inference",
    "gate_type": "prometheus-saturation",
    "gate_params": {
      "pool": "inference_pool_1",
      "threshold": "0.8"
    }
  },
  {
    "queue_name": "redis_gated_queue",
    "inference_objective": "gated-task",
    "request_path_url": "/v1/inference",
    "gate_type": "redis",
    "gate_params": {
      "address": "localhost:6379",
      "budget_key": "my-budget-key"
    }
  }
]
```

Gate Parameters:
- `redis`:
  - `address` (required): Redis server address for the dispatch gate (e.g., `localhost:6379`). Queues sharing the same address will share the same connection pool.
  - `budget_key` (optional): Redis key to read the dispatch budget from. Default is `dispatch-gate-budget`.
- `prometheus-saturation`:
  - `pool`: The inference pool name to query metrics for.
  - `threshold`: Saturation threshold (0.0-1.0). When saturation >= threshold, the budget is 0.0. Default is 0.8.
  - `fallback`: Fallback saturation value (0.0-1.0) used when the metric source returns an error or empty data. Default is 0.0.
  - `query`: Custom PromQL expression to query. If omitted, a default query using `inference_extension_flow_control_pool_saturation` with the `pool` label is used.
The async processor expects request messages to have the following format:

```
{
  "id": "unique identifier for result mapping",
  "created": "created timestamp in Unix seconds",
  "deadline": "deadline in Unix seconds",
  "payload": {regular inference payload as a byte array}
}
```

Example:

```
{
  "id": "19933123533434",
  "created": "1764044000",
  "deadline": "1764045130",
  "payload": byte[]({"model":"food-review","prompt":"hi", "max_tokens":10,"temperature":0})
}
```

The Async Processor supports multiple request message queues. A Request Merge Policy can be specified to define the merge strategy for messages from the different queues.
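A request message in the format shown above can be assembled with plain Python. This is a sketch: it embeds the payload as inline JSON (as in the redis-cli publish examples) rather than as a byte array, so adapt the payload encoding to your queue implementation.

```python
import json
import time

def build_request(msg_id, payload, ttl_seconds):
    """Assemble an Async Processor request message.

    created/deadline are Unix seconds carried as strings, matching the
    documented example; the deadline is set ttl_seconds from now.
    """
    now = int(time.time())
    return json.dumps({
        "id": msg_id,
        "created": str(now),
        "deadline": str(now + ttl_seconds),
        "payload": payload,
    })

msg = build_request(
    "19933123533434",
    {"model": "food-review", "prompt": "hi", "max_tokens": 10, "temperature": 0},
    ttl_seconds=1130,
)
```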
Currently the only supported policy is the Random Robin Policy, which randomly picks messages from the queues.
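The policy can be pictured as follows. This is a sketch of the described behavior (pick uniformly at random among queues that currently have messages), not the processor's actual implementation.

```python
import random

def pick_queue(queues, rng=random):
    """Random Robin: choose uniformly at random among the non-empty queues."""
    candidates = [name for name, msgs in queues.items() if msgs]
    return rng.choice(candidates) if candidates else None

queues = {"critical_queue": ["a"], "batch_queue": ["b", "c"], "empty_queue": []}
name = pick_queue(queues, random.Random(7))  # one of the two non-empty queues
```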
When message processing fails, whether shed or due to a server-side error, the message is scheduled for a retry (assuming the deadline has not passed).
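The retry decision can be sketched as below. The exponential backoff base and formula are illustrative assumptions; only the deadline check is stated by this document.

```python
def schedule_retry(now, deadline, attempt, base_delay_s=1):
    """Compute the Unix time of the next retry, or None to drop the message.

    Uses exponential backoff (base_delay_s * 2**attempt) and refuses to
    schedule an attempt that would land past the message's deadline.
    """
    next_attempt = now + base_delay_s * (2 ** attempt)
    return None if next_attempt > deadline else next_attempt

print(schedule_retry(now=100, deadline=200, attempt=0))  # 101
print(schedule_retry(now=100, deadline=102, attempt=3))  # None: 108 > 102
```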
Results will be written to the results queue and will have the following structure:

```
{
  "id": "id mapped to the request",
  "payload": byte[]{/*inference result payload*/},
  // or
  "error": "error's reason"
}
```

A persisted implementation based on Redis SortedSets.
- `redis.ss.addr`: Address of the Redis server. Default is `localhost:6379`.
- `redis.ss.igw-base-url`: Base URL of the IGW (e.g., `https://localhost:30800`). Mutually exclusive with the `redis.ss.queues-config-file` flag.
- `redis.ss.request-path-url`: Request path URL (e.g., `/v1/completions`). Mutually exclusive with the `redis.ss.queues-config-file` flag.
- `redis.ss.inference-objective`: InferenceObjective to use for requests (set as the HTTP header `x-gateway-inference-objective` if not empty). Mutually exclusive with the `redis.ss.queues-config-file` flag.
- `redis.ss.request-queue-name`: The name of the sorted set for the requests. Default is `request-sortedset`. Mutually exclusive with the `redis.ss.queues-config-file` flag.
- `redis.ss.result-queue-name`: The name of the list for the results. Default is `result-list`.
- `redis.ss.queues-config-file`: The configuration file name when using multiple queues. Mutually exclusive with the `redis.ss.igw-base-url`, `redis.ss.request-queue-name`, `redis.ss.request-path-url` and `redis.ss.inference-objective` flags.
- `redis.ss.poll-interval-ms`: Poll interval in milliseconds. Default is 1000.
- `redis.ss.batch-size`: Number of messages to process per poll. Default is 10.
- `redis.ss.gate-type`: Gate type for single-queue mode (e.g., `redis`, `prometheus-saturation`). Only used when `redis.ss.queues-config-file` is not set.
- `redis.ss.gate-params`: JSON-encoded gate params map for single-queue mode (e.g., `{"address":"localhost:6379"}`). Only used when `redis.ss.queues-config-file` is not set.
NOTE: Consider using the Redis Sorted Set implementation for production use, as it offers persistence and priority sorting.
An example implementation based on Redis channels is provided.
- Redis Channels as the request queues.
- Redis Sorted Set as the retry exponential backoff implementation.
- Redis Channel as the result queue.
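The role of the sorted set in the backoff can be pictured with an in-memory analogue: each failed message is stored with a score equal to the time it becomes eligible for another attempt, and the worker periodically pops the entries whose score has passed (ZADD/ZRANGEBYSCORE in Redis terms). The sketch below only illustrates the idea; the actual keys and commands used by the processor are not shown in this document.

```python
def due_messages(retry_set, now):
    """Return messages whose retry time has arrived, oldest first
    (the in-memory analogue of ZRANGEBYSCORE key -inf now)."""
    return [msg for score, msg in sorted(retry_set) if score <= now]

# Scores are the Unix times at which each message may be retried.
retry_set = [(105.0, "msg-a"), (120.0, "msg-b"), (101.0, "msg-c")]
print(due_messages(retry_set, now=110.0))  # ['msg-c', 'msg-a']
```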
- `redis.addr`: Address of the Redis server. Default is `localhost:6379`.
- `redis.igw-base-url`: Base URL of the IGW (e.g., `https://localhost:30800`). Mutually exclusive with the `redis.queues-config-file` flag.
- `redis.request-path-url`: Request path URL (e.g., `/v1/completions`). Mutually exclusive with the `redis.queues-config-file` flag.
- `redis.inference-objective`: InferenceObjective to use for requests (set as the HTTP header `x-gateway-inference-objective` if not empty). Mutually exclusive with the `redis.queues-config-file` flag.
- `redis.request-queue-name`: The name of the channel for the requests. Default is `request-queue`. Mutually exclusive with the `redis.queues-config-file` flag.
- `redis.retry-queue-name`: The name of the channel for the retries. Default is `retry-sortedset`.
- `redis.result-queue-name`: The name of the channel for the results. Default is `result-queue`.
- `redis.queues-config-file`: The configuration file name when using multiple queues. Mutually exclusive with the `redis.igw-base-url`, `redis.request-queue-name`, `redis.request-path-url` and `redis.inference-objective` flags.
The configuration file used with the `redis.queues-config-file` flag should have the following format:
```json
[
  {
    "queue_name": "some_queue_name",
    "igw_base_url": "http://localhost:30800",
    "inference_objective": "some_inference_objective",
    "request_path_url": "/v1/completions"
  },
  {
    "queue_name": "another_queue",
    "igw_base_url": "http://localhost:30800",
    "inference_objective": "batch_task",
    "request_path_url": "/v1/inference"
  }
]
```

Note: The ephemeral Redis Channels implementation does not support per-queue dispatch gates. Use the Redis Sorted Set implementation for per-queue gating.
Configuration Fields:
- `queue_name`: The name of the Redis channel for this queue.
- `igw_base_url`: Base URL of the IGW.
- `inference_objective`: The inference objective header value.
- `request_path_url`: The request path URL.
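When hand-writing the queues config file, a quick sanity check can catch missing fields early. The helper below is hypothetical, and treating all four fields as required is an assumption of this sketch; the document does not state which fields are optional.

```python
import json

REQUIRED_FIELDS = {"queue_name", "igw_base_url", "inference_objective", "request_path_url"}

def load_queues_config(text):
    """Parse a queues config document and verify each entry carries
    the fields documented above."""
    entries = json.loads(text)
    for i, entry in enumerate(entries):
        missing = REQUIRED_FIELDS - entry.keys()
        if missing:
            raise ValueError(f"queue entry {i} is missing fields: {sorted(missing)}")
    return entries

config = load_queues_config('''[
  {"queue_name": "some_queue_name", "igw_base_url": "http://localhost:30800",
   "inference_objective": "some_inference_objective", "request_path_url": "/v1/completions"}
]''')
```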
The GCP PubSub implementation requires the user to configure the following:
- Requests Topic and a Subscription having the following configurations:
- Exactly once delivery.
- Retries with exponential backoff.
- Dead Letter Queue (DLQ).
- Results Topic.
Note: If a DLQ is NOT configured for the request topic, retried messages will be counted multiple times in the `#_of_requests` metric.
- `pubsub.project-id`: The GCP project ID using the PubSub API.
- `pubsub.igw-base-url`: Base URL of the IGW (e.g., `https://localhost:30800`). Mutually exclusive with the `pubsub.topics-config-file` flag.
- `pubsub.request-path-url`: Request path URL (e.g., `/v1/completions`). Mutually exclusive with the `pubsub.topics-config-file` flag.
- `pubsub.inference-objective`: InferenceObjective to use for requests (set as the HTTP header `x-gateway-inference-objective` if not empty). Mutually exclusive with the `pubsub.topics-config-file` flag.
- `pubsub.request-subscriber-id`: The subscriber ID for the requests topic. Mutually exclusive with the `pubsub.topics-config-file` flag.
- `pubsub.result-topic-id`: The results topic ID.
- `pubsub.batch-size`: Number of inflight messages. Default is 10.
- `pubsub.topics-config-file`: The configuration file name when using multiple topics. Mutually exclusive with the `pubsub.request-subscriber-id`, `pubsub.request-path-url` and `pubsub.inference-objective` flags.
The configuration file used with the `pubsub.topics-config-file` flag should have the following format:
```json
[
  {
    "igw_base_url": "http://localhost:30800",
    "subscriber_id": "some_subscriber_id",
    "inference_objective": "some_inference_objective",
    "request_path_url": "e.g.: /v1/completions",
    "gate_type": "constant",
    "gate_params": {}
  },
  {
    "subscriber_id": "another_subscriber",
    "inference_objective": "batch_task",
    "request_path_url": "/v1/inference",
    "gate_type": "prometheus-saturation",
    "gate_params": {
      "pool": "pool_2",
      "threshold": "0.75"
    }
  }
]
```

Configuration Fields:

- `subscriber_id`: The GCP PubSub subscriber ID for this topic.
- `inference_objective`: The inference objective header value.
- `request_path_url`: The request path URL.
- `gate_type`: Required type of dispatch gate for this topic.
- `gate_params` (optional): Parameters for the gate type (e.g., pool name, threshold for prometheus gates).
A setup based on a KIND cluster with a Redis server as the MQ is provided. To deploy everything, run:

```
make deploy-ap-emulated-on-kind
```

Then, in a new terminal window, register a subscriber:

```
kubectl exec -n redis redis-master-0 -- redis-cli SUBSCRIBE result-queue
```

Publish a message for async processing:

```
kubectl exec -n redis redis-master-0 -- redis-cli PUBLISH request-queue '{"id" : "testmsg", "payload":{ "model":"unsloth/Meta-Llama-3.1-8B", "prompt":"hi"}, "deadline" :"9999999999" }'
```

