Skip to content
Closed
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
6aac7ae
config and limiter extension
jmacd Mar 20, 2025
2a1818d
config and middleware extension
jmacd Mar 20, 2025
b519182
add limiter skeleton
jmacd Mar 20, 2025
4434dac
add request limiter calls
jmacd Mar 21, 2025
25a385c
At the two locations where I have written @@@, I realize that I have …
jmacd Mar 21, 2025
8fe3941
split rate limiter, resource limiter
jmacd Mar 21, 2025
84838fc
todos
jmacd Mar 21, 2025
cdc6698
skeleton ratelimiterextension
jmacd Mar 21, 2025
88b203b
rate limiter outline
jmacd Mar 22, 2025
b602b18
HTTP client and server network bytes
jmacd Mar 24, 2025
cec3306
add limiter provider
jmacd Mar 24, 2025
060c756
Follow 17002
jmacd Mar 24, 2025
ee8dd47
Let rate limiter accept request resource-weighted limits
jmacd Mar 24, 2025
9369495
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
jmacd Mar 28, 2025
af51711
middleware config: more direct, extension naming more consistent
jmacd Mar 28, 2025
2c3a406
sketch grpc middleware
jmacd Mar 28, 2025
8ca49f0
extend confighttp and configgrpc
jmacd Mar 28, 2025
1fa405e
memory limiter
jmacd Mar 28, 2025
e125898
multi limiter provider
jmacd Mar 28, 2025
a6e6b17
OTLP receiver limiter
jmacd Mar 28, 2025
10cfbb3
add direct grpc limiter support (this could be a helper)
jmacd Mar 28, 2025
1f4a724
add direct http limiter support (this could be a helper)
jmacd Mar 28, 2025
ee0c2e0
merge
jmacd Apr 1, 2025
6d194a8
factor limiterhelper
jmacd Apr 1, 2025
9f66533
remove/rename
jmacd Apr 1, 2025
0e30a3a
Use non-nil ReleaseFunc everywhere
jmacd Apr 2, 2025
70296f0
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
jmacd Apr 14, 2025
a79da5f
Merge branch 'main' of github.com:jmacd/opentelemetry-collector into …
jmacd Apr 14, 2025
854108f
update from main
jmacd Apr 21, 2025
fb5b8b9
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
jmacd Apr 21, 2025
fc7ba55
hide the option and config, eliminate the limiterhelper.Consumer
jmacd Apr 21, 2025
ea8152a
wip
jmacd Apr 21, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 28 additions & 2 deletions config/configgrpc/configgrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configauth"
"go.opentelemetry.io/collector/config/configcompression"
"go.opentelemetry.io/collector/config/configmiddleware"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/config/configtls"
Expand Down Expand Up @@ -103,6 +104,9 @@ type ClientConfig struct {

// Auth configuration for outgoing RPCs.
Auth *configauth.Authentication `mapstructure:"auth,omitempty"`

// Middlewares for the gRPC client.
Middlewares []configmiddleware.Middleware `mapstructure:"middlewares,omitempty"`
}

// NewDefaultClientConfig returns a new instance of ClientConfig with default values.
Expand Down Expand Up @@ -189,6 +193,9 @@ type ServerConfig struct {

// Include propagates the incoming connection's metadata to downstream consumers.
IncludeMetadata bool `mapstructure:"include_metadata,omitempty"`

// Middlewares for the gRPC server.
Middlewares []configmiddleware.Middleware `mapstructure:"middlewares,omitempty"`
}

// NewDefaultServerConfig returns a new instance of ServerConfig with default values.
Expand Down Expand Up @@ -362,6 +369,15 @@ func (gcs *ClientConfig) getGrpcDialOptions(
)
}

// Apply middleware options. Note: OpenTelemetry could be registered as an extension.
for _, middleware := range gcs.Middlewares {
middlewareOptions, err := middleware.GetGRPCClientOptions(ctx, host.GetExtensions())
if err != nil {
return nil, fmt.Errorf("failed to get gRPC client options from middleware: %w", err)
}
opts = append(opts, middlewareOptions...)
}

for _, opt := range extraOpts {
if wrapper, ok := opt.(grpcDialOptionWrapper); ok {
opts = append(opts, wrapper.opt)
Expand Down Expand Up @@ -404,19 +420,20 @@ func (grpcServerOptionWrapper) isToServerOption() {}

// ToServer returns a [grpc.Server] for the configuration.
func (gss *ServerConfig) ToServer(
_ context.Context,
ctx context.Context,
host component.Host,
settings component.TelemetrySettings,
extraOpts ...ToServerOption,
) (*grpc.Server, error) {
grpcOpts, err := gss.getGrpcServerOptions(host, settings, extraOpts)
grpcOpts, err := gss.getGrpcServerOptions(ctx, host, settings, extraOpts)
if err != nil {
return nil, err
}
return grpc.NewServer(grpcOpts...), nil
}

func (gss *ServerConfig) getGrpcServerOptions(
ctx context.Context,
host component.Host,
settings component.TelemetrySettings,
extraOpts []ToServerOption,
Expand Down Expand Up @@ -505,6 +522,15 @@ func (gss *ServerConfig) getGrpcServerOptions(

opts = append(opts, grpc.StatsHandler(otelgrpc.NewServerHandler(otelOpts...)), grpc.ChainUnaryInterceptor(uInterceptors...), grpc.ChainStreamInterceptor(sInterceptors...))

// Apply middleware options. Note: OpenTelemetry could be registered as an extension.
for _, middleware := range gss.Middlewares {
middlewareOptions, err := middleware.GetGRPCServerOptions(ctx, host.GetExtensions())
if err != nil {
return nil, fmt.Errorf("failed to get gRPC server options from middleware: %w", err)
}
opts = append(opts, middlewareOptions...)
}

for _, opt := range extraOpts {
if wrapper, ok := opt.(grpcServerOptionWrapper); ok {
opts = append(opts, wrapper.opt)
Expand Down
5 changes: 3 additions & 2 deletions config/configgrpc/configgrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ func TestDefaultGrpcServerSettings(t *testing.T) {
Endpoint: "0.0.0.0:1234",
},
}
opts, err := gss.getGrpcServerOptions(componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings(), []ToServerOption{})
opts, err := gss.getGrpcServerOptions(context.Background(), componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings(), []ToServerOption{})
require.NoError(t, err)
assert.Len(t, opts, 3)
}
Expand All @@ -312,6 +312,7 @@ func TestGrpcServerExtraOption(t *testing.T) {
}
extraOpt := grpc.ConnectionTimeout(1_000_000_000)
opts, err := gss.getGrpcServerOptions(
context.Background(),
componenttest.NewNopHost(),
componenttest.NewNopTelemetrySettings(),
[]ToServerOption{WithGrpcServerOption(extraOpt)},
Expand Down Expand Up @@ -401,7 +402,7 @@ func TestAllGrpcServerSettingsExceptAuth(t *testing.T) {
},
},
}
opts, err := gss.getGrpcServerOptions(componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings(), []ToServerOption{})
opts, err := gss.getGrpcServerOptions(context.Background(), componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings(), []ToServerOption{})
require.NoError(t, err)
assert.Len(t, opts, 10)
}
Expand Down
6 changes: 6 additions & 0 deletions config/configgrpc/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
go.opentelemetry.io/collector/component/componenttest v0.122.1
go.opentelemetry.io/collector/config/configauth v0.122.1
go.opentelemetry.io/collector/config/configcompression v1.28.1
go.opentelemetry.io/collector/config/configmiddleware v0.0.0-00010101000000-000000000000
go.opentelemetry.io/collector/config/confignet v1.28.1
go.opentelemetry.io/collector/config/configopaque v1.28.1
go.opentelemetry.io/collector/config/configtls v1.28.1
Expand Down Expand Up @@ -39,6 +40,7 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/collector/extension/extensionmiddleware v0.0.0-00010101000000-000000000000 // indirect
go.opentelemetry.io/collector/featuregate v1.28.1 // indirect
go.opentelemetry.io/collector/internal/telemetry v0.122.1 // indirect
go.opentelemetry.io/collector/pdata/pprofile v0.122.1 // indirect
Expand Down Expand Up @@ -93,3 +95,7 @@ replace go.opentelemetry.io/collector/internal/telemetry => ../../internal/telem
replace go.opentelemetry.io/collector/pipeline => ../../pipeline

replace go.opentelemetry.io/collector/featuregate => ../../featuregate

replace go.opentelemetry.io/collector/config/configmiddleware => ../configmiddleware

replace go.opentelemetry.io/collector/extension/extensionmiddleware => ../../extension/extensionmiddleware
44 changes: 43 additions & 1 deletion config/confighttp/confighttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"go.opentelemetry.io/collector/config/configauth"
"go.opentelemetry.io/collector/config/configcompression"
"go.opentelemetry.io/collector/config/confighttp/internal"
"go.opentelemetry.io/collector/config/configmiddleware"
"go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/extension/extensionauth"
Expand Down Expand Up @@ -111,6 +112,11 @@ type ClientConfig struct {
HTTP2PingTimeout time.Duration `mapstructure:"http2_ping_timeout,omitempty"`
// Cookies configures the cookie management of the HTTP client.
Cookies *CookiesConfig `mapstructure:"cookies,omitempty"`

// Middlewares are used to add custom functionality to the HTTP client.
// Middleware handlers are applied in the order they appear in this list,
// with the first middleware becoming the outermost handler.
Middlewares []configmiddleware.Middleware `mapstructure:"middleware,omitempty"`
}

// CookiesConfig defines the configuration of the HTTP client regarding cookies served by the server.
Expand Down Expand Up @@ -194,6 +200,21 @@ func (hcs *ClientConfig) ToClient(ctx context.Context, host component.Host, sett

clientTransport := (http.RoundTripper)(transport)

// Apply middlewares in reverse order so they are applied in
// order. The first middleware runs after authentication.
for i := len(hcs.Middlewares) - 1; i >= 0; i-- {
wrapper, err := hcs.Middlewares[i].GetHTTPClientRoundTripper(ctx, host.GetExtensions())
// If we failed to get the middleware
if err != nil {
return nil, err
}
clientTransport, err = wrapper(clientTransport)
// If we failed to construct a wrapper
if err != nil {
return nil, err
}
}

// The Auth RoundTripper should always be the innermost to ensure that
// request signing-based auth mechanisms operate after compression
// and header middleware modifies the request
Expand Down Expand Up @@ -338,6 +359,11 @@ type ServerConfig struct {
// is zero, the value of ReadTimeout is used. If both are
// zero, there is no timeout.
IdleTimeout time.Duration `mapstructure:"idle_timeout"`

// Middlewares are used to add custom functionality to the HTTP server.
// Middleware handlers are applied in the order they appear in this list,
// with the first middleware becoming the outermost handler.
Middlewares []configmiddleware.Middleware `mapstructure:"middleware,omitempty"`
}

// NewDefaultServerConfig returns ServerConfig type object with default values.
Expand Down Expand Up @@ -411,7 +437,7 @@ func WithDecoder(key string, dec func(body io.ReadCloser) (io.ReadCloser, error)
}

// ToServer creates an http.Server from settings object.
func (hss *ServerConfig) ToServer(_ context.Context, host component.Host, settings component.TelemetrySettings, handler http.Handler, opts ...ToServerOption) (*http.Server, error) {
func (hss *ServerConfig) ToServer(ctx context.Context, host component.Host, settings component.TelemetrySettings, handler http.Handler, opts ...ToServerOption) (*http.Server, error) {
serverOpts := &toServerOptions{}
serverOpts.Apply(opts...)

Expand All @@ -423,6 +449,22 @@ func (hss *ServerConfig) ToServer(_ context.Context, host component.Host, settin
hss.CompressionAlgorithms = defaultCompressionAlgorithms
}

// Apply middlewares in reverse order so they are applied in
// order. The first middleware runs after decompression,
// below, preceded by Auth, CORS, etc.
for i := len(hss.Middlewares) - 1; i >= 0; i-- {
wrapper, err := hss.Middlewares[i].GetHTTPServerHandler(ctx, host.GetExtensions())
// If we failed to get the middleware
if err != nil {
return nil, err
}
handler, err = wrapper(handler)
// If we failed to construct a wrapper
if err != nil {
return nil, err
}
}

handler = httpContentDecompressor(
handler,
hss.MaxRequestBodySize,
Expand Down
6 changes: 6 additions & 0 deletions config/confighttp/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
go.opentelemetry.io/collector/component/componenttest v0.122.1
go.opentelemetry.io/collector/config/configauth v0.122.1
go.opentelemetry.io/collector/config/configcompression v1.28.1
go.opentelemetry.io/collector/config/configmiddleware v0.0.0-00010101000000-000000000000
go.opentelemetry.io/collector/config/configopaque v1.28.1
go.opentelemetry.io/collector/config/configtls v1.28.1
go.opentelemetry.io/collector/extension v1.28.1
Expand All @@ -36,6 +37,7 @@ require (
github.com/hashicorp/go-version v1.7.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/collector/extension/extensionmiddleware v0.0.0-00010101000000-000000000000 // indirect
go.opentelemetry.io/collector/featuregate v1.28.1 // indirect
go.opentelemetry.io/collector/internal/telemetry v0.122.1 // indirect
go.opentelemetry.io/collector/pdata v1.28.1 // indirect
Expand Down Expand Up @@ -83,3 +85,7 @@ replace go.opentelemetry.io/collector/internal/telemetry => ../../internal/telem
replace go.opentelemetry.io/collector/pipeline => ../../pipeline

replace go.opentelemetry.io/collector/featuregate => ../../featuregate

replace go.opentelemetry.io/collector/config/configmiddleware => ../configmiddleware

replace go.opentelemetry.io/collector/extension/extensionmiddleware => ../../extension/extensionmiddleware
1 change: 1 addition & 0 deletions config/configlimiter/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
42 changes: 42 additions & 0 deletions config/configlimiter/configlimiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// Package configlimiter implements the configuration settings to
// apply rate limiting on incoming requests, and allows
// components to configure rate limiting behavior.
package configlimiter // import "go.opentelemetry.io/collector/config/configlimiter"

import (
"context"
"errors"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/extension/extensionlimiter"
)

var (
errLimiterNotFound = errors.New("limiter not found")
errNotLimiter = errors.New("requested extension is not a limiter")
)

// Limiter defines the rate limiting settings for a component.
type Limiter struct {
// LimiterID specifies the name of the extension to use in order to apply rate limiting.
LimiterID component.ID `mapstructure:"limiter,omitempty"`
}

// GetProvider attempts to select the appropriate extensionlimiter.Provider from the list of extensions,
// based on the requested extension name. If a limiter is not found, an error is returned.
// Callers will use the returned Provider to get access to the specific rate- and
// resource-limiter weights they are capable of limiting.
func (l Limiter) GetProvider(_ context.Context, extensions map[component.ID]component.Component) (extensionlimiter.Provider, error) {
if ext, found := extensions[l.LimiterID]; found {
if limiter, ok := ext.(extensionlimiter.Provider); ok {
return limiter, nil
}
return nil, errNotLimiter
}

return nil, fmt.Errorf("failed to resolve limiter provider %q: %w", l.LimiterID, errLimiterNotFound)
}
40 changes: 40 additions & 0 deletions config/configlimiter/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
module go.opentelemetry.io/collector/config/configlimiter

go 1.23.0

require (
go.opentelemetry.io/collector/component v1.28.1
go.opentelemetry.io/collector/extension/extensionlimiter v0.0.0-00010101000000-000000000000
)

require (
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/hashicorp/go-version v1.7.0 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/collector/featuregate v1.28.1 // indirect
go.opentelemetry.io/collector/internal/telemetry v0.122.1 // indirect
go.opentelemetry.io/collector/pdata v1.28.1 // indirect
go.opentelemetry.io/contrib/bridges/otelzap v0.10.0 // indirect
go.opentelemetry.io/otel v1.35.0 // indirect
go.opentelemetry.io/otel/log v0.11.0 // indirect
go.opentelemetry.io/otel/metric v1.35.0 // indirect
go.opentelemetry.io/otel/sdk v1.34.0 // indirect
go.opentelemetry.io/otel/trace v1.35.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/net v0.37.0 // indirect
golang.org/x/sys v0.31.0 // indirect
golang.org/x/text v0.23.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f // indirect
google.golang.org/grpc v1.71.0 // indirect
google.golang.org/protobuf v1.36.6 // indirect
)

replace go.opentelemetry.io/collector/component => ../../component

replace go.opentelemetry.io/collector/internal/telemetry => ../../internal/telemetry

replace go.opentelemetry.io/collector/extension/extensionlimiter => ../../extension/extensionlimiter
Loading
Loading