Commit 66f9e06

feat: add gcs_uri multipart RequestTransform plugin (#256) (#260)
* feat: add gcs_uri multipart RequestTransform plugin

First concrete consumer of the request body-transform framework (#256, building on #257). Multi-modal endpoints (Whisper transcription, OCR) expect multipart/form-data with a `url` field pointing at a signed object URL, not the OpenAI-style JSON the pipeline produces. Producers can't put raw media on the broker, so the queued payload carries a signed URL in a `gcs_uri` field; this plugin rewrites the body at dispatch time.

The gcs_uri_multipart plugin (pkg/asyncworker/transform/gcsmultipart):

- Activates only when metadata.provider matches a configured provider AND the payload has a non-empty gcs_uri; otherwise the default JSON path is preserved unchanged.
- Transform: writes gcs_uri as a `url` form field (a plain field, not a file upload), passes remaining fields through, drops gcs_uri, and rejects a non-empty file_base64 as fatal/non-retryable.
- Validate: parses the signed URL expiry (V4 X-Goog-Date/X-Goog-Expires or V2 Expires) and fails fatally before dispatch if it expires at or before the request deadline, so the broker doesn't retry a doomed request. Expiry parsing is best-effort and needs no new dependency.

Registered via init()/MustRegister and a blank import in cmd/main.go. Configured under requestTransforms with a `providers` parameter. Adds a README section documenting the flag, config shape, and plugin behavior.

Signed-off-by: Shimi Bandiel <shimib@google.com>

* feat(chart): wire request body-transform config

Address review feedback on #260: the chart had no way to configure the new body-transform plugins. Add ap.transformConfig, which is rendered to transform-config.json in the config ConfigMap and passed via --transform-config-file, mirroring how worker-pools.json and pubsub-topics.json are mounted. Empty (the default) leaves behavior unchanged. Document the Helm path in the README and add helm-unittest coverage for the set and empty cases.

Signed-off-by: Shimi Bandiel <shimib@google.com>

---------

Signed-off-by: Shimi Bandiel <shimib@google.com>
1 parent e868e1d commit 66f9e06
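
The Validate step described in the commit message can be illustrated with a small Go sketch. It is not the plugin's actual code: the names `signedURLExpiry` and `checkExpiry` are hypothetical, and letting a URL with an unrecognizable expiry pass is an assumed reading of "best-effort". It relies only on the standard library, in line with the "no new dependency" note.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
	"strconv"
	"time"
)

// errNoExpiry is returned when neither V4 nor V2 expiry parameters are present.
var errNoExpiry = errors.New("no recognizable expiry parameters")

// signedURLExpiry extracts the expiry instant from a signed URL.
// V4 URLs carry X-Goog-Date (UTC, YYYYMMDD'T'HHMMSS'Z') plus X-Goog-Expires
// (lifetime in seconds); V2 URLs carry Expires (Unix seconds).
func signedURLExpiry(raw string) (time.Time, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return time.Time{}, fmt.Errorf("parse signed URL: %w", err)
	}
	q := u.Query()
	if date, ttl := q.Get("X-Goog-Date"), q.Get("X-Goog-Expires"); date != "" && ttl != "" {
		issued, err := time.Parse("20060102T150405Z", date)
		if err != nil {
			return time.Time{}, fmt.Errorf("parse X-Goog-Date: %w", err)
		}
		secs, err := strconv.ParseInt(ttl, 10, 64)
		if err != nil {
			return time.Time{}, fmt.Errorf("parse X-Goog-Expires: %w", err)
		}
		return issued.Add(time.Duration(secs) * time.Second), nil
	}
	if exp := q.Get("Expires"); exp != "" {
		secs, err := strconv.ParseInt(exp, 10, 64)
		if err != nil {
			return time.Time{}, fmt.Errorf("parse Expires: %w", err)
		}
		return time.Unix(secs, 0).UTC(), nil
	}
	return time.Time{}, errNoExpiry
}

// checkExpiry fails when the URL expires at or before the deadline.
// Assumption: an expiry that cannot be parsed is let through (best-effort).
func checkExpiry(raw string, deadline time.Time) error {
	exp, err := signedURLExpiry(raw)
	if err != nil {
		return nil
	}
	if !exp.After(deadline) {
		return fmt.Errorf("signed URL expires at %s, not after deadline %s",
			exp.Format(time.RFC3339), deadline.Format(time.RFC3339))
	}
	return nil
}

func main() {
	// Hypothetical URL: issued at midnight UTC with a 15-minute lifetime.
	const signed = "https://storage.googleapis.com/bucket/audio.wav" +
		"?X-Goog-Date=20240101T000000Z&X-Goog-Expires=900"
	deadline := time.Date(2024, 1, 1, 0, 30, 0, 0, time.UTC)
	fmt.Println(checkExpiry(signed, deadline))
}
```

Comparing with `!exp.After(deadline)` mirrors the "at or before" wording: a URL that expires exactly at the deadline is rejected too.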

8 files changed

Lines changed: 623 additions & 5 deletions

README.md

Lines changed: 42 additions & 0 deletions
@@ -38,6 +38,8 @@ The architecture adheres to the following core principles:
 - [Per-Queue Dispatch Gates](#per-queue-dispatch-gates)
 - [Request Messages and Consumption](#request-messages-and-consumption)
 - [Request Merge Policy](#request-merge-policy)
+- [Request Body Transforms](#request-body-transforms)
+- [`gcs_uri_multipart` plugin](#gcs_uri_multipart-plugin)
 - [Retries](#retries)
 - [Observability](#observability)
 - [OpenTelemetry Tracing](#opentelemetry-tracing)
@@ -347,6 +349,46 @@ This per-pool topology provides complete backpressure and queue-level isolation:
 
 Currently the only policy supported is `Random Robin Policy` which randomly picks messages from all queues configured for a given pool.
 
+## Request Body Transforms
+
+By default the worker dispatches the OpenAI-style JSON marshalled from a request's `payload`. Some providers need a different body shape at dispatch time, for example multi-modal endpoints (Whisper transcription, OCR) that expect `multipart/form-data` with a `url` field rather than JSON. Request body-transform plugins handle this without special-casing the worker: they rewrite the outgoing body and `Content-Type` based on per-message `metadata`, and the default JSON path is preserved byte-for-byte when no plugin applies.
+
+Transforms are configured with `--transform-config-file`, pointing at a JSON object that groups plugins by direction:
+
+```json
+{
+  "requestTransforms": [
+    {
+      "name": "whisper-multipart",
+      "type": "gcs_uri_multipart",
+      "parameters": { "providers": ["whisper"] }
+    }
+  ]
+}
+```
+
+Each entry has a unique `name`, a registered plugin `type`, and opaque `parameters`. Unknown top-level fields are rejected. When the flag is empty, no transforms are loaded and behavior is unchanged.
+
+With the Helm chart, set `ap.transformConfig` to this same object; the chart renders it to a config file and wires `--transform-config-file` automatically:
+
+```yaml
+ap:
+  transformConfig:
+    requestTransforms:
+      - name: "whisper-multipart"
+        type: "gcs_uri_multipart"
+        parameters:
+          providers: ["whisper"]
+```
+
+### `gcs_uri_multipart` plugin
+
+Rewrites a JSON body into `multipart/form-data` for endpoints that take a signed object URL. Because producers can't put raw media bytes on the broker, the queued `payload` carries a signed URL (e.g. a GCS V4 signed URL) in a `gcs_uri` field.
+
+- **Activation:** the message's `metadata.provider` must match one of the configured `providers`, and the `payload` must contain a non-empty `gcs_uri`. Otherwise the default JSON path is used unchanged.
+- **Transform:** writes the `gcs_uri` value as a `url` form field (a plain field, not a file upload), passes the remaining payload fields through as form fields, and drops `gcs_uri`. A non-empty `file_base64` is rejected as a fatal, non-retryable error (inline media is not supported on this path).
+- **Preflight:** parses the signed URL's expiry (V4 `X-Goog-Date` + `X-Goog-Expires`, or V2 `Expires`); if it expires at or before the message deadline, the request fails fatally before dispatch so the broker doesn't retry a request that cannot succeed.
+
 ## Retries
 
 When a message processing has failed, either shedded or due to a server-side error, it will be scheduled for a retry (assuming the deadline has not passed).
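
The Activation and Transform rules documented in the README hunk above can be sketched in Go with the standard `mime/multipart` writer. This is an illustration only, not the code in `pkg/asyncworker/transform/gcsmultipart`: the names `applies`, `toMultipart`, and `formValue` are hypothetical, and rendering non-string JSON values as compact JSON text is an assumption the README does not specify.

```go
package main

import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
	"mime/multipart"
	"sort"
)

// errInlineMedia marks the fatal, non-retryable rejection of inline media.
var errInlineMedia = errors.New("file_base64 is not supported on the gcs_uri path")

// applies reports whether the plugin should activate: the provider must be
// configured and the payload must carry a non-empty gcs_uri.
func applies(provider string, providers []string, fields map[string]any) bool {
	if uri, _ := fields["gcs_uri"].(string); uri == "" {
		return false
	}
	for _, p := range providers {
		if p == provider {
			return true
		}
	}
	return false
}

// toMultipart rewrites decoded payload fields into a multipart/form-data body
// and returns it with the matching Content-Type (which carries the boundary).
func toMultipart(fields map[string]any) ([]byte, string, error) {
	if b64, _ := fields["file_base64"].(string); b64 != "" {
		return nil, "", errInlineMedia
	}
	uri, _ := fields["gcs_uri"].(string)

	var buf bytes.Buffer
	w := multipart.NewWriter(&buf)
	// The signed URL becomes a plain `url` field, not a file part.
	if err := w.WriteField("url", uri); err != nil {
		return nil, "", err
	}
	// Sort the remaining keys so the body is deterministic; gcs_uri is dropped.
	keys := make([]string, 0, len(fields))
	for k := range fields {
		if k != "gcs_uri" {
			keys = append(keys, k)
		}
	}
	sort.Strings(keys)
	for _, k := range keys {
		if err := w.WriteField(k, formValue(fields[k])); err != nil {
			return nil, "", err
		}
	}
	if err := w.Close(); err != nil {
		return nil, "", err
	}
	return buf.Bytes(), w.FormDataContentType(), nil
}

// formValue renders strings verbatim and any other JSON value as compact JSON
// (an assumed encoding for this sketch).
func formValue(v any) string {
	if s, ok := v.(string); ok {
		return s
	}
	b, _ := json.Marshal(v)
	return string(b)
}

func main() {
	raw := []byte(`{"gcs_uri":"https://storage.example/audio.wav","model":"whisper-1","temperature":0.2}`)
	var fields map[string]any
	if err := json.Unmarshal(raw, &fields); err != nil {
		panic(err)
	}
	if !applies("whisper", []string{"whisper"}, fields) {
		fmt.Println("default JSON path")
		return
	}
	body, contentType, err := toMultipart(fields)
	if err != nil {
		panic(err)
	}
	fmt.Println(contentType)
	fmt.Printf("%s", body)
}
```

The `Content-Type` has to come from `FormDataContentType()` because the boundary is generated per writer; a hard-coded header would not match the body.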

charts/async-processor/templates/ap-configmap.yaml

Lines changed: 4 additions & 1 deletion
@@ -1,4 +1,4 @@
-{{- if or .Values.ap.workerPools .Values.ap.gcpPubSub.topicsConfig }}
+{{- if or .Values.ap.workerPools .Values.ap.gcpPubSub.topicsConfig .Values.ap.transformConfig }}
 apiVersion: v1
 kind: ConfigMap
 metadata:
@@ -13,4 +13,7 @@ data:
   {{- if .Values.ap.gcpPubSub.topicsConfig }}
   pubsub-topics.json: {{ .Values.ap.gcpPubSub.topicsConfig | toJson | quote }}
   {{- end }}
+  {{- if .Values.ap.transformConfig }}
+  transform-config.json: {{ .Values.ap.transformConfig | toJson | quote }}
+  {{- end }}
 {{- end }}
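
For context on how a rendered `transform-config.json` might be consumed, here is a hedged Go sketch of strict decoding. The type and function names (`transformEntry`, `transformConfig`, `parseTransformConfig`) are hypothetical and not taken from the repository. Note that `DisallowUnknownFields` also rejects unknown fields inside each entry, which is stricter than the "unknown top-level fields are rejected" rule the README states.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// transformEntry mirrors one element of requestTransforms.
// Parameters stays raw so each plugin can decode its own shape.
type transformEntry struct {
	Name       string          `json:"name"`
	Type       string          `json:"type"`
	Parameters json.RawMessage `json:"parameters"`
}

// transformConfig mirrors the top-level object in transform-config.json.
type transformConfig struct {
	RequestTransforms []transformEntry `json:"requestTransforms"`
}

// parseTransformConfig decodes the config, rejecting unknown fields,
// incomplete entries, and duplicate entry names.
func parseTransformConfig(data []byte) (*transformConfig, error) {
	dec := json.NewDecoder(bytes.NewReader(data))
	dec.DisallowUnknownFields()
	var cfg transformConfig
	if err := dec.Decode(&cfg); err != nil {
		return nil, fmt.Errorf("decode transform config: %w", err)
	}
	seen := make(map[string]bool, len(cfg.RequestTransforms))
	for _, e := range cfg.RequestTransforms {
		if e.Name == "" || e.Type == "" {
			return nil, fmt.Errorf("transform entry needs both name and type")
		}
		if seen[e.Name] {
			return nil, fmt.Errorf("duplicate transform name %q", e.Name)
		}
		seen[e.Name] = true
	}
	return &cfg, nil
}

func main() {
	data := []byte(`{
	  "requestTransforms": [
	    {"name": "whisper-multipart", "type": "gcs_uri_multipart",
	     "parameters": {"providers": ["whisper"]}}
	  ]
	}`)
	cfg, err := parseTransformConfig(data)
	if err != nil {
		panic(err)
	}
	for _, e := range cfg.RequestTransforms {
		fmt.Printf("%s -> %s %s\n", e.Name, e.Type, e.Parameters)
	}
}
```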

charts/async-processor/templates/ap-deployments.yaml

Lines changed: 7 additions & 4 deletions
@@ -92,6 +92,9 @@ spec:
             {{- if .Values.ap.workerPools }}
             - --pool-config-file=/etc/async-processor/config/worker-pools.json
             {{- end }}
+            {{- if .Values.ap.transformConfig }}
+            - --transform-config-file=/etc/async-processor/config/transform-config.json
+            {{- end }}
             - --concurrency={{ .Values.ap.concurrency | default 8 }}
             - --drain-timeout={{ .Values.ap.drainTimeout | default "2m" }}
             - --health-port={{ .Values.ap.health.port | default 8081 }}
@@ -184,9 +187,9 @@ spec:
           resources:
             {{- toYaml . | nindent 12 }}
           {{- end }}
-          {{- if or .Values.ap.workerPools .Values.ap.gcpPubSub.topicsConfig .Values.ap.tls.secretName }}
+          {{- if or .Values.ap.workerPools .Values.ap.gcpPubSub.topicsConfig .Values.ap.transformConfig .Values.ap.tls.secretName }}
           volumeMounts:
-            {{- if or .Values.ap.workerPools .Values.ap.gcpPubSub.topicsConfig }}
+            {{- if or .Values.ap.workerPools .Values.ap.gcpPubSub.topicsConfig .Values.ap.transformConfig }}
             - name: ap-config
               mountPath: /etc/async-processor/config
               readOnly: true
@@ -199,9 +202,9 @@ spec:
           {{- end }}
       serviceAccountName: {{ include "async-processor.fullname" . }}
       terminationGracePeriodSeconds: 130
-      {{- if or .Values.ap.workerPools .Values.ap.gcpPubSub.topicsConfig .Values.ap.tls.secretName }}
+      {{- if or .Values.ap.workerPools .Values.ap.gcpPubSub.topicsConfig .Values.ap.transformConfig .Values.ap.tls.secretName }}
       volumes:
-        {{- if or .Values.ap.workerPools .Values.ap.gcpPubSub.topicsConfig }}
+        {{- if or .Values.ap.workerPools .Values.ap.gcpPubSub.topicsConfig .Values.ap.transformConfig }}
         - name: ap-config
           configMap:
             name: {{ include "async-processor.fullname" . }}-config

charts/async-processor/tests/deployment_test.yaml

Lines changed: 25 additions & 0 deletions
@@ -308,3 +308,28 @@ tests:
       - contains:
           path: spec.template.spec.containers[0].args
           content: --message-queue-impl=gcp-pubsub-gated
+
+  - it: should pass transform-config-file and mount the config when transformConfig is set
+    set:
+      ap.transformConfig:
+        requestTransforms:
+          - name: "gcs-whisper"
+            type: "gcs_uri_multipart"
+            parameters:
+              providers: ["whisper"]
+    asserts:
+      - contains:
+          path: spec.template.spec.containers[0].args
+          content: --transform-config-file=/etc/async-processor/config/transform-config.json
+      - contains:
+          path: spec.template.spec.containers[0].volumeMounts
+          content:
+            name: ap-config
+            mountPath: /etc/async-processor/config
+            readOnly: true
+
+  - it: should not pass transform-config-file when transformConfig is empty
+    asserts:
+      - notContains:
+          path: spec.template.spec.containers[0].args
+          content: --transform-config-file=/etc/async-processor/config/transform-config.json

charts/async-processor/values.yaml

Lines changed: 11 additions & 0 deletions
@@ -29,6 +29,17 @@ ap:
   #   - id: "premium"
   #     workers: 16
   workerPools: []
+  # Request body-transform plugins. When set, rendered to a config file and passed
+  # via --transform-config-file. Object with a requestTransforms array; each entry
+  # has a unique name, a registered plugin type, and opaque parameters. Empty
+  # disables transforms (default JSON dispatch is unchanged).
+  # transformConfig:
+  #   requestTransforms:
+  #     - name: "gcs-whisper"
+  #       type: "gcs_uri_multipart"
+  #       parameters:
+  #         providers: ["whisper"]
+  transformConfig: {}
   otel:
     # OTLP gRPC endpoint for trace collection. Examples:
     # Dev (Jaeger all-in-one): "http://jaeger.default.svc.cluster.local:4317"

cmd/main.go

Lines changed: 2 additions & 0 deletions
@@ -20,6 +20,8 @@ import (
 	"github.com/llm-d-incubation/llm-d-async/pkg/async/inference/flowcontrol"
 	"github.com/llm-d-incubation/llm-d-async/pkg/asyncworker"
 	"github.com/llm-d-incubation/llm-d-async/pkg/asyncworker/transform"
+	// Register the built-in request body-transform plugins.
+	_ "github.com/llm-d-incubation/llm-d-async/pkg/asyncworker/transform/gcsmultipart"
 	"github.com/llm-d-incubation/llm-d-async/pkg/metrics"
 	"github.com/llm-d-incubation/llm-d-async/pkg/plugins"
 	"github.com/llm-d-incubation/llm-d-async/pkg/pubsub"

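The blank import in `cmd/main.go` works because importing a package runs its `init()` functions, which is where the commit message says the plugin calls `MustRegister`. The single-file Go sketch below shows the general self-registration pattern. The `Plugin` and `Factory` types, the `Lookup` helper, and the `MustRegister` signature are assumptions for illustration; the real `transform` package API is not shown in this diff.

```go
package main

import (
	"fmt"
	"sync"
)

// Plugin is a stand-in for whatever interface the transform package defines.
type Plugin interface {
	Type() string
}

// Factory builds a plugin from its opaque, still-encoded parameters.
type Factory func(parameters []byte) (Plugin, error)

var (
	mu        sync.RWMutex
	factories = map[string]Factory{}
)

// MustRegister adds a factory under a plugin type and panics on duplicates,
// so a conflicting registration fails at startup rather than at dispatch.
func MustRegister(typ string, f Factory) {
	mu.Lock()
	defer mu.Unlock()
	if _, exists := factories[typ]; exists {
		panic(fmt.Sprintf("transform plugin %q registered twice", typ))
	}
	factories[typ] = f
}

// Lookup returns the factory registered for a plugin type, if any.
func Lookup(typ string) (Factory, bool) {
	mu.RLock()
	defer mu.RUnlock()
	f, ok := factories[typ]
	return f, ok
}

// gcsMultipart is a placeholder plugin used only by this sketch.
type gcsMultipart struct{}

func (gcsMultipart) Type() string { return "gcs_uri_multipart" }

// In a multi-package layout this init would live in the plugin package and
// run as a side effect of the blank import.
func init() {
	MustRegister("gcs_uri_multipart", func(parameters []byte) (Plugin, error) {
		return gcsMultipart{}, nil
	})
}

func main() {
	factory, ok := Lookup("gcs_uri_multipart")
	if !ok {
		panic("plugin not registered")
	}
	p, err := factory([]byte(`{"providers":["whisper"]}`))
	if err != nil {
		panic(err)
	}
	fmt.Println("loaded plugin:", p.Type())
}
```

Without the blank import the plugin package would never be linked in, and a config entry with `type: "gcs_uri_multipart"` would refer to an unregistered type.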