Skip to content

Commit f77eb42

Browse files
authored
[processor/tailsampling] Support extensions (#42573)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description Allow the Tail Sampling Processor to use extensions that implement an interface simply containing NewEvaluator. To use an extension all that needs to be done is specify the extension's type/name such as my_extension or my_extension/my_name in the type field of the policy configuration. When configuration is loaded or updated NewEvaluator will be called. Extensions can be configured in two places. First, in the extensions section like all other extensions. This is useful for configuration such as if an extension needs to connect to an external service. In addition, configuration can be defined in the policy config under a key matching the type of the extension for anything related to one instance, following the pattern for all other policy types. This is useful when reusing the same extension e.g. different params for different services. For example: ```yaml type: custom_policy custom_policy: foo: bar ``` What this ends up with is that extensions look exactly like all other policy types with no need for users to know about some sort of extension container. I have tested this out with an extension in a custom build and it is working well, I am sure we will iterate on the interfaces a bit as we write more. Specifically a way to stop the extension when policies are updated may be necessary to clean up any background processes. @yvrhdn is working on an extension we plan to add to contrib as well. <!-- Issue number (e.g. #1234) or full URL to issue, if applicable. --> #### Link to tracking issue Part of #31582
1 parent 13f17b0 commit f77eb42

File tree

11 files changed

+198
-43
lines changed

11 files changed

+198
-43
lines changed
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: tailsamplingprocessor
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add support for extensions that implement sampling policies.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [31582]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
Extension support for tailsamplingprocessor is still in development and the interfaces may change at any time.
20+
21+
# If your change doesn't affect end users or the exported elements of any package,
22+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
23+
# Optional: The change log or logs in which this entry should be included.
24+
# e.g. '[user]' or '[user, api]'
25+
# Include 'user' if the change is relevant to end users.
26+
# Include 'api' if there is a change to a library API.
27+
# Default: '[user]'
28+
change_logs: []

processor/tailsamplingprocessor/and_helper.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,11 @@ import (
1010
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/pkg/samplingpolicy"
1111
)
1212

13-
func getNewAndPolicy(settings component.TelemetrySettings, config *AndCfg) (samplingpolicy.Evaluator, error) {
13+
func getNewAndPolicy(settings component.TelemetrySettings, config *AndCfg, policyExtensions map[string]samplingpolicy.Extension) (samplingpolicy.Evaluator, error) {
1414
subPolicyEvaluators := make([]samplingpolicy.Evaluator, len(config.SubPolicyCfg))
1515
for i := range config.SubPolicyCfg {
1616
policyCfg := &config.SubPolicyCfg[i]
17-
policy, err := getAndSubPolicyEvaluator(settings, policyCfg)
17+
policy, err := getAndSubPolicyEvaluator(settings, policyCfg, policyExtensions)
1818
if err != nil {
1919
return nil, err
2020
}
@@ -24,6 +24,6 @@ func getNewAndPolicy(settings component.TelemetrySettings, config *AndCfg) (samp
2424
}
2525

2626
// Return instance of and sub-policy
27-
func getAndSubPolicyEvaluator(settings component.TelemetrySettings, cfg *AndSubPolicyCfg) (samplingpolicy.Evaluator, error) {
28-
return getSharedPolicyEvaluator(settings, &cfg.sharedPolicyCfg)
27+
func getAndSubPolicyEvaluator(settings component.TelemetrySettings, cfg *AndSubPolicyCfg, policyExtensions map[string]samplingpolicy.Extension) (samplingpolicy.Evaluator, error) {
28+
return getSharedPolicyEvaluator(settings, &cfg.sharedPolicyCfg, policyExtensions)
2929
}

processor/tailsamplingprocessor/and_helper_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func TestAndHelper(t *testing.T) {
2727
},
2828
},
2929
},
30-
})
30+
}, nil)
3131
require.NoError(t, err)
3232

3333
expected := sampling.NewAnd(zap.NewNop(), []samplingpolicy.Evaluator{
@@ -46,7 +46,7 @@ func TestAndHelper(t *testing.T) {
4646
},
4747
},
4848
},
49-
})
49+
}, nil)
5050
require.EqualError(t, err, "unknown sampling policy type and")
5151
})
5252
}

processor/tailsamplingprocessor/composite_helper.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,12 @@ import (
1111
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/pkg/samplingpolicy"
1212
)
1313

14-
func getNewCompositePolicy(settings component.TelemetrySettings, config *CompositeCfg) (samplingpolicy.Evaluator, error) {
14+
func getNewCompositePolicy(settings component.TelemetrySettings, config *CompositeCfg, policyExtensions map[string]samplingpolicy.Extension) (samplingpolicy.Evaluator, error) {
1515
subPolicyEvalParams := make([]sampling.SubPolicyEvalParams, len(config.SubPolicyCfg))
1616
rateAllocationsMap := getRateAllocationMap(config)
1717
for i := range config.SubPolicyCfg {
1818
policyCfg := &config.SubPolicyCfg[i]
19-
policy, err := getCompositeSubPolicyEvaluator(settings, policyCfg)
19+
policy, err := getCompositeSubPolicyEvaluator(settings, policyCfg, policyExtensions)
2020
if err != nil {
2121
return nil, err
2222
}
@@ -48,11 +48,11 @@ func getRateAllocationMap(config *CompositeCfg) map[string]float64 {
4848
}
4949

5050
// Return instance of composite sub-policy
51-
func getCompositeSubPolicyEvaluator(settings component.TelemetrySettings, cfg *CompositeSubPolicyCfg) (samplingpolicy.Evaluator, error) {
51+
func getCompositeSubPolicyEvaluator(settings component.TelemetrySettings, cfg *CompositeSubPolicyCfg, policyExtensions map[string]samplingpolicy.Extension) (samplingpolicy.Evaluator, error) {
5252
switch cfg.Type {
5353
case And:
54-
return getNewAndPolicy(settings, &cfg.AndCfg)
54+
return getNewAndPolicy(settings, &cfg.AndCfg, policyExtensions)
5555
default:
56-
return getSharedPolicyEvaluator(settings, &cfg.sharedPolicyCfg)
56+
return getSharedPolicyEvaluator(settings, &cfg.sharedPolicyCfg, policyExtensions)
5757
}
5858
}

processor/tailsamplingprocessor/composite_helper_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func TestCompositeHelper(t *testing.T) {
4545
Percent: 0, // will be populated with default
4646
},
4747
},
48-
})
48+
}, nil)
4949
require.NoError(t, err)
5050

5151
expected := sampling.NewComposite(zap.NewNop(), 1000, []sampling.SubPolicyEvalParams{
@@ -73,7 +73,7 @@ func TestCompositeHelper(t *testing.T) {
7373
},
7474
},
7575
},
76-
})
76+
}, nil)
7777
require.EqualError(t, err, "unknown sampling policy type composite")
7878
})
7979
}

processor/tailsamplingprocessor/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ type sharedPolicyCfg struct {
7474
BooleanAttributeCfg BooleanAttributeCfg `mapstructure:"boolean_attribute"`
7575
// Configs for OTTL condition filter sampling policy evaluator
7676
OTTLConditionCfg OTTLConditionCfg `mapstructure:"ottl_condition"`
77+
// Configs for any extensions that are used.
78+
ExtensionCfg map[string]map[string]any `mapstructure:",remain"`
7779
}
7880

7981
// CompositeSubPolicyCfg holds the common configuration to all policies under composite policy.

processor/tailsamplingprocessor/drop_helper.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,11 @@ import (
1010
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/pkg/samplingpolicy"
1111
)
1212

13-
func getNewDropPolicy(settings component.TelemetrySettings, config *DropCfg) (samplingpolicy.Evaluator, error) {
13+
func getNewDropPolicy(settings component.TelemetrySettings, config *DropCfg, policyExtensions map[string]samplingpolicy.Extension) (samplingpolicy.Evaluator, error) {
1414
subPolicyEvaluators := make([]samplingpolicy.Evaluator, len(config.SubPolicyCfg))
1515
for i := range config.SubPolicyCfg {
1616
policyCfg := &config.SubPolicyCfg[i]
17-
policy, err := getDropSubPolicyEvaluator(settings, policyCfg)
17+
policy, err := getDropSubPolicyEvaluator(settings, policyCfg, policyExtensions)
1818
if err != nil {
1919
return nil, err
2020
}
@@ -24,6 +24,6 @@ func getNewDropPolicy(settings component.TelemetrySettings, config *DropCfg) (sa
2424
}
2525

2626
// Return instance of and sub-policy
27-
func getDropSubPolicyEvaluator(settings component.TelemetrySettings, cfg *AndSubPolicyCfg) (samplingpolicy.Evaluator, error) {
28-
return getSharedPolicyEvaluator(settings, &cfg.sharedPolicyCfg)
27+
func getDropSubPolicyEvaluator(settings component.TelemetrySettings, cfg *AndSubPolicyCfg, policyExtensions map[string]samplingpolicy.Extension) (samplingpolicy.Evaluator, error) {
28+
return getSharedPolicyEvaluator(settings, &cfg.sharedPolicyCfg, policyExtensions)
2929
}

processor/tailsamplingprocessor/drop_helper_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func TestDropHelper(t *testing.T) {
2727
},
2828
},
2929
},
30-
})
30+
}, nil)
3131
require.NoError(t, err)
3232

3333
expected := sampling.NewDrop(zap.NewNop(), []samplingpolicy.Evaluator{
@@ -46,7 +46,7 @@ func TestDropHelper(t *testing.T) {
4646
},
4747
},
4848
},
49-
})
49+
}, nil)
5050
require.EqualError(t, err, "unknown sampling policy type drop")
5151
})
5252
}

processor/tailsamplingprocessor/pkg/samplingpolicy/samplingpolicy.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,3 +61,7 @@ type Evaluator interface {
6161
// Evaluate looks at the trace data and returns a corresponding SamplingDecision.
6262
Evaluate(ctx context.Context, traceID pcommon.TraceID, trace *TraceData) (Decision, error)
6363
}
64+
65+
type Extension interface {
66+
NewEvaluator(policyName string, cfg map[string]any) (Evaluator, error)
67+
}

processor/tailsamplingprocessor/processor.go

Lines changed: 53 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ type tailSamplingSpanProcessor struct {
6868
setPolicyMux sync.Mutex
6969
pendingPolicy []PolicyCfg
7070
sampleOnFirstMatch bool
71+
72+
host component.Host
7173
}
7274

7375
type traceLimiter interface {
@@ -156,10 +158,7 @@ func newTracesProcessor(ctx context.Context, set processor.Settings, nextConsume
156158
}
157159

158160
if tsp.policies == nil {
159-
err := tsp.loadSamplingPolicy(cfg.PolicyCfgs)
160-
if err != nil {
161-
return nil, err
162-
}
161+
tsp.pendingPolicy = cfg.PolicyCfgs
163162
}
164163

165164
if tsp.decisionBatcher == nil {
@@ -217,20 +216,20 @@ func withRecordPolicy() Option {
217216
}
218217
}
219218

220-
func getPolicyEvaluator(settings component.TelemetrySettings, cfg *PolicyCfg) (samplingpolicy.Evaluator, error) {
219+
func getPolicyEvaluator(settings component.TelemetrySettings, cfg *PolicyCfg, policyExtensions map[string]samplingpolicy.Extension) (samplingpolicy.Evaluator, error) {
221220
switch cfg.Type {
222221
case Composite:
223-
return getNewCompositePolicy(settings, &cfg.CompositeCfg)
222+
return getNewCompositePolicy(settings, &cfg.CompositeCfg, policyExtensions)
224223
case And:
225-
return getNewAndPolicy(settings, &cfg.AndCfg)
224+
return getNewAndPolicy(settings, &cfg.AndCfg, policyExtensions)
226225
case Drop:
227-
return getNewDropPolicy(settings, &cfg.DropCfg)
226+
return getNewDropPolicy(settings, &cfg.DropCfg, policyExtensions)
228227
default:
229-
return getSharedPolicyEvaluator(settings, &cfg.sharedPolicyCfg)
228+
return getSharedPolicyEvaluator(settings, &cfg.sharedPolicyCfg, policyExtensions)
230229
}
231230
}
232231

233-
func getSharedPolicyEvaluator(settings component.TelemetrySettings, cfg *sharedPolicyCfg) (samplingpolicy.Evaluator, error) {
232+
func getSharedPolicyEvaluator(settings component.TelemetrySettings, cfg *sharedPolicyCfg, policyExtensions map[string]samplingpolicy.Extension) (samplingpolicy.Evaluator, error) {
234233
settings.Logger = settings.Logger.With(zap.Any("policy", cfg.Type))
235234

236235
switch cfg.Type {
@@ -273,8 +272,17 @@ func getSharedPolicyEvaluator(settings component.TelemetrySettings, cfg *sharedP
273272
case OTTLCondition:
274273
ottlfCfg := cfg.OTTLConditionCfg
275274
return sampling.NewOTTLConditionFilter(settings, ottlfCfg.SpanConditions, ottlfCfg.SpanEventConditions, ottlfCfg.ErrorMode)
276-
277275
default:
276+
t := string(cfg.Type)
277+
extension, ok := policyExtensions[t]
278+
if ok {
279+
evaluator, err := extension.NewEvaluator(cfg.Name, cfg.ExtensionCfg[t])
280+
if err != nil {
281+
return nil, fmt.Errorf("unable to load extension %s: %w", t, err)
282+
}
283+
return evaluator, nil
284+
}
285+
278286
return nil, fmt.Errorf("unknown sampling policy type %s", cfg.Type)
279287
}
280288
}
@@ -326,7 +334,7 @@ func (tsp *tailSamplingSpanProcessor) loadSamplingPolicy(cfgs []PolicyCfg) error
326334
}
327335
policyNames[cfg.Name] = struct{}{}
328336

329-
eval, err := getPolicyEvaluator(telemetrySettings, &cfg)
337+
eval, err := getPolicyEvaluator(telemetrySettings, &cfg, tsp.extensions())
330338
if err != nil {
331339
return fmt.Errorf("failed to create policy evaluator for %q: %w", cfg.Name, err)
332340
}
@@ -365,14 +373,14 @@ func (tsp *tailSamplingSpanProcessor) SetSamplingPolicy(cfgs []PolicyCfg) {
365373
tsp.pendingPolicy = cfgs
366374
}
367375

368-
func (tsp *tailSamplingSpanProcessor) loadPendingSamplingPolicy() {
376+
func (tsp *tailSamplingSpanProcessor) loadPendingSamplingPolicy() error {
369377
tsp.setPolicyMux.Lock()
370378
defer tsp.setPolicyMux.Unlock()
371379

372380
// Nothing pending, do nothing.
373381
pLen := len(tsp.pendingPolicy)
374382
if pLen == 0 {
375-
return
383+
return nil
376384
}
377385

378386
tsp.logger.Debug("Loading pending sampling policy", zap.Int("pending.len", pLen))
@@ -382,17 +390,17 @@ func (tsp *tailSamplingSpanProcessor) loadPendingSamplingPolicy() {
382390
// Empty pending regardless of error. If policy is invalid, it will fail on
383391
// every tick, no need to do extra work and flood the log with errors.
384392
tsp.pendingPolicy = nil
385-
386-
if err != nil {
387-
tsp.logger.Error("Failed to load pending sampling policy", zap.Error(err))
388-
tsp.logger.Debug("Continuing to use the previously loaded sampling policy")
389-
}
393+
return err
390394
}
391395

392396
func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() {
393397
tsp.logger.Debug("Sampling Policy Evaluation ticked")
394398

395-
tsp.loadPendingSamplingPolicy()
399+
err := tsp.loadPendingSamplingPolicy()
400+
if err != nil {
401+
tsp.logger.Error("Failed to load pending sampling policy", zap.Error(err))
402+
tsp.logger.Debug("Continuing to use the previously loaded sampling policy")
403+
}
396404

397405
ctx := context.Background()
398406
metrics := newPolicyMetrics(len(tsp.policies))
@@ -648,11 +656,35 @@ func (*tailSamplingSpanProcessor) Capabilities() consumer.Capabilities {
648656
}
649657

650658
// Start is invoked during service startup.
651-
func (tsp *tailSamplingSpanProcessor) Start(context.Context, component.Host) error {
659+
func (tsp *tailSamplingSpanProcessor) Start(_ context.Context, host component.Host) error {
660+
// We need to store the host before loading sampling policies in order to load any extensions.
661+
tsp.host = host
662+
if tsp.policies == nil {
663+
err := tsp.loadPendingSamplingPolicy()
664+
if err != nil {
665+
tsp.logger.Error("Failed to load initial sampling policy", zap.Error(err))
666+
return err
667+
}
668+
}
652669
tsp.policyTicker.Start(tsp.tickerFrequency)
653670
return nil
654671
}
655672

673+
func (tsp *tailSamplingSpanProcessor) extensions() map[string]samplingpolicy.Extension {
674+
if tsp.host == nil {
675+
return nil
676+
}
677+
extensions := tsp.host.GetExtensions()
678+
scoped := map[string]samplingpolicy.Extension{}
679+
for id, ext := range extensions {
680+
evaluator, ok := ext.(samplingpolicy.Extension)
681+
if ok {
682+
scoped[id.String()] = evaluator
683+
}
684+
}
685+
return scoped
686+
}
687+
656688
// Shutdown is invoked during service shutdown.
657689
func (tsp *tailSamplingSpanProcessor) Shutdown(context.Context) error {
658690
tsp.decisionBatcher.Stop()

0 commit comments

Comments
 (0)