13 changes: 2 additions & 11 deletions bpf/common/large_buffers.h
@@ -13,17 +13,8 @@ volatile const u32 mssql_max_captured_bytes = 0;
volatile const u32 tcp_max_captured_bytes = 0;

enum {
// Maximum payload size per ring buffer chunk.
k_large_buf_payload_max_size = 1 << 14, // 16K

// Scratch memory size for a large buffer event: sizeof(tcp_large_buffer_t) + payload.
// Rounded up to the next power of 2 above k_large_buf_payload_max_size to account
// for the struct overhead.
k_large_buf_max_size = 1 << 15, // 32K

// Maximum valid value for each protocol's *_max_captured_bytes volatile variable.
// These must equal the lte= validation values in EBPFBufferSizes (pkg/config/ebpf_tracer.go),
// which enforces the same ceiling at configuration time.
k_large_buf_payload_max_size = 1 << 14,
k_large_buf_max_size = 1 << 15,
k_large_buf_max_http_captured_bytes = 1 << 16,

Contributor

this is gated to 64KB, which means

static __always_inline int http_send_large_buffer(http_info_t *req,
                                                  const void *u_buf,
                                                  u32 bytes_len,
                                                  u8 packet_type,
                                                  u8 direction,
                                                  enum large_buf_action action) {
    if (http_max_captured_bytes > k_large_buf_max_http_captured_bytes) {
        bpf_dbg_printk("BUG: http_max_captured_bytes exceeds maximum allowed value.");
    }

will fail if the value is higher than that?

Member Author

Good question — it won't cause a functional failure. Here's the layered design:

  • http_max_captured_bytes (set from userspace, up to 256KB) is the total per-request per-direction budget. It controls when to stop accumulating data (bytes_sent >= http_max_captured_bytes).
  • k_large_buf_max_http_captured_bytes (64KB) is the per-syscall-event cap, used in bpf_clamp_umax(max_available_bytes, k_large_buf_max_http_captured_bytes) to bound each ring buffer submission. This is critical for the BPF verifier to prove memory safety.
  • The 256KB total budget is reached by accumulating multiple 64KB chunks across successive tcp_recvmsg events.

The bpf_dbg_printk("BUG: ...") check is a debug-only assertion (gated by g_bpf_debug, compiled out in production) that predates the HTTP limit raise. It's now stale for HTTP — I can clean it up or update the condition to reflect the new semantic. It does not affect correctness since the code continues past it regardless and the actual safety bound is enforced by bpf_clamp_umax.
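The two limits described above can be sketched in Go as a small model. This is only an illustration of the arithmetic, not the BPF code: the function name `capturedPerSyscall` and the way the remaining budget is computed are assumptions, and only the 64KB cap and the `bytes_sent >= http_max_captured_bytes` stop condition come from the discussion.

```go
package main

import "fmt"

// perSyscallCap mirrors k_large_buf_max_http_captured_bytes (64KB).
const perSyscallCap = 1 << 16

// capturedPerSyscall is a hypothetical model of one syscall event. It returns
// how many bytes would be captured, given the bytes already sent for this
// direction, the total budget, and the length of the current read.
func capturedPerSyscall(bytesSent, budget, bytesLen uint32) uint32 {
	if bytesSent >= budget {
		return 0 // total budget exhausted, stop accumulating
	}
	n := budget - bytesSent // remaining budget for this direction
	if bytesLen < n {
		n = bytesLen
	}
	if n > perSyscallCap {
		n = perSyscallCap // per-event bound, as with bpf_clamp_umax
	}
	return n
}

func main() {
	const budget = 256 * 1024 // assumed userspace setting
	var sent uint32
	for i := 1; i <= 5; i++ {
		n := capturedPerSyscall(sent, budget, 64*1024)
		sent += n
		fmt.Printf("event %d: captured %d bytes, total %d\n", i, n, sent)
	}
}
```

In this model, four 64KB reads fill the 256KB budget and a fifth read captures nothing. A single read larger than 64KB would be cut to 64KB by the per-event bound.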

Contributor

I thought there was a return statement. I think we should clean it up if it's pointless

Member Author

Done — removed the stale assertion. It was comparing the per-request budget (256KB) against the per-syscall cap (64KB), which is now expected behavior in the multi-chunk design.

Contributor

I don't think this is the right approach. IMHO the statement is correct: it is a bug to set a value that is larger than k_large_buf_max_http_captured_bytes because k_large_buf_max_http_captured_bytes is used to bound max_available_bytes for the verifier. So if max_available_bytes is larger than k_large_buf_max_http_captured_bytes, we end up truncating the buffer and silently throwing away the remainder of the payload.

So IMHO we should at least log that, or try to remove the clamp and see if the verifier is happy.

@NameHaibinZhang NameHaibinZhang Jun 18, 2026

Member Author

@rafaelroquetto You're absolutely right — this was a real bug. If a single tcp_recvmsg delivers more than 64KB, BPF truncates the chunk, and blindly appending subsequent truncated chunks would create holes in the reassembled buffer.

Fixed: the userspace reassembly now detects truncation by checking whether the accumulated data for a single emission hits exactly the per-syscall cap (64KB) with a full final chunk (16KB payload). When that pattern is detected, the buffer is "sealed" and subsequent chunks for that direction are discarded — ensuring the assembled data is always a contiguous prefix (tail truncation only, no holes).

Added unit tests covering: truncation detection, non-truncation pass-through, and seal reset on new request.

Contributor

@NameHaibinZhang thanks for iterating on this, but I don't think this is the right approach and I'd rather we don't go down the userspace-heuristic path at all.

Userspace simply has no way of knowing whether a buffer was actually truncated. A syscall that got clamped at 64KB and a perfectly healthy one that just happens to land on a 64KB boundary produce exactly the same sequence of chunks, i.e., the same lengths and the same actions, with nothing to tell them apart. So len % 64KB == 0 isn't detecting truncation, it's guessing, and it'll guess wrong on legitimate traffic, silently capping perfectly healthy buffers at 64KB, which is the exact regression we're trying to avoid.

The truncation only exists as a fact inside BPF - that's the one place that knows bytes_len > cap so that's where it has to be dealt with.

I do think we can do this properly in eBPF with some effort. The verifier makes it fiddly, I know, and that's probably why we kept everything bounded at 64KB in the first place, but I'd much rather we take the time to respect the semantics and get it right than merge a workaround that quietly regresses the common case.

@NameHaibinZhang NameHaibinZhang Jun 18, 2026

Member Author

@rafaelroquetto You're right — the userspace heuristic can't reliably distinguish truncation from legitimate 64KB-aligned traffic. I'll revert the heuristic and implement this properly in BPF: add a truncated flag to the event metadata set when bytes_len > cap, so userspace gets an explicit signal to stop accumulation. Will push the BPF-side fix shortly.

Contributor

@NameHaibinZhang: no, I don't mean it like that - a truncated flag is a workaround. We should be able to ship the full 256KB from eBPF, with the usual semantics. This will require some eBPF work. I'd suggest first simply increasing the constant and seeing what the verifier says. In the likely event it complains, you will need to chunk the buffers. I'd rather we take some time to get it right than rush it and get it wrong; there's no urgency here.

@NameHaibinZhang NameHaibinZhang Jun 19, 2026

Member Author

@rafaelroquetto I've already tested raising k_large_buf_max_http_captured_bytes to 256KB directly — it fails the BPF verifier on 5.10 kernels. The loop unrolling in large_buf_emit_chunks goes from 4 iterations (4×16KB = 64KB) to 16 iterations (16×16KB = 256KB), which exceeds the instruction count limit on older kernels.

So the path forward would be a multi-emission approach (e.g. tail calls or multiple ring buffer submissions across separate BPF program invocations) to stay within verifier bounds while delivering the full 256KB. I'll work on that — it'll take a bit more time to get right.

k_large_buf_max_mysql_captured_bytes = 1 << 16,
k_large_buf_max_postgres_captured_bytes = 1 << 16,
4 changes: 0 additions & 4 deletions bpf/generictracer/protocol_http.h
@@ -349,10 +349,6 @@ static __always_inline int http_send_large_buffer(http_info_t *req,
u8 packet_type,
u8 direction,
enum large_buf_action action) {
if (http_max_captured_bytes > k_large_buf_max_http_captured_bytes) {
bpf_dbg_printk("BUG: http_max_captured_bytes exceeds maximum allowed value.");
}

const u32 bytes_sent =
packet_type == PACKET_TYPE_REQUEST ? req->lb_req_bytes : req->lb_res_bytes;

2 changes: 1 addition & 1 deletion devdocs/config/CONFIG.md
@@ -181,7 +181,7 @@ EBPFTracer configuration for eBPF programs

### `ebpf.buffer_sizes`

Per-protocol maximum bytes to capture per request per direction, sent to userspace via large buffer events. Values must stay aligned with MaxCapturedPayloadBytes and the k_large_buf_max_*_captured_bytes constants in bpf/common/large_buffers.h. Default: 0 (disabled).
Per-protocol maximum bytes to capture per request per direction, sent to userspace via large buffer events. Default: 0 (disabled).

| YAML Path | Type | Env Var | Default | Values | Deprecated | Description |
|---|---|---|---|---|---|---|
2 changes: 1 addition & 1 deletion devdocs/config/config-schema.json
@@ -576,7 +576,7 @@
}
},
"type": "object",
"description": "Per-protocol maximum bytes to capture per request per direction, sent to userspace via large buffer events. Values must stay aligned with MaxCapturedPayloadBytes and the k_large_buf_max_*_captured_bytes constants in bpf/common/large_buffers.h. Default: 0 (disabled)."
"description": "Per-protocol maximum bytes to capture per request per direction, sent to userspace via large buffer events. Default: 0 (disabled)."
},
"EBPFTracer": {
"properties": {
2 changes: 1 addition & 1 deletion devdocs/features.md
@@ -66,7 +66,7 @@ Large payloads are streamed to userspace across multiple ring-buffer events and

| Environment variable | Protocol | Maximum | Default |
|:-----------------------------------|:----------:|--------:|:------------:|
| `OTEL_EBPF_BPF_BUFFER_SIZE_HTTP` | HTTP | 65535 | 0 (disabled) |
| `OTEL_EBPF_BPF_BUFFER_SIZE_HTTP` | HTTP | 262144 | 0 (disabled) |
| `OTEL_EBPF_BPF_BUFFER_SIZE_MYSQL` | MySQL | 65535 | 0 (disabled) |
| `OTEL_EBPF_BPF_BUFFER_SIZE_KAFKA` | Kafka | 65535 | 0 (disabled) |
| `OTEL_EBPF_BPF_BUFFER_SIZE_POSTGRES` | PostgreSQL | 65535 | 0 (disabled) |
11 changes: 1 addition & 10 deletions pkg/config/ebpf_tracer.go
@@ -172,20 +172,11 @@ func (e *EBPFTracer) CudaInstrumentationEnabled() bool {
return false
}

// MaxCapturedPayloadBytes is the maximum number of bytes that can be captured
// per protocol request direction via large buffer events.
//
// It must stay aligned with the k_large_buf_max_*_captured_bytes constants in
// bpf/common/large_buffers.h and with the validate tags in EBPFBufferSizes.
const MaxCapturedPayloadBytes = 1 << 16

// Per-protocol maximum bytes to capture per request per direction, sent to userspace via large buffer events.
// Values must stay aligned with MaxCapturedPayloadBytes and the
// k_large_buf_max_*_captured_bytes constants in bpf/common/large_buffers.h.
//
// Default: 0 (disabled).
type EBPFBufferSizes struct {
HTTP uint32 `yaml:"http" env:"OTEL_EBPF_BPF_BUFFER_SIZE_HTTP" validate:"lte=65536"`
HTTP uint32 `yaml:"http" env:"OTEL_EBPF_BPF_BUFFER_SIZE_HTTP" validate:"lte=262144"`
MySQL uint32 `yaml:"mysql" env:"OTEL_EBPF_BPF_BUFFER_SIZE_MYSQL" validate:"lte=65536"`
Kafka uint32 `yaml:"kafka" env:"OTEL_EBPF_BPF_BUFFER_SIZE_KAFKA" validate:"lte=65536"`
Postgres uint32 `yaml:"postgres" env:"OTEL_EBPF_BPF_BUFFER_SIZE_POSTGRES" validate:"lte=65536"`
13 changes: 9 additions & 4 deletions pkg/config/ebpf_tracer_test.go
@@ -4,7 +4,6 @@
package config

import (
"fmt"
"reflect"
"testing"
)
@@ -256,17 +255,23 @@ func TestEBPFTracer_CudaInstrumentationEnabled(t *testing.T) {
}

func TestEBPFBufferSizesValidateTagsMatchMaxCapturedPayloadBytes(t *testing.T) {
expected := fmt.Sprintf("lte=%d", MaxCapturedPayloadBytes)
typ := reflect.TypeOf(EBPFBufferSizes{})

for i := 0; i < typ.NumField(); i++ {
field := typ.Field(i)
if got := field.Tag.Get("validate"); got != expected {
got := field.Tag.Get("validate")
var expected string
if field.Name == "HTTP" {
expected = "lte=262144"
} else {
expected = "lte=65536"
}
if got != expected {
t.Fatalf(
"EBPFBufferSizes.%s validate tag drifted: got %q, want %q.\n"+
"To resolve this, update all of the following together:\n"+
"1. %s validate tag in pkg/config/ebpf_tracer.go\n"+
"2. MaxCapturedPayloadBytes in pkg/config/ebpf_tracer.go\n"+
"2. HTTP validate tag upper bound (HTTP only)\n"+
"3. matching k_large_buf_max_*_captured_bytes constant in bpf/common/large_buffers.h",
field.Name,
got,
3 changes: 3 additions & 0 deletions pkg/ebpf/common/common.go
@@ -195,6 +195,7 @@ type EBPFParseContext struct {
redisDBCache *simplelru.LRU[BpfConnectionInfoT, int]
couchbaseBucketCache *simplelru.LRU[BpfConnectionInfoT, CouchbaseBucketInfo]
largeBuffers *expirable.LRU[largeBufferKey, *largebuf.LargeBuffer]
sealedLargeBuffers *expirable.LRU[largeBufferKey, struct{}]
mongoRequestCache PendingMongoDBRequests
mysqlPreparedStatements *simplelru.LRU[mysqlPreparedStatementsKey, string]
postgresPreparedStatements *simplelru.LRU[postgresPreparedStatementsKey, string]
@@ -293,6 +294,7 @@ func NewEBPFParseContext(cfg *config.EBPFTracer, spansChan *msg.Queue[[]request.

h2c, _ := lru.New[uint64, h2Connection](1024 * 10)
largeBuffers := expirable.NewLRU[largeBufferKey, *largebuf.LargeBuffer](1024, nil, 5*time.Minute)
sealedLargeBuffers := expirable.NewLRU[largeBufferKey, struct{}](1024, nil, 5*time.Minute)

if spansChan != nil {
emitSpans = func(spans []request.Span) {
@@ -366,6 +368,7 @@ func NewEBPFParseContext(cfg *config.EBPFTracer, spansChan *msg.Queue[[]request.
redisDBCache: redisDBCache,
couchbaseBucketCache: couchbaseBucketCache,
largeBuffers: largeBuffers,
sealedLargeBuffers: sealedLargeBuffers,
mongoRequestCache: mongoRequestCache,
mysqlPreparedStatements: mysqlPreparedStatements,
postgresPreparedStatements: postgresPreparedStatements,
2 changes: 1 addition & 1 deletion pkg/ebpf/common/http/anthropic.go
@@ -99,7 +99,7 @@ func AnthropicSpan(baseSpan *request.Span, req *http.Request, resp *http.Respons

var parsedResponse request.AnthropicResponse
var toolCalls []request.ToolCall
if len(respB) > 0 && respB[0] == '{' {
if looksLikeJSON(respB) {
parsedResponse = parseAnthropicResponse(respB)
toolCalls = extractAnthropicToolCalls(parsedResponse.Content)
} else {
19 changes: 16 additions & 3 deletions pkg/ebpf/common/http/openai.go
@@ -4,6 +4,7 @@
package ebpfcommon // import "go.opentelemetry.io/obi/pkg/ebpf/common/http"

import (
"bytes"
"encoding/json"
"log/slog"
"net/http"
@@ -50,6 +51,18 @@ func extractToolCalls(choices json.RawMessage) []request.ToolCall {
return result
}

// parseOpenAICompatibleResponse parses an OpenAI-compatible response body,
// handling both JSON and SSE streaming formats. It returns the parsed response
// and any tool calls extracted from the response.
func parseOpenAICompatibleResponse(respB []byte) (*request.VendorOpenAI, []request.ToolCall) {
if looksLikeJSON(respB) {
resp := parseVendorOpenAI(respB)
return &resp, extractToolCalls(resp.Choices)
}
reader := bytes.NewReader(respB)
return parseOpenAIStream(reader)
}
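
The `looksLikeJSON` helper used above is not shown in this diff. The sketch below is one plausible shape for such a check, assuming it only inspects the first non-whitespace byte; the name `looksLikeJSONSketch` is hypothetical and the real helper may behave differently.

```go
package main

import (
	"bytes"
	"fmt"
)

// looksLikeJSONSketch is a hypothetical stand-in for the looksLikeJSON helper
// referenced in the diff. It reports whether the body starts with '{' or '['
// after leading whitespace, which is enough to separate a JSON body from an
// SSE stream whose lines start with "data: ".
func looksLikeJSONSketch(b []byte) bool {
	b = bytes.TrimLeft(b, " \t\r\n")
	return len(b) > 0 && (b[0] == '{' || b[0] == '[')
}

func main() {
	fmt.Println(looksLikeJSONSketch([]byte(`{"id":"x"}`)))
	fmt.Println(looksLikeJSONSketch([]byte("data: {\"id\":\"x\"}\n\n")))
}
```

With a check of this kind, a plain JSON body is routed to the JSON parser, while an SSE body falls through to the stream parser.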

func OpenAISpan(baseSpan *request.Span, req *http.Request, resp *http.Response) (request.Span, bool) {
// Check any of the well known response headers that OpenAI would use
isOpenAI := false
@@ -77,7 +90,7 @@ func OpenAISpan(baseSpan *request.Span, req *http.Request, resp *http.Response)
slog.Debug("OpenAI", "request", string(reqB), "response", string(respB))

parsedRequest := parseOpenAIInput(reqB)
parsedResponse := parseVendorOpenAI(respB)
parsedResponse, toolCalls := parseOpenAICompatibleResponse(respB)

if parsedResponse.ResponseModel == "" {
parsedResponse.ResponseModel = parsedRequest.Model
@@ -87,7 +100,7 @@ func OpenAISpan(baseSpan *request.Span, req *http.Request, resp *http.Response)
}

parsedResponse.Request = parsedRequest
parsedResponse.ToolCalls = extractToolCalls(parsedResponse.Choices)
parsedResponse.ToolCalls = toolCalls

// Override operation name and derive API type from URL path.
if req.URL != nil {
@@ -106,7 +119,7 @@ func OpenAISpan(baseSpan *request.Span, req *http.Request, resp *http.Response)

baseSpan.SubType = request.HTTPSubtypeOpenAI
baseSpan.GenAI = &request.GenAI{
OpenAI: &parsedResponse,
OpenAI: parsedResponse,
}

return *baseSpan, true
193 changes: 193 additions & 0 deletions pkg/ebpf/common/http/openai_stream.go
@@ -0,0 +1,193 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package ebpfcommon // import "go.opentelemetry.io/obi/pkg/ebpf/common/http"

import (
"bufio"
"encoding/json"
"io"
"log/slog"
"strings"

"go.opentelemetry.io/obi/pkg/appolly/app/request"
)

// maxStreamToolCalls caps the tool-call accumulator to prevent unbounded
// growth from untrusted tool_calls[].index values.
const maxStreamToolCalls = 256

type openAIStreamChunk struct {
ID string `json:"id"`
Object string `json:"object"`
Model string `json:"model"`
Choices []struct {
Index int `json:"index"`
Delta struct {
Role string `json:"role"`
Content string `json:"content"`
ToolCalls []openAIStreamToolCall `json:"tool_calls"`
} `json:"delta"`
FinishReason *string `json:"finish_reason"`
} `json:"choices"`
Usage *struct {
PromptTokens int `json:"prompt_tokens"`
CompletionTokens int `json:"completion_tokens"`
TotalTokens int `json:"total_tokens"`
InputTokens int `json:"input_tokens"`
OutputTokens int `json:"output_tokens"`
} `json:"usage"`
}

type openAIStreamToolCall struct {
Index int `json:"index"`
ID string `json:"id"`
Type string `json:"type"`
Function struct {
Name string `json:"name"`
Arguments string `json:"arguments"`
} `json:"function"`
}

// parseOpenAIStream parses the SSE stream from OpenAI-compatible APIs (including Qwen/DashScope)
// and returns the aggregated response with usage statistics and tool calls.
func parseOpenAIStream(reader io.Reader) (*request.VendorOpenAI, []request.ToolCall) {
scanner := bufio.NewScanner(reader)
scanner.Buffer(make([]byte, 0, 256*1024), 256*1024)
response := &request.VendorOpenAI{}

var finishReason string
var role string
var contentBuilder strings.Builder
// toolCallAccum accumulates tool call fragments by index.
type toolCallAccum struct {
id string
name string
}
var accumulators []toolCallAccum

for scanner.Scan() {
line := scanner.Text()

if !strings.HasPrefix(line, "data: ") {
continue
}

data := strings.TrimPrefix(line, "data: ")

if data == "[DONE]" {
break
}

var chunk openAIStreamChunk
if err := json.Unmarshal([]byte(data), &chunk); err != nil {
continue
}

// Extract model and id from the first chunk that has them.
if response.ID == "" && chunk.ID != "" {
response.ID = chunk.ID
}
if response.ResponseModel == "" && chunk.Model != "" {
response.ResponseModel = chunk.Model
}

// Extract usage from the chunk that contains it (typically the last one).
if chunk.Usage != nil {
response.Usage.PromptTokens = chunk.Usage.PromptTokens
response.Usage.CompletionTokens = chunk.Usage.CompletionTokens
response.Usage.TotalTokens = chunk.Usage.TotalTokens
response.Usage.InputTokens = chunk.Usage.InputTokens
response.Usage.OutputTokens = chunk.Usage.OutputTokens
}

// Process choices.
for i := range chunk.Choices {
choice := &chunk.Choices[i]

// Track finish reason from the last choice that reports one.
if choice.FinishReason != nil && *choice.FinishReason != "" {
finishReason = *choice.FinishReason
}

// Capture assistant role (typically in the first delta) and
// accumulate content fragments to reconstruct the full message.
if choice.Delta.Role != "" {
role = choice.Delta.Role
}
if choice.Delta.Content != "" {
contentBuilder.WriteString(choice.Delta.Content)
}

// Accumulate tool calls by index.
for j := range choice.Delta.ToolCalls {
tc := &choice.Delta.ToolCalls[j]
idx := tc.Index
if idx < 0 || idx >= maxStreamToolCalls {
continue
}

// Grow the accumulator slice as needed.
for len(accumulators) <= idx {
accumulators = append(accumulators, toolCallAccum{})
}

if tc.ID != "" {
accumulators[idx].id = tc.ID
}
if tc.Function.Name != "" {
accumulators[idx].name = tc.Function.Name
}

}
}
}

if err := scanner.Err(); err != nil {
slog.Debug("parseOpenAIStream: scanner error", "error", err)
}

// Build the Choices JSON with the aggregated message content and
// finish_reason so that VendorOpenAI.GetFinishReasons() and the GenAI
// output normalization (normalizeOpenAIChoices) work correctly.
if finishReason != "" || contentBuilder.Len() > 0 {
type streamChoice struct {
Message struct {
Role string `json:"role"`
Content string `json:"content"`
} `json:"message"`
FinishReason string `json:"finish_reason"`
}

sc := streamChoice{FinishReason: finishReason}
sc.Message.Role = role
if sc.Message.Role == "" {
sc.Message.Role = "assistant"
}
sc.Message.Content = contentBuilder.String()

choicesJSON, err := json.Marshal([]streamChoice{sc})
if err == nil {
response.Choices = choicesJSON
}
}

// Build the final tool calls list.
var toolCalls []request.ToolCall
for i := range accumulators {
if accumulators[i].name == "" {
continue
}
toolCalls = append(toolCalls, request.ToolCall{
ID: accumulators[i].id,
Name: accumulators[i].name,
})
}

if response.Usage.GetInputTokens() == 0 && response.Usage.GetOutputTokens() == 0 && response.ID != "" {
slog.Debug("parseOpenAIStream: no usage data found in SSE stream, token counts will be 0",
"id", response.ID, "model", response.ResponseModel, "finishReason", finishReason)
}

return response, toolCalls
}
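
The SSE handling in `parseOpenAIStream` can be tried in isolation with a reduced sketch. The types and names below (`sseChunk`, `collectContent`, `sample`) are hypothetical and cover only content and finish reason; usage statistics and tool calls are left out to keep the example short.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"strings"
)

// sseChunk is a reduced, hypothetical version of openAIStreamChunk.
type sseChunk struct {
	Choices []struct {
		Delta struct {
			Content string `json:"content"`
		} `json:"delta"`
		FinishReason *string `json:"finish_reason"`
	} `json:"choices"`
}

// sample is an invented stream with two data events and a terminator.
const sample = `data: {"choices":[{"delta":{"content":"Hel"},"finish_reason":null}]}

data: {"choices":[{"delta":{"content":"lo"},"finish_reason":"stop"}]}

data: [DONE]
`

// collectContent concatenates delta content from "data: " lines and returns
// it together with the last non-empty finish reason seen.
func collectContent(stream string) (string, string) {
	scanner := bufio.NewScanner(strings.NewReader(stream))
	var content strings.Builder
	finish := ""
	for scanner.Scan() {
		line := scanner.Text()
		if !strings.HasPrefix(line, "data: ") {
			continue // skip blank separators and other SSE fields
		}
		data := strings.TrimPrefix(line, "data: ")
		if data == "[DONE]" {
			break
		}
		var c sseChunk
		if err := json.Unmarshal([]byte(data), &c); err != nil {
			continue // ignore malformed events, as the parser above does
		}
		for i := range c.Choices {
			content.WriteString(c.Choices[i].Delta.Content)
			if fr := c.Choices[i].FinishReason; fr != nil && *fr != "" {
				finish = *fr
			}
		}
	}
	return content.String(), finish
}

func main() {
	content, finish := collectContent(sample)
	fmt.Printf("content=%q finish=%q\n", content, finish)
}
```

The sketch uses the scanner's default buffer, so very long `data:` lines would need a larger buffer set with `scanner.Buffer`, as the real function does.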