Skip to content

Commit 0171340

Browse files
authored
[elasticapmintakereceiver] Propagate request metdata when include_metadata is set (#718)
* elasticapmintakereceiver: propagate request metdata when include_metadata is set * elasticapmintakereceiver: add metadata propagation test cases
1 parent 459116e commit 0171340

File tree

2 files changed

+81
-14
lines changed

2 files changed

+81
-14
lines changed

receiver/elasticapmintakereceiver/receiver.go

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -101,17 +101,18 @@ func newElasticAPMIntakeReceiver(fetcher agentCfgFetcherFactory, cfg *Config, se
101101
// Start runs an HTTP server for receiving data from Elastic APM agents.
102102
func (r *elasticAPMIntakeReceiver) Start(ctx context.Context, host component.Host) error {
103103
ctx, r.cancelFn = context.WithCancel(ctx)
104-
ecsCtx := withECSMappingMode(ctx)
105-
if err := r.startHTTPServer(ecsCtx, host); err != nil {
106-
return errors.Join(err, r.Shutdown(ecsCtx))
104+
if err := r.startHTTPServer(ctx, host); err != nil {
105+
return errors.Join(err, r.Shutdown(ctx))
107106
}
108107
return nil
109108
}
110109

111110
func (r *elasticAPMIntakeReceiver) startHTTPServer(ctx context.Context, host component.Host) error {
112111
httpMux := http.NewServeMux()
113112

114-
httpMux.HandleFunc(intakeV2EventsPath, r.newElasticAPMEventsHandler(ctx))
113+
httpMux.HandleFunc(intakeV2EventsPath, r.newElasticAPMEventsHandler(func(req *http.Request) context.Context {
114+
return withECSMappingMode(req.Context(), r.cfg.IncludeMetadata)
115+
}))
115116
httpMux.HandleFunc(agentConfigPath, r.newElasticAPMConfigsHandler(ctx, host))
116117
// TODO rum v2, v3
117118

@@ -157,7 +158,7 @@ func errorHandler(w http.ResponseWriter, r *http.Request, errMsg string, statusC
157158
// TODO
158159
}
159160

160-
func (r *elasticAPMIntakeReceiver) newElasticAPMEventsHandler(ctx context.Context) http.HandlerFunc {
161+
func (r *elasticAPMIntakeReceiver) newElasticAPMEventsHandler(ctxFunc func(*http.Request) context.Context) http.HandlerFunc {
161162

162163
var (
163164
// TODO make semaphore size configurable and/or find a different way
@@ -185,7 +186,7 @@ func (r *elasticAPMIntakeReceiver) newElasticAPMEventsHandler(ctx context.Contex
185186
baseEvent := &modelpb.APMEvent{}
186187
baseEvent.Event = &modelpb.Event{}
187188
streamErr := elasticapmProcessor.HandleStream(
188-
ctx,
189+
ctxFunc(req),
189190
baseEvent,
190191
req.Body,
191192
batchSize,
@@ -460,14 +461,21 @@ func (r *elasticAPMIntakeReceiver) elasticSpanToOTelSpan(s *ptrace.Span, event *
460461
}
461462
}
462463

463-
func withECSMappingMode(ctx context.Context) context.Context {
464-
return client.NewContext(ctx, withMappingMode(client.FromContext(ctx), "ecs"))
464+
func withECSMappingMode(ctx context.Context, includeMetadata bool) context.Context {
465+
return client.NewContext(ctx, withMappingMode(client.FromContext(ctx), "ecs", includeMetadata))
465466
}
466467

467-
func withMappingMode(info client.Info, mode string) client.Info {
468+
func withMappingMode(info client.Info, mode string, includeMetadata bool) client.Info {
469+
newMeta := make(map[string][]string)
470+
if includeMetadata {
471+
for k := range info.Metadata.Keys() {
472+
newMeta[k] = info.Metadata.Get(k)
473+
}
474+
}
475+
newMeta["x-elastic-mapping-mode"] = []string{mode}
468476
return client.Info{
469477
Addr: info.Addr,
470478
Auth: info.Auth,
471-
Metadata: client.NewMetadata(map[string][]string{"x-elastic-mapping-mode": {mode}}),
479+
Metadata: client.NewMetadata(newMeta),
472480
}
473481
}

receiver/elasticapmintakereceiver/receiver_test.go

Lines changed: 63 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import (
3838
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/ptracetest"
3939
"github.com/stretchr/testify/assert"
4040
"github.com/stretchr/testify/require"
41+
"go.opentelemetry.io/collector/client"
4142
"go.opentelemetry.io/collector/component"
4243
"go.opentelemetry.io/collector/component/componenttest"
4344
"go.opentelemetry.io/collector/config/confighttp"
@@ -519,7 +520,65 @@ func TestTransactionsAndSpans(t *testing.T) {
519520
}
520521
}
521522

522-
func sendInput(t *testing.T, inputJsonFileName string, expectedYamlFileName string, testEndpoint string) {
523+
func TestMetadataPropagation(t *testing.T) {
524+
table := map[string]struct {
525+
includeMetadata bool
526+
expectedMetadata client.Metadata
527+
}{
528+
"when include_metadata is disabled only mappinmapping-mode is propagated": {
529+
expectedMetadata: client.NewMetadata(map[string][]string{
530+
"x-elastic-mapping-mode": {"ecs"},
531+
}),
532+
},
533+
"when include_metadata is enabled all request metadata is propagated": {
534+
includeMetadata: true,
535+
expectedMetadata: client.NewMetadata(map[string][]string{
536+
"content-type": {"application/x-ndjson"},
537+
"x-elastic-mapping-mode": {"ecs"},
538+
}),
539+
},
540+
}
541+
for tname, tcase := range table {
542+
t.Run(tname, func(t *testing.T) {
543+
factory := NewFactory()
544+
testEndpoint := testutil.GetAvailableLocalAddress(t)
545+
cfg := &Config{
546+
ServerConfig: confighttp.ServerConfig{
547+
Endpoint: testEndpoint,
548+
IncludeMetadata: tcase.includeMetadata,
549+
},
550+
}
551+
552+
set := receivertest.NewNopSettings(metadata.Type)
553+
nextTrace := new(consumertest.TracesSink)
554+
receiver, _ := factory.CreateTraces(context.Background(), set, cfg, nextTrace)
555+
556+
if err := receiver.Start(context.Background(), componenttest.NewNopHost()); err != nil {
557+
t.Errorf("Starting receiver failed: %v", err)
558+
}
559+
defer func() {
560+
if err := receiver.Shutdown(context.Background()); err != nil {
561+
t.Errorf("Shutdown failed: %v", err)
562+
}
563+
}()
564+
565+
sendInput(t, "transactions_spans.ndjson", testEndpoint)
566+
567+
ctxs := nextTrace.Contexts()
568+
require.GreaterOrEqual(t, len(ctxs), 1)
569+
md := client.FromContext(ctxs[0]).Metadata
570+
if tcase.includeMetadata {
571+
for k := range tcase.expectedMetadata.Keys() {
572+
require.Equal(t, tcase.expectedMetadata.Get(k), md.Get(k))
573+
}
574+
} else {
575+
require.Equal(t, tcase.expectedMetadata, md)
576+
}
577+
})
578+
}
579+
}
580+
581+
func sendInput(t *testing.T, inputJsonFileName string, testEndpoint string) {
523582
data, err := os.ReadFile(filepath.Join(testData, inputJsonFileName))
524583
if err != nil {
525584
t.Fatalf("failed to read file: %v", err)
@@ -542,7 +601,7 @@ func runComparisonForTraces(t *testing.T, inputJsonFileName string, expectedYaml
542601
) {
543602
nextTrace.Reset()
544603

545-
sendInput(t, inputJsonFileName, expectedYamlFileName, testEndpoint)
604+
sendInput(t, inputJsonFileName, testEndpoint)
546605
actualTraces := nextTrace.AllTraces()[0]
547606
expectedFile := filepath.Join(testData, expectedYamlFileName)
548607
// Use this line to generate the expected yaml file:
@@ -558,7 +617,7 @@ func runComparisonForErrors(t *testing.T, inputJsonFileName string, expectedYaml
558617
) {
559618
nextLog.Reset()
560619

561-
sendInput(t, inputJsonFileName, expectedYamlFileName, testEndpoint)
620+
sendInput(t, inputJsonFileName, testEndpoint)
562621
actualLogs := nextLog.AllLogs()[0]
563622
expectedFile := filepath.Join(testData, expectedYamlFileName)
564623
// Use this line to generate the expected yaml file:
@@ -573,7 +632,7 @@ func runComparisonForMetrics(t *testing.T, inputJsonFileName string, expectedYam
573632
) {
574633

575634
nextMetric.Reset()
576-
sendInput(t, inputJsonFileName, expectedYamlFileName, testEndpoint)
635+
sendInput(t, inputJsonFileName, testEndpoint)
577636
actualMetrics := nextMetric.AllMetrics()[0]
578637
expectedFile := filepath.Join(testData, expectedYamlFileName)
579638
// Use this line to generate the expected yaml file:

0 commit comments

Comments
 (0)