Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 31 additions & 5 deletions processor/ratelimitprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,18 @@ type Config struct {
// If not set, class resolution is disabled.
// Only applicable when the rate limiter type is "gubernator".
ClassResolver component.ID `mapstructure:"class_resolver"`

// GubernatorBehavior configures the behavior of rate limiter in Gubernator.
// Only applicable when the rate limiter type is "gubernator".
//
// Options are "batching" or "global". Defaults to "batching".
GubernatorBehavior GubernatorBehavior `mapstructure:"gubernator_behavior"`
}

// Unmarshal implements temporary logic to parse the older format of the overrides.
// This is achieved by identifying if overrides are defined using the old config
// and mapping it to the new config.
func (cfg *Config) Unmarshal(componentParser *confmap.Conf) error {
func (config *Config) Unmarshal(componentParser *confmap.Conf) error {
if componentParser == nil {
return nil
}
Expand Down Expand Up @@ -103,7 +109,7 @@ func (cfg *Config) Unmarshal(componentParser *confmap.Conf) error {
}
}

if err := componentParser.Unmarshal(cfg, confmap.WithIgnoreUnused()); err != nil {
if err := componentParser.Unmarshal(config, confmap.WithIgnoreUnused()); err != nil {
return err
}

Expand All @@ -113,7 +119,7 @@ func (cfg *Config) Unmarshal(componentParser *confmap.Conf) error {
continue
}
matches := make(map[string][]string)
cfg.Overrides[i].Matches = matches
config.Overrides[i].Matches = matches
matchKVs := strings.Split(k, ";")
for _, matchKV := range matchKVs {
if len(matchKV) == 0 {
Expand Down Expand Up @@ -317,6 +323,11 @@ const (
// GubernatorBehavior controls Gubernator's behavior.
type GubernatorBehavior string

const (
GubernatorBehaviorBatching GubernatorBehavior = "batching"
GubernatorBehaviorGlobal GubernatorBehavior = "global"
)

func createDefaultConfig() component.Config {
return &Config{
Type: LocalRateLimiter,
Expand All @@ -330,8 +341,9 @@ func createDefaultConfig() component.Config {
DefaultWindowMultiplier: 1.3,
WindowDuration: 2 * time.Minute,
},
Classes: nil,
DefaultClass: "",
Classes: nil,
DefaultClass: "",
GubernatorBehavior: GubernatorBehaviorBatching,
}
}

Expand Down Expand Up @@ -538,3 +550,17 @@ func (t RateLimiterType) Validate() error {
},
)
}

func (b GubernatorBehavior) Validate() error {
switch b {
case GubernatorBehaviorBatching, GubernatorBehaviorGlobal:
return nil
}
return fmt.Errorf(
"invalid gubernator behavior %q, expected one of %q",
b, []string{
string(GubernatorBehaviorBatching),
string(GubernatorBehaviorGlobal),
},
)
}
31 changes: 31 additions & 0 deletions processor/ratelimitprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func TestLoadConfig(t *testing.T) {
RetryDelay: 1 * time.Second,
},
DynamicRateLimiting: defaultDynamicRateLimiting,
GubernatorBehavior: GubernatorBehaviorBatching,
},
},
{
Expand All @@ -77,6 +78,7 @@ func TestLoadConfig(t *testing.T) {
RetryDelay: 1 * time.Second,
},
DynamicRateLimiting: defaultDynamicRateLimiting,
GubernatorBehavior: GubernatorBehaviorBatching,
},
},
{
Expand All @@ -92,6 +94,7 @@ func TestLoadConfig(t *testing.T) {
RetryDelay: 1 * time.Second,
},
DynamicRateLimiting: defaultDynamicRateLimiting,
GubernatorBehavior: GubernatorBehaviorBatching,
},
},
{
Expand All @@ -108,6 +111,7 @@ func TestLoadConfig(t *testing.T) {
},
DynamicRateLimiting: defaultDynamicRateLimiting,
MetadataKeys: []string{"project_id"},
GubernatorBehavior: GubernatorBehaviorBatching,
},
},
{
Expand All @@ -123,6 +127,7 @@ func TestLoadConfig(t *testing.T) {
RetryDelay: 1 * time.Second,
},
DynamicRateLimiting: defaultDynamicRateLimiting,
GubernatorBehavior: GubernatorBehaviorBatching,
Overrides: []RateLimitOverrides{
{
Matches: map[string][]string{
Expand All @@ -147,6 +152,7 @@ func TestLoadConfig(t *testing.T) {
RetryDelay: 1 * time.Second,
},
DynamicRateLimiting: defaultDynamicRateLimiting,
GubernatorBehavior: GubernatorBehaviorBatching,
Overrides: []RateLimitOverrides{
{
Matches: map[string][]string{
Expand All @@ -170,6 +176,7 @@ func TestLoadConfig(t *testing.T) {
RetryDelay: 1 * time.Second,
},
DynamicRateLimiting: defaultDynamicRateLimiting,
GubernatorBehavior: GubernatorBehaviorBatching,
Overrides: []RateLimitOverrides{
{
Matches: map[string][]string{
Expand All @@ -193,6 +200,7 @@ func TestLoadConfig(t *testing.T) {
RetryDelay: 1 * time.Second,
},
DynamicRateLimiting: defaultDynamicRateLimiting,
GubernatorBehavior: GubernatorBehaviorBatching,
Overrides: []RateLimitOverrides{
{
Matches: map[string][]string{
Expand All @@ -217,6 +225,7 @@ func TestLoadConfig(t *testing.T) {
RetryDelay: 1 * time.Second,
},
DynamicRateLimiting: defaultDynamicRateLimiting,
GubernatorBehavior: GubernatorBehaviorBatching,
Overrides: []RateLimitOverrides{
{
Matches: map[string][]string{
Expand Down Expand Up @@ -244,6 +253,23 @@ func TestLoadConfig(t *testing.T) {
DefaultWindowMultiplier: 1.5,
WindowDuration: time.Minute,
},
GubernatorBehavior: GubernatorBehaviorBatching,
},
},
{
name: "gubernator_global",
expected: &Config{
Type: GubernatorRateLimiter,
RateLimitSettings: RateLimitSettings{
Rate: 100,
Burst: 200,
Strategy: StrategyRateLimitRequests,
ThrottleBehavior: ThrottleBehaviorError,
ThrottleInterval: 1 * time.Second,
RetryDelay: 1 * time.Second,
},
DynamicRateLimiting: defaultDynamicRateLimiting,
GubernatorBehavior: GubernatorBehaviorGlobal,
},
},
{
Expand All @@ -266,6 +292,10 @@ func TestLoadConfig(t *testing.T) {
name: "invalid_type",
expectedErr: `invalid rate limiter type "invalid", expected one of ["local" "gubernator"]`,
},
{
name: "invalid_gubernator_behavior",
expectedErr: `invalid gubernator behavior "foo", expected one of ["batching" "global"]`,
},
{
name: "invalid_default_class",
expectedErr: `default_class "nonexistent" does not exist in classes`,
Expand Down Expand Up @@ -296,6 +326,7 @@ func TestLoadConfig(t *testing.T) {
RetryDelay: 1 * time.Second,
},
DynamicRateLimiting: defaultDynamicRateLimiting,
GubernatorBehavior: GubernatorBehaviorBatching,
Overrides: []RateLimitOverrides{
{
Matches: map[string][]string{
Expand Down
11 changes: 10 additions & 1 deletion processor/ratelimitprocessor/gubernator.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func newGubernatorRateLimiter(cfg *Config, logger *zap.Logger, telemetryBuilder
return &gubernatorRateLimiter{
cfg: cfg,
logger: logger,
behavior: gubernator.Behavior_BATCHING,
behavior: guberBehavior(cfg.GubernatorBehavior),
daemonCfg: daemonCfg,
telemetryBuilder: telemetryBuilder,
tracerProvider: tracerProvider,
Expand All @@ -136,6 +136,15 @@ func newGubernatorRateLimiter(cfg *Config, logger *zap.Logger, telemetryBuilder
}, nil
}

func guberBehavior(b GubernatorBehavior) gubernator.Behavior {
switch b {
case GubernatorBehaviorGlobal:
return gubernator.Behavior_GLOBAL
default:
return gubernator.Behavior_BATCHING
}
}

func (r *gubernatorRateLimiter) Start(ctx context.Context, host component.Host) (err error) {
if res := r.cfg.ClassResolver; res.String() != "" {
cr, ok := host.GetExtensions()[res]
Expand Down
11 changes: 11 additions & 0 deletions processor/ratelimitprocessor/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -162,3 +162,14 @@ ratelimit/deprecated_overrides:
project-id:e994532b-5n94-48et-a95c-1fa0638g6288;project-type:test:
rate: 3000
burst: 5000

ratelimit/invalid_gubernator_behavior:
rate: 1
burst: 1
gubernator_behavior: foo

ratelimit/gubernator_global:
rate: 100
burst: 200
type: gubernator
gubernator_behavior: global