Skip to content

Commit d0fb27f

Browse files
committed
[processor/tailsampling] Support extensions
Allow the Tail Sampling Processor to use extensions that implement an interface simply containing NewEvaluator. To use an extension all that needs to be done is specify the extension's type/name such as my_extension or my_extension/my_name in the type field of the policy configuration. When configuration is loaded/updated the extension will be called in order to create a new evaluator. Extensions can be configured in two places. First, in the extensions section like all other extensions. This is useful for configuration such as if an extension needs to connect to an external service. In addition, an object can defined under the "extension" key the policy config for anything related to one instance. This is useful when reusing the same extension e.g. different params for different services.
1 parent d605fc9 commit d0fb27f

File tree

10 files changed

+146
-42
lines changed

10 files changed

+146
-42
lines changed

processor/tailsamplingprocessor/and_helper.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,11 @@ import (
1010
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/pkg/samplingpolicy"
1111
)
1212

13-
func getNewAndPolicy(settings component.TelemetrySettings, config *AndCfg) (samplingpolicy.Evaluator, error) {
13+
func getNewAndPolicy(settings component.TelemetrySettings, config *AndCfg, policyExtensions map[string]samplingpolicy.Extension) (samplingpolicy.Evaluator, error) {
1414
subPolicyEvaluators := make([]samplingpolicy.Evaluator, len(config.SubPolicyCfg))
1515
for i := range config.SubPolicyCfg {
1616
policyCfg := &config.SubPolicyCfg[i]
17-
policy, err := getAndSubPolicyEvaluator(settings, policyCfg)
17+
policy, err := getAndSubPolicyEvaluator(settings, policyCfg, policyExtensions)
1818
if err != nil {
1919
return nil, err
2020
}
@@ -24,6 +24,6 @@ func getNewAndPolicy(settings component.TelemetrySettings, config *AndCfg) (samp
2424
}
2525

2626
// Return instance of and sub-policy
27-
func getAndSubPolicyEvaluator(settings component.TelemetrySettings, cfg *AndSubPolicyCfg) (samplingpolicy.Evaluator, error) {
28-
return getSharedPolicyEvaluator(settings, &cfg.sharedPolicyCfg)
27+
func getAndSubPolicyEvaluator(settings component.TelemetrySettings, cfg *AndSubPolicyCfg, policyExtensions map[string]samplingpolicy.Extension) (samplingpolicy.Evaluator, error) {
28+
return getSharedPolicyEvaluator(settings, &cfg.sharedPolicyCfg, policyExtensions)
2929
}

processor/tailsamplingprocessor/and_helper_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func TestAndHelper(t *testing.T) {
2727
},
2828
},
2929
},
30-
})
30+
}, nil)
3131
require.NoError(t, err)
3232

3333
expected := sampling.NewAnd(zap.NewNop(), []samplingpolicy.Evaluator{
@@ -46,7 +46,7 @@ func TestAndHelper(t *testing.T) {
4646
},
4747
},
4848
},
49-
})
49+
}, nil)
5050
require.EqualError(t, err, "unknown sampling policy type and")
5151
})
5252
}

processor/tailsamplingprocessor/composite_helper.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,12 @@ import (
1111
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/pkg/samplingpolicy"
1212
)
1313

14-
func getNewCompositePolicy(settings component.TelemetrySettings, config *CompositeCfg) (samplingpolicy.Evaluator, error) {
14+
func getNewCompositePolicy(settings component.TelemetrySettings, config *CompositeCfg, policyExtensions map[string]samplingpolicy.Extension) (samplingpolicy.Evaluator, error) {
1515
subPolicyEvalParams := make([]sampling.SubPolicyEvalParams, len(config.SubPolicyCfg))
1616
rateAllocationsMap := getRateAllocationMap(config)
1717
for i := range config.SubPolicyCfg {
1818
policyCfg := &config.SubPolicyCfg[i]
19-
policy, err := getCompositeSubPolicyEvaluator(settings, policyCfg)
19+
policy, err := getCompositeSubPolicyEvaluator(settings, policyCfg, policyExtensions)
2020
if err != nil {
2121
return nil, err
2222
}
@@ -48,11 +48,11 @@ func getRateAllocationMap(config *CompositeCfg) map[string]float64 {
4848
}
4949

5050
// Return instance of composite sub-policy
51-
func getCompositeSubPolicyEvaluator(settings component.TelemetrySettings, cfg *CompositeSubPolicyCfg) (samplingpolicy.Evaluator, error) {
51+
func getCompositeSubPolicyEvaluator(settings component.TelemetrySettings, cfg *CompositeSubPolicyCfg, policyExtensions map[string]samplingpolicy.Extension) (samplingpolicy.Evaluator, error) {
5252
switch cfg.Type {
5353
case And:
54-
return getNewAndPolicy(settings, &cfg.AndCfg)
54+
return getNewAndPolicy(settings, &cfg.AndCfg, policyExtensions)
5555
default:
56-
return getSharedPolicyEvaluator(settings, &cfg.sharedPolicyCfg)
56+
return getSharedPolicyEvaluator(settings, &cfg.sharedPolicyCfg, policyExtensions)
5757
}
5858
}

processor/tailsamplingprocessor/composite_helper_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func TestCompositeHelper(t *testing.T) {
4545
Percent: 0, // will be populated with default
4646
},
4747
},
48-
})
48+
}, nil)
4949
require.NoError(t, err)
5050

5151
expected := sampling.NewComposite(zap.NewNop(), 1000, []sampling.SubPolicyEvalParams{
@@ -73,7 +73,7 @@ func TestCompositeHelper(t *testing.T) {
7373
},
7474
},
7575
},
76-
})
76+
}, nil)
7777
require.EqualError(t, err, "unknown sampling policy type composite")
7878
})
7979
}

processor/tailsamplingprocessor/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ type sharedPolicyCfg struct {
7474
BooleanAttributeCfg BooleanAttributeCfg `mapstructure:"boolean_attribute"`
7575
// Configs for OTTL condition filter sampling policy evaluator
7676
OTTLConditionCfg OTTLConditionCfg `mapstructure:"ottl_condition"`
77+
// Configs for any extensions that are used.
78+
ExtensionCfg map[string]any `mapstructure:"extension"`
7779
}
7880

7981
// CompositeSubPolicyCfg holds the common configuration to all policies under composite policy.

processor/tailsamplingprocessor/drop_helper.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,11 @@ import (
1010
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/pkg/samplingpolicy"
1111
)
1212

13-
func getNewDropPolicy(settings component.TelemetrySettings, config *DropCfg) (samplingpolicy.Evaluator, error) {
13+
func getNewDropPolicy(settings component.TelemetrySettings, config *DropCfg, policyExtensions map[string]samplingpolicy.Extension) (samplingpolicy.Evaluator, error) {
1414
subPolicyEvaluators := make([]samplingpolicy.Evaluator, len(config.SubPolicyCfg))
1515
for i := range config.SubPolicyCfg {
1616
policyCfg := &config.SubPolicyCfg[i]
17-
policy, err := getDropSubPolicyEvaluator(settings, policyCfg)
17+
policy, err := getDropSubPolicyEvaluator(settings, policyCfg, policyExtensions)
1818
if err != nil {
1919
return nil, err
2020
}
@@ -24,6 +24,6 @@ func getNewDropPolicy(settings component.TelemetrySettings, config *DropCfg) (sa
2424
}
2525

2626
// Return instance of and sub-policy
27-
func getDropSubPolicyEvaluator(settings component.TelemetrySettings, cfg *AndSubPolicyCfg) (samplingpolicy.Evaluator, error) {
28-
return getSharedPolicyEvaluator(settings, &cfg.sharedPolicyCfg)
27+
func getDropSubPolicyEvaluator(settings component.TelemetrySettings, cfg *AndSubPolicyCfg, policyExtensions map[string]samplingpolicy.Extension) (samplingpolicy.Evaluator, error) {
28+
return getSharedPolicyEvaluator(settings, &cfg.sharedPolicyCfg, policyExtensions)
2929
}

processor/tailsamplingprocessor/drop_helper_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func TestDropHelper(t *testing.T) {
2727
},
2828
},
2929
},
30-
})
30+
}, nil)
3131
require.NoError(t, err)
3232

3333
expected := sampling.NewDrop(zap.NewNop(), []samplingpolicy.Evaluator{
@@ -46,7 +46,7 @@ func TestDropHelper(t *testing.T) {
4646
},
4747
},
4848
},
49-
})
49+
}, nil)
5050
require.EqualError(t, err, "unknown sampling policy type drop")
5151
})
5252
}

processor/tailsamplingprocessor/pkg/samplingpolicy/samplingpolicy.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,3 +61,7 @@ type Evaluator interface {
6161
// Evaluate looks at the trace data and returns a corresponding SamplingDecision.
6262
Evaluate(ctx context.Context, traceID pcommon.TraceID, trace *TraceData) (Decision, error)
6363
}
64+
65+
type Extension interface {
66+
NewEvaluator(cfg map[string]any) (Evaluator, error)
67+
}

processor/tailsamplingprocessor/processor.go

Lines changed: 52 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ type tailSamplingSpanProcessor struct {
6868
setPolicyMux sync.Mutex
6969
pendingPolicy []PolicyCfg
7070
sampleOnFirstMatch bool
71+
72+
host component.Host
7173
}
7274

7375
type traceLimiter interface {
@@ -156,10 +158,7 @@ func newTracesProcessor(ctx context.Context, set processor.Settings, nextConsume
156158
}
157159

158160
if tsp.policies == nil {
159-
err := tsp.loadSamplingPolicy(cfg.PolicyCfgs)
160-
if err != nil {
161-
return nil, err
162-
}
161+
tsp.pendingPolicy = cfg.PolicyCfgs
163162
}
164163

165164
if tsp.decisionBatcher == nil {
@@ -217,22 +216,31 @@ func withRecordPolicy() Option {
217216
}
218217
}
219218

220-
func getPolicyEvaluator(settings component.TelemetrySettings, cfg *PolicyCfg) (samplingpolicy.Evaluator, error) {
219+
func getPolicyEvaluator(settings component.TelemetrySettings, cfg *PolicyCfg, policyExtensions map[string]samplingpolicy.Extension) (samplingpolicy.Evaluator, error) {
221220
switch cfg.Type {
222221
case Composite:
223-
return getNewCompositePolicy(settings, &cfg.CompositeCfg)
222+
return getNewCompositePolicy(settings, &cfg.CompositeCfg, policyExtensions)
224223
case And:
225-
return getNewAndPolicy(settings, &cfg.AndCfg)
224+
return getNewAndPolicy(settings, &cfg.AndCfg, policyExtensions)
226225
case Drop:
227-
return getNewDropPolicy(settings, &cfg.DropCfg)
226+
return getNewDropPolicy(settings, &cfg.DropCfg, policyExtensions)
228227
default:
229-
return getSharedPolicyEvaluator(settings, &cfg.sharedPolicyCfg)
228+
return getSharedPolicyEvaluator(settings, &cfg.sharedPolicyCfg, policyExtensions)
230229
}
231230
}
232231

233-
func getSharedPolicyEvaluator(settings component.TelemetrySettings, cfg *sharedPolicyCfg) (samplingpolicy.Evaluator, error) {
232+
func getSharedPolicyEvaluator(settings component.TelemetrySettings, cfg *sharedPolicyCfg, policyExtensions map[string]samplingpolicy.Extension) (samplingpolicy.Evaluator, error) {
234233
settings.Logger = settings.Logger.With(zap.Any("policy", cfg.Type))
235234

235+
extension, ok := policyExtensions[string(cfg.Type)]
236+
if ok {
237+
evaluator, err := extension.NewEvaluator(cfg.ExtensionCfg)
238+
if err != nil {
239+
return nil, fmt.Errorf("unable to load extension %s: %w", string(cfg.Type), err)
240+
}
241+
return evaluator, nil
242+
}
243+
236244
switch cfg.Type {
237245
case AlwaysSample:
238246
return sampling.NewAlwaysSample(settings), nil
@@ -325,7 +333,7 @@ func (tsp *tailSamplingSpanProcessor) loadSamplingPolicy(cfgs []PolicyCfg) error
325333
}
326334
policyNames[cfg.Name] = struct{}{}
327335

328-
eval, err := getPolicyEvaluator(telemetrySettings, &cfg)
336+
eval, err := getPolicyEvaluator(telemetrySettings, &cfg, tsp.extensions())
329337
if err != nil {
330338
return fmt.Errorf("failed to create policy evaluator for %q: %w", cfg.Name, err)
331339
}
@@ -364,14 +372,14 @@ func (tsp *tailSamplingSpanProcessor) SetSamplingPolicy(cfgs []PolicyCfg) {
364372
tsp.pendingPolicy = cfgs
365373
}
366374

367-
func (tsp *tailSamplingSpanProcessor) loadPendingSamplingPolicy() {
375+
func (tsp *tailSamplingSpanProcessor) loadPendingSamplingPolicy() error {
368376
tsp.setPolicyMux.Lock()
369377
defer tsp.setPolicyMux.Unlock()
370378

371379
// Nothing pending, do nothing.
372380
pLen := len(tsp.pendingPolicy)
373381
if pLen == 0 {
374-
return
382+
return nil
375383
}
376384

377385
tsp.logger.Debug("Loading pending sampling policy", zap.Int("pending.len", pLen))
@@ -381,17 +389,17 @@ func (tsp *tailSamplingSpanProcessor) loadPendingSamplingPolicy() {
381389
// Empty pending regardless of error. If policy is invalid, it will fail on
382390
// every tick, no need to do extra work and flood the log with errors.
383391
tsp.pendingPolicy = nil
384-
385-
if err != nil {
386-
tsp.logger.Error("Failed to load pending sampling policy", zap.Error(err))
387-
tsp.logger.Debug("Continuing to use the previously loaded sampling policy")
388-
}
392+
return err
389393
}
390394

391395
func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() {
392396
tsp.logger.Debug("Sampling Policy Evaluation ticked")
393397

394-
tsp.loadPendingSamplingPolicy()
398+
err := tsp.loadPendingSamplingPolicy()
399+
if err != nil {
400+
tsp.logger.Error("Failed to load pending sampling policy", zap.Error(err))
401+
tsp.logger.Debug("Continuing to use the previously loaded sampling policy")
402+
}
395403

396404
ctx := context.Background()
397405
metrics := newPolicyMetrics(len(tsp.policies))
@@ -644,11 +652,35 @@ func (*tailSamplingSpanProcessor) Capabilities() consumer.Capabilities {
644652
}
645653

646654
// Start is invoked during service startup.
647-
func (tsp *tailSamplingSpanProcessor) Start(context.Context, component.Host) error {
655+
func (tsp *tailSamplingSpanProcessor) Start(ctx context.Context, host component.Host) error {
656+
// We need to store the host before loading sampling policies in order to load any extensions.
657+
tsp.host = host
658+
if tsp.policies == nil {
659+
err := tsp.loadPendingSamplingPolicy()
660+
if err != nil {
661+
tsp.logger.Error("Failed to load initial sampling policy", zap.Error(err))
662+
return err
663+
}
664+
}
648665
tsp.policyTicker.Start(tsp.tickerFrequency)
649666
return nil
650667
}
651668

669+
func (tsp *tailSamplingSpanProcessor) extensions() map[string]samplingpolicy.Extension {
670+
if tsp.host == nil {
671+
return nil
672+
}
673+
extensions := tsp.host.GetExtensions()
674+
scoped := map[string]samplingpolicy.Extension{}
675+
for id, ext := range extensions {
676+
evaluator, ok := ext.(samplingpolicy.Extension)
677+
if ok {
678+
scoped[id.String()] = evaluator
679+
}
680+
}
681+
return scoped
682+
}
683+
652684
// Shutdown is invoked during service shutdown.
653685
func (tsp *tailSamplingSpanProcessor) Shutdown(context.Context) error {
654686
tsp.decisionBatcher.Stop()

0 commit comments

Comments
 (0)