Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Support `SLICE` attributes in `go.opentelemetry.io/otel/exporters/zipkin`. (#8216)
- Apply `AttributeValueLengthLimit` to `attribute.SLICE` type attribute values in `go.opentelemetry.io/otel/sdk/trace`, recursively truncating contained string values. (#8217)
- Add `Error` field on `Record` type in `go.opentelemetry.io/otel/log/logtest`. (#8148)
- Add `WithMaxRequestSize` option in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc`. (#8157)
- Add `WithMaxRequestSize` option in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp`. (#8157)
- Add `WithMaxRequestSize` option in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc`. (#8157)
- Add `WithMaxRequestSize` option in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`. (#8157)
- Add `WithMaxRequestSize` option in `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc`. (#8157)
- Add `WithMaxRequestSize` option in `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp`. (#8157)
- Add experimental support for splitting metric data across multiple batches in `go.opentelemetry.io/otel/sdk/metric`.
Set `OTEL_GO_X_METRIC_EXPORT_BATCH_SIZE=<max_size>` to enable for all periodic readers.
See `go.opentelemetry.io/otel/sdk/metric/internal/x` for feature documentation. (#8071)
Expand All @@ -50,6 +56,18 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

### Fixed

- Limit OTLP request size to 32 MiB by default in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc`.
The limit applies before compression, oversized requests are treated as non-retryable errors, and the limit can be configured with the new `WithMaxRequestSize` option. (#8157)
- Limit OTLP request size to 32 MiB by default in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp`.
The limit applies before compression, oversized requests are treated as non-retryable errors, and the limit can be configured with the new `WithMaxRequestSize` option. (#8157)
- Limit OTLP request size to 32 MiB by default in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc`.
The limit applies before compression, oversized requests are treated as non-retryable errors, and the limit can be configured with the new `WithMaxRequestSize` option. (#8157)
- Limit OTLP request size to 32 MiB by default in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`.
The limit applies before compression, oversized requests are treated as non-retryable errors, and the limit can be configured with the new `WithMaxRequestSize` option. (#8157)
- Limit OTLP request size to 32 MiB by default in `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc`.
The limit applies before compression, oversized requests are treated as non-retryable errors, and the limit can be configured with the new `WithMaxRequestSize` option. (#8157)
- Limit OTLP request size to 32 MiB by default in `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp`.
The limit applies before compression, oversized requests are treated as non-retryable errors, and the limit can be configured with the new `WithMaxRequestSize` option. (#8157)
- Fix gzipped request body replay on redirect in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`. (#8135)
- Fix gzipped request body replay on redirect in `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp`. (#8152)
- `go.opentelemetry.io/otel/exporters/prometheus` now uses `Value.String` formatting for label values following the [OpenTelemetry AnyValue representation for non-OTLP protocols](https://opentelemetry.io/docs/specs/otel/common/#anyvalue). (#8170)
Expand Down
25 changes: 16 additions & 9 deletions exporters/otlp/otlplog/otlploggrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package otlploggrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/o
import (
"context"
"errors"
"fmt"
"sync/atomic"
"time"

Expand All @@ -20,6 +21,7 @@ import (
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"

"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal"
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/observ"
Expand All @@ -28,9 +30,10 @@ import (

// The methods of this type are not expected to be called concurrently.
type client struct {
metadata metadata.MD
exportTimeout time.Duration
requestFunc retry.RequestFunc
metadata metadata.MD
exportTimeout time.Duration
maxRequestSize int
requestFunc retry.RequestFunc

// ourConn keeps track of where conn was created: true if created here in
// NewClient, or false if passed with an option. This is important on
Expand All @@ -49,9 +52,10 @@ var newGRPCClientFn = grpc.NewClient
// newClient creates a new gRPC log client.
func newClient(cfg config) (*client, error) {
c := &client{
exportTimeout: cfg.timeout.Value,
requestFunc: cfg.retryCfg.Value.RequestFunc(retryable),
conn: cfg.gRPCConn.Value,
exportTimeout: cfg.timeout.Value,
maxRequestSize: cfg.maxRequestSize.Value,
requestFunc: cfg.retryCfg.Value.RequestFunc(retryable),
conn: cfg.gRPCConn.Value,
}

if len(cfg.headers.Value) > 0 {
Expand Down Expand Up @@ -146,6 +150,7 @@ func (c *client) UploadLogs(ctx context.Context, rl []*logpb.ResourceLogs) (uplo
ctx, cancel := c.exportContext(ctx)
defer cancel()

pbRequest := &collogpb.ExportLogsServiceRequest{ResourceLogs: rl}
count := int64(len(rl))
if c.instrumentation != nil {
eo := c.instrumentation.ExportLogs(ctx, count)
Expand All @@ -154,10 +159,12 @@ func (c *client) UploadLogs(ctx context.Context, rl []*logpb.ResourceLogs) (uplo
}()
}

if maxSize := c.maxRequestSize; maxSize > 0 && proto.Size(pbRequest) > maxSize {
return fmt.Errorf("request message too large: exceeded %d bytes", maxSize)
Comment thread
dashpole marked this conversation as resolved.
}

return errors.Join(uploadErr, c.requestFunc(ctx, func(ctx context.Context) error {
resp, err := c.lsc.Export(ctx, &collogpb.ExportLogsServiceRequest{
ResourceLogs: rl,
})
resp, err := c.lsc.Export(ctx, pbRequest)
if resp != nil && resp.PartialSuccess != nil {
msg := resp.PartialSuccess.GetErrorMessage()
n := resp.PartialSuccess.GetRejectedLogRecords()
Expand Down
30 changes: 30 additions & 0 deletions exporters/otlp/otlplog/otlploggrpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,16 @@ var (
}}
)

type logsServiceClientFunc func(context.Context, *collogpb.ExportLogsServiceRequest, ...grpc.CallOption) (*collogpb.ExportLogsServiceResponse, error)

func (f logsServiceClientFunc) Export(
ctx context.Context,
req *collogpb.ExportLogsServiceRequest,
opts ...grpc.CallOption,
) (*collogpb.ExportLogsServiceResponse, error) {
return f(ctx, req, opts...)
}

func TestThrottleDelay(t *testing.T) {
c := codes.ResourceExhausted
testcases := []struct {
Expand Down Expand Up @@ -267,6 +277,26 @@ func TestRetryableGRPCStatusResourceExhaustedWithRetryInfo(t *testing.T) {
assert.Equal(t, delay, d)
}

func TestUploadLogsRequestSizeLimit(t *testing.T) {
var calls int
c := &client{
maxRequestSize: 1,
requestFunc: func(ctx context.Context, fn func(context.Context) error) error {
return fn(ctx)
},
lsc: logsServiceClientFunc(
func(context.Context, *collogpb.ExportLogsServiceRequest, ...grpc.CallOption) (*collogpb.ExportLogsServiceResponse, error) {
calls++
return &collogpb.ExportLogsServiceResponse{}, nil
},
),
}

err := c.UploadLogs(t.Context(), []*lpb.ResourceLogs{{}})
assert.ErrorContains(t, err, "request message too large")
assert.Equal(t, 0, calls, "oversized request must fail before sending")
}

func TestNewClient(t *testing.T) {
newGRPCClientFnSwap := newGRPCClientFn
t.Cleanup(func() {
Expand Down
38 changes: 28 additions & 10 deletions exporters/otlp/otlplog/otlploggrpc/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ import (

// Default values.
var (
defaultEndpoint = "localhost:4317"
defaultTimeout = 10 * time.Second
defaultRetryCfg = retry.DefaultConfig
defaultEndpoint = "localhost:4317"
defaultTimeout = 10 * time.Second
defaultMaxRequestSize = 32 * 1024 * 1024
Comment thread
pellared marked this conversation as resolved.
defaultRetryCfg = retry.DefaultConfig
)

// Environment variable keys.
Expand Down Expand Up @@ -85,13 +86,14 @@ type Option interface {
}

type config struct {
endpoint setting[string]
insecure setting[bool]
tlsCfg setting[*tls.Config]
headers setting[map[string]string]
compression setting[Compression]
timeout setting[time.Duration]
retryCfg setting[retry.Config]
endpoint setting[string]
insecure setting[bool]
tlsCfg setting[*tls.Config]
headers setting[map[string]string]
compression setting[Compression]
maxRequestSize setting[int]
timeout setting[time.Duration]
retryCfg setting[retry.Config]

// gRPC configurations
gRPCCredentials setting[credentials.TransportCredentials]
Expand Down Expand Up @@ -129,6 +131,9 @@ func newConfig(options []Option) config {
getEnv[time.Duration](envTimeout, convDuration),
fallback[time.Duration](defaultTimeout),
)
c.maxRequestSize = c.maxRequestSize.Resolve(
fallback[int](defaultMaxRequestSize),
)
c.retryCfg = c.retryCfg.Resolve(
fallback[retry.Config](defaultRetryCfg),
)
Expand Down Expand Up @@ -353,6 +358,19 @@ func WithTimeout(duration time.Duration) Option {
})
}

// WithMaxRequestSize sets the maximum size, in bytes, of a serialized export
// request, before compression, that the exporter will send.
//
// If size is less than or equal to zero, no request-size limit is applied.
// Disabling the limit is not recommended because it can lead to excessive
// resource consumption or abuse.
func WithMaxRequestSize(size int) Option {
return fnOpt(func(c config) config {
c.maxRequestSize = newSetting(size)
return c
})
}

// WithRetry sets the retry policy for transient retryable errors that are
// returned by the target endpoint.
//
Expand Down
5 changes: 5 additions & 0 deletions exporters/otlp/otlplog/otlploggrpc/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func TestNewConfig(t *testing.T) {
WithServiceConfig("{}"),
WithDialOption(dialOptions...),
WithGRPCConn(&grpc.ClientConn{}),
WithMaxRequestSize(1),
WithTimeout(2 * time.Second),
WithRetry(RetryConfig(rc)),
},
Expand All @@ -120,6 +121,7 @@ func TestNewConfig(t *testing.T) {
insecure: newSetting(true),
headers: newSetting(headers),
compression: newSetting(GzipCompression),
maxRequestSize: newSetting(1),
timeout: newSetting(2 * time.Second),
retryCfg: newSetting(rc),
gRPCCredentials: newSetting(credentials.NewTLS(tlsCfg)),
Expand Down Expand Up @@ -494,6 +496,9 @@ func TestNewConfig(t *testing.T) {
return func() { otel.SetErrorHandler(orig) }
}(otel.GetErrorHandler()))
c := newConfig(tc.options)
if !tc.want.maxRequestSize.Set {
tc.want.maxRequestSize = newSetting(defaultMaxRequestSize)
}

// Do not compare pointer values.
assertTLSConfig(t, tc.want.tlsCfg, c.tlsCfg)
Expand Down
30 changes: 18 additions & 12 deletions exporters/otlp/otlplog/otlploghttp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,11 @@ func newHTTPClient(ctx context.Context, cfg config) (*client, error) {
req.Header.Set("Content-Type", "application/x-protobuf")

c := &httpClient{
compression: cfg.compression.Value,
req: req,
requestFunc: cfg.retryCfg.Value.RequestFunc(evaluate),
client: hc,
compression: cfg.compression.Value,
maxRequestSize: cfg.maxRequestSize.Value,
req: req,
requestFunc: cfg.retryCfg.Value.RequestFunc(evaluate),
client: hc,
}

id := nextExporterID()
Expand All @@ -125,10 +126,11 @@ func newHTTPClient(ctx context.Context, cfg config) (*client, error) {

type httpClient struct {
// req is cloned for every upload the client makes.
req *http.Request
compression Compression
requestFunc retry.RequestFunc
client *http.Client
req *http.Request
compression Compression
maxRequestSize int
requestFunc retry.RequestFunc
client *http.Client

inst *observ.Instrumentation
}
Expand Down Expand Up @@ -159,17 +161,21 @@ func (c *httpClient) uploadLogs(ctx context.Context, data []*logpb.ResourceLogs)
if err != nil {
return err
}
request, err := c.newRequest(ctx, body)
if err != nil {
return err
}

var statusCode int
if c.inst != nil {
op := c.inst.ExportLogs(ctx, int64(len(data)))
defer func() { op.End(uploadErr, statusCode) }()
}

if maxSize := c.maxRequestSize; maxSize > 0 && len(body) > maxSize {
return fmt.Errorf("request body too large: exceeded %d bytes", maxSize)
}
request, err := c.newRequest(ctx, body)
if err != nil {
return err
}

return errors.Join(uploadErr, c.requestFunc(ctx, func(iCtx context.Context) error {
select {
case <-iCtx.Done():
Expand Down
23 changes: 23 additions & 0 deletions exporters/otlp/otlplog/otlploghttp/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1138,6 +1138,29 @@ func TestResponseBodySizeLimit(t *testing.T) {
}
}

func TestRequestBodySizeLimit(t *testing.T) {
var calls int
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
calls++
w.WriteHeader(http.StatusOK)
}))
t.Cleanup(srv.Close)

opts := []Option{
WithEndpoint(srv.Listener.Addr().String()),
WithInsecure(),
WithMaxRequestSize(1),
WithRetry(RetryConfig{Enabled: false}),
}
cfg := newConfig(opts)
c, err := newHTTPClient(t.Context(), cfg)
require.NoError(t, err)

err = c.UploadLogs(t.Context(), []*lpb.ResourceLogs{{}})
assert.ErrorContains(t, err, "request body too large")
assert.Equal(t, 0, calls, "oversized request must fail before sending")
}

func BenchmarkExporterExportLogs(b *testing.B) {
const n = 10

Expand Down
48 changes: 33 additions & 15 deletions exporters/otlp/otlplog/otlploghttp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ import (

// Default values.
var (
defaultEndpoint = "localhost:4318"
defaultPath = "/v1/logs"
defaultTimeout = 10 * time.Second
defaultProxy HTTPTransportProxyFunc = http.ProxyFromEnvironment
defaultRetryCfg = retry.DefaultConfig
defaultEndpoint = "localhost:4318"
defaultPath = "/v1/logs"
defaultTimeout = 10 * time.Second
defaultMaxRequestSize = 32 * 1024 * 1024
defaultProxy HTTPTransportProxyFunc = http.ProxyFromEnvironment
defaultRetryCfg = retry.DefaultConfig
)

// Environment variable keys.
Expand Down Expand Up @@ -89,16 +90,17 @@ type fnOpt func(config) config
func (f fnOpt) applyHTTPOption(c config) config { return f(c) }

type config struct {
endpoint setting[string]
path setting[string]
insecure setting[bool]
tlsCfg setting[*tls.Config]
headers setting[map[string]string]
compression setting[Compression]
timeout setting[time.Duration]
proxy setting[HTTPTransportProxyFunc]
retryCfg setting[retry.Config]
httpClient *http.Client
endpoint setting[string]
path setting[string]
insecure setting[bool]
tlsCfg setting[*tls.Config]
headers setting[map[string]string]
compression setting[Compression]
maxRequestSize setting[int]
timeout setting[time.Duration]
proxy setting[HTTPTransportProxyFunc]
retryCfg setting[retry.Config]
httpClient *http.Client
}

func newConfig(options []Option) config {
Expand Down Expand Up @@ -133,6 +135,9 @@ func newConfig(options []Option) config {
getenv[time.Duration](envTimeout, convDuration),
fallback[time.Duration](defaultTimeout),
)
c.maxRequestSize = c.maxRequestSize.Resolve(
fallback[int](defaultMaxRequestSize),
)
c.proxy = c.proxy.Resolve(
fallback[HTTPTransportProxyFunc](defaultProxy),
)
Expand Down Expand Up @@ -313,6 +318,19 @@ func WithTimeout(duration time.Duration) Option {
})
}

// WithMaxRequestSize sets the maximum size, in bytes, of a serialized export
// request, before compression, that the exporter will send.
//
// If size is less than or equal to zero, no request-size limit is applied.
// Disabling the limit is not recommended because it can lead to excessive
// resource consumption or abuse.
func WithMaxRequestSize(size int) Option {
return fnOpt(func(c config) config {
c.maxRequestSize = newSetting(size)
return c
})
}

// RetryConfig defines configuration for retrying the export of log data that
// failed.
type RetryConfig retry.Config
Expand Down
Loading
Loading