Skip to content

Commit 1760221

Browse files
fix: request memory corruption on fallback (#1322)
**Description** Currently, there is subtle request memory corruption due to sjson ReplaceInPlace usage. sjson modifies the original buffer when ReplaceInPlace is set. This causes request memory corruption during sjson.SetBytesOption calls to replace string(eg: model overwrite). This issue is always consistently reproducible in fallback mode. It happens in other cases as well but not evident as original request is not reused. --------- Signed-off-by: Johnu George <[email protected]> Co-authored-by: Takeshi Yoneda <[email protected]>
1 parent bece94a commit 1760221

File tree

9 files changed

+247
-114
lines changed

9 files changed

+247
-114
lines changed

internal/extproc/chatcompletion_processor.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,12 @@ func (c *chatCompletionProcessorRouterFilter) ProcessRequestBody(ctx context.Con
118118
body.StreamOptions = &openai.StreamOptions{IncludeUsage: true}
119119
// Rewrite the original bytes to include the stream_options.include_usage=true so that forcing the request body
120120
// mutation, which uses this raw body, will also result in the stream_options.include_usage=true.
121-
rawBody.Body, err = sjson.SetBytesOptions(rawBody.Body, "stream_options.include_usage", true, translator.SJSONOptions)
121+
rawBody.Body, err = sjson.SetBytesOptions(rawBody.Body, "stream_options.include_usage", true, &sjson.Options{
122+
Optimistic: true,
123+
// Note: it is safe to do in-place replacement since this route level processor is executed once per request,
124+
// and the result can be safely shared among possible multiple retries.
125+
ReplaceInPlace: true,
126+
})
122127
if err != nil {
123128
return nil, fmt.Errorf("failed to set stream_options: %w", err)
124129
}

internal/extproc/completions_processor.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,12 @@ func (c *completionsProcessorRouterFilter) ProcessRequestBody(_ context.Context,
108108
body.StreamOptions = &openai.StreamOptions{IncludeUsage: true}
109109
// Rewrite the original bytes to include the stream_options.include_usage=true so that forcing the request body
110110
// mutation, which uses this raw body, will also result in the stream_options.include_usage=true.
111-
rawBody.Body, err = sjson.SetBytesOptions(rawBody.Body, "stream_options.include_usage", true, translator.SJSONOptions)
111+
rawBody.Body, err = sjson.SetBytesOptions(rawBody.Body, "stream_options.include_usage", true, &sjson.Options{
112+
Optimistic: true,
113+
// Note: it is safe to do in-place replacement since this route level processor is executed once per request,
114+
// and the result can be safely shared among possible multiple retries.
115+
ReplaceInPlace: true,
116+
})
112117
if err != nil {
113118
return nil, fmt.Errorf("failed to set stream_options: %w", err)
114119
}

internal/extproc/translator/openai_completions.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func (o *openAIToOpenAITranslatorV1Completion) RequestBody(original []byte, req
5151
var newBody []byte
5252
if o.modelNameOverride != "" {
5353
// If modelName is set we override the model to be used for the request.
54-
newBody, err = sjson.SetBytesOptions(original, "model", o.modelNameOverride, SJSONOptions)
54+
newBody, err = sjson.SetBytesOptions(original, "model", o.modelNameOverride, sjsonOptions)
5555
if err != nil {
5656
return nil, nil, fmt.Errorf("failed to set model name: %w", err)
5757
}

internal/extproc/translator/openai_embeddings.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func (o *openAIToOpenAITranslatorV1Embedding) RequestBody(original []byte, _ *op
4444
var newBody []byte
4545
if o.modelNameOverride != "" {
4646
// If modelName is set we override the model to be used for the request.
47-
newBody, err = sjson.SetBytesOptions(original, "model", o.modelNameOverride, SJSONOptions)
47+
newBody, err = sjson.SetBytesOptions(original, "model", o.modelNameOverride, sjsonOptions)
4848
if err != nil {
4949
return nil, nil, fmt.Errorf("failed to set model name: %w", err)
5050
}

internal/extproc/translator/openai_openai.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func (o *openAIToOpenAITranslatorV1ChatCompletion) RequestBody(original []byte,
5656
var newBody []byte
5757
if o.modelNameOverride != "" {
5858
// If modelName is set we override the model to be used for the request.
59-
newBody, err = sjson.SetBytesOptions(original, "model", o.modelNameOverride, SJSONOptions)
59+
newBody, err = sjson.SetBytesOptions(original, "model", o.modelNameOverride, sjsonOptions)
6060
if err != nil {
6161
return nil, nil, fmt.Errorf("failed to set model name: %w", err)
6262
}

internal/extproc/translator/translator.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -215,9 +215,10 @@ type LLMTokenUsage struct {
215215
TotalTokens uint32
216216
}
217217

218-
// SJSONOptions are the options used for sjson operations in the translator.
219-
// This is also used outside the package to share the same options for consistency.
220-
var SJSONOptions = &sjson.Options{
221-
Optimistic: true,
222-
ReplaceInPlace: true,
218+
// sjsonOptions are the options used for sjson operations in the translator.
219+
var sjsonOptions = &sjson.Options{
220+
Optimistic: true,
221+
// Note: DO NOT set ReplaceInPlace to true since at the translation layer, which might be called multiple times per retry,
222+
// it must be ensured that the original body is not modified, i.e. the operation must be idempotent.
223+
ReplaceInPlace: false,
223224
}

tests/extproc/envoy.yaml

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@ static_resources:
3838
retry_on: "5xx,gateway-error,reset,rest-before-request,connect-failure,envoy-ratelimited,retriable-4xx,refused-stream,retriable-status-codes,retriable-headers"
3939
num_retries: 5
4040
per_try_timeout: "30s"
41+
retry_priority:
42+
name: envoy.retry_priorities.previous_priorities
43+
typedConfig:
44+
"@type": type.googleapis.com/envoy.extensions.retry.priority.previous_priorities.v3.PreviousPrioritiesConfig
45+
updateFrequency: 1
4146
retry_back_off:
4247
base_interval: "0.1s"
4348
max_interval: "1s"
@@ -209,6 +214,14 @@ static_resources:
209214
exact: gcp-anthropicai
210215
route:
211216
cluster: testupstream-gcp-anthropicai
217+
- match:
218+
prefix: "/"
219+
headers:
220+
- name: x-test-backend
221+
string_match:
222+
exact: modelname-override-and-fallback
223+
route:
224+
cluster: testupstream-modelname-override-and-fallback
212225
http_filters:
213226
- name: envoy.filters.http.ext_proc
214227
typed_config:
@@ -313,6 +326,76 @@ static_resources:
313326
filter_metadata:
314327
aigateway.envoy.io:
315328
per_route_rule_backend_name: "testupstream-openai"
329+
- name: testupstream-modelname-override-and-fallback
330+
connect_timeout: 0.25s
331+
type: STATIC
332+
lb_policy: ROUND_ROBIN
333+
outlier_detection:
334+
consecutive_5xx: 1
335+
interval: 1s
336+
base_ejection_time: 2s # Must be smaller than the require.Eventually's interval. Otherwise, the tests may pass without going through the fallback since the always-failing backend could be ejected by the time when require.Eventually retries due to the previous request IF the retry is not configured.
337+
max_ejection_percent: 100
338+
typed_extension_protocol_options:
339+
envoy.extensions.upstreams.http.v3.HttpProtocolOptions:
340+
"@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions
341+
explicit_http_config:
342+
http_protocol_options: {}
343+
http_filters:
344+
- name: upstream_extproc
345+
typed_config:
346+
"@type": type.googleapis.com/envoy.extensions.filters.http.ext_proc.v3.ExternalProcessor
347+
request_attributes:
348+
- xds.upstream_host_metadata
349+
processing_mode:
350+
request_header_mode: "SEND"
351+
request_body_mode: "NONE"
352+
response_header_mode: "SKIP"
353+
response_body_mode: "NONE"
354+
grpc_service:
355+
envoy_grpc:
356+
cluster_name: extproc_cluster
357+
metadataOptions:
358+
receivingNamespaces:
359+
untyped:
360+
- io.envoy.ai_gateway
361+
- name: envoy.filters.http.header_mutation
362+
typed_config:
363+
"@type": type.googleapis.com/envoy.extensions.filters.http.header_mutation.v3.HeaderMutation
364+
mutations:
365+
request_mutations:
366+
- append:
367+
append_action: ADD_IF_ABSENT
368+
header:
369+
key: content-length
370+
value: "%DYNAMIC_METADATA(io.envoy.ai_gateway:content_length)%"
371+
- name: envoy.filters.http.upstream_codec
372+
typed_config:
373+
"@type": type.googleapis.com/envoy.extensions.filters.http.upstream_codec.v3.UpstreamCodec
374+
load_assignment:
375+
cluster_name: testupstream-modelname-override-and-fallback
376+
endpoints:
377+
- lb_endpoints:
378+
- endpoint:
379+
address:
380+
socket_address:
381+
address: 127.0.0.1
382+
port_value: 8080
383+
metadata:
384+
filter_metadata:
385+
aigateway.envoy.io:
386+
per_route_rule_backend_name: "testupstream-openai-5xx"
387+
priority: 0 # Primary.
388+
- lb_endpoints:
389+
- endpoint:
390+
address:
391+
socket_address:
392+
address: 127.0.0.1
393+
port_value: 8080
394+
metadata:
395+
filter_metadata:
396+
aigateway.envoy.io:
397+
per_route_rule_backend_name: "testupstream-openai-always-200"
398+
priority: 1 # Secondary.
316399
- name: testupstream-modelname-override
317400
connect_timeout: 0.25s
318401
type: STATIC

0 commit comments

Comments
 (0)