@@ -32,6 +32,7 @@ import (
3232 "github.com/stretchr/testify/require"
3333 "go.opentelemetry.io/collector/client"
3434 "go.opentelemetry.io/collector/consumer/consumertest"
35+ "go.opentelemetry.io/collector/processor"
3536 "go.opentelemetry.io/collector/processor/processortest"
3637 "go.uber.org/zap/zapcore"
3738 "go.uber.org/zap/zaptest"
@@ -88,8 +89,30 @@ func TestProcessor(t *testing.T) {
8889 }
8990}
9091
91- // TestProcessorECS does a basic test to check if traces are processed correctly when ECS mode is enabled in the client metadata.
92+ // TestProcessorECS does a basic test to check if traces, logs, and metrics are processed correctly when ECS mode is enabled in the client metadata.
9293func TestProcessorECS (t * testing.T ) {
94+ testCases := []struct {
95+ name string
96+ testDir string
97+ testType string
98+ }{
99+ {
100+ name : "elastic_span_db" ,
101+ testDir : "elastic_span_db" ,
102+ testType : "traces" ,
103+ },
104+ {
105+ name : "elastic_log" ,
106+ testDir : "elastic_log" ,
107+ testType : "logs" ,
108+ },
109+ {
110+ name : "elastic_metric" ,
111+ testDir : "elastic_metric" ,
112+ testType : "metrics" ,
113+ },
114+ }
115+
93116 ctx := client .NewContext (context .Background (), client.Info {
94117 Addr : & net.IPAddr {
95118 IP : net .IPv4 (1 , 2 , 3 , 4 ),
@@ -99,21 +122,35 @@ func TestProcessorECS(t *testing.T) {
99122 cancel := func () {}
100123 defer cancel ()
101124
102- factory := NewFactory ()
103- settings := processortest .NewNopSettings (metadata .Type )
104- settings .TelemetrySettings .Logger = zaptest .NewLogger (t , zaptest .Level (zapcore .DebugLevel ))
105- next := & consumertest.TracesSink {}
125+ for _ , tc := range testCases {
126+ t .Run (tc .name , func (t * testing.T ) {
127+ factory := NewFactory ()
128+ settings := processortest .NewNopSettings (metadata .Type )
129+ settings .TelemetrySettings .Logger = zaptest .NewLogger (t , zaptest .Level (zapcore .DebugLevel ))
106130
107- tp , err := factory .CreateTraces (ctx , settings , createDefaultConfig (), next )
131+ dir := filepath .Join ("testdata" , "ecs" , tc .testDir )
132+ inputFile := filepath .Join (dir , "input.yaml" )
133+ outputFile := filepath .Join (dir , "output.yaml" )
108134
109- require .NoError (t , err )
110- require .IsType (t , & TraceProcessor {}, tp )
135+ switch tc .testType {
136+ case "traces" :
137+ testTraces (t , ctx , factory , settings , inputFile , outputFile )
138+ case "logs" :
139+ testLogs (t , ctx , factory , settings , inputFile , outputFile )
140+ case "metrics" :
141+ testMetrics (t , ctx , factory , settings , inputFile , outputFile )
142+ }
143+ })
144+ }
145+ }
111146
112- inputTraces , err := golden .ReadTraces ("testdata/ecs/elastic_span_db/input.yaml" )
147+ func testTraces (t * testing.T , ctx context.Context , factory processor.Factory , settings processor.Settings , inputFile , outputFile string ) {
148+ next := & consumertest.TracesSink {}
149+ tp , err := factory .CreateTraces (ctx , settings , createDefaultConfig (), next )
113150 require .NoError (t , err )
151+ require .IsType (t , & TraceProcessor {}, tp )
114152
115- outputFile := "testdata/ecs/elastic_span_db/output.yaml"
116- expectedTraces , err := golden .ReadTraces (outputFile )
153+ inputTraces , err := golden .ReadTraces (inputFile )
117154 require .NoError (t , err )
118155
119156 require .NoError (t , tp .ConsumeTraces (ctx , inputTraces ))
@@ -122,9 +159,49 @@ func TestProcessorECS(t *testing.T) {
122159 err := golden .WriteTraces (t , outputFile , actual )
123160 assert .NoError (t , err )
124161 }
162+ expectedTraces , err := golden .ReadTraces (outputFile )
163+ require .NoError (t , err )
125164 assert .NoError (t , ptracetest .CompareTraces (expectedTraces , actual ))
126165}
127166
167+ func testLogs (t * testing.T , ctx context.Context , factory processor.Factory , settings processor.Settings , inputFile , outputFile string ) {
168+ next := & consumertest.LogsSink {}
169+ lp , err := factory .CreateLogs (ctx , settings , createDefaultConfig (), next )
170+ require .NoError (t , err )
171+
172+ inputLogs , err := golden .ReadLogs (inputFile )
173+ require .NoError (t , err )
174+
175+ require .NoError (t , lp .ConsumeLogs (ctx , inputLogs ))
176+ actual := next .AllLogs ()[0 ]
177+ if * update {
178+ err := golden .WriteLogs (t , outputFile , actual )
179+ assert .NoError (t , err )
180+ }
181+ expectedLogs , err := golden .ReadLogs (outputFile )
182+ require .NoError (t , err )
183+ assert .NoError (t , plogtest .CompareLogs (expectedLogs , actual ))
184+ }
185+
186+ func testMetrics (t * testing.T , ctx context.Context , factory processor.Factory , settings processor.Settings , inputFile , outputFile string ) {
187+ next := & consumertest.MetricsSink {}
188+ mp , err := factory .CreateMetrics (ctx , settings , createDefaultConfig (), next )
189+ require .NoError (t , err )
190+
191+ inputMetrics , err := golden .ReadMetrics (inputFile )
192+ require .NoError (t , err )
193+
194+ require .NoError (t , mp .ConsumeMetrics (ctx , inputMetrics ))
195+ actual := next .AllMetrics ()[0 ]
196+ if * update {
197+ err := golden .WriteMetrics (t , outputFile , actual )
198+ assert .NoError (t , err )
199+ }
200+ expectedMetrics , err := golden .ReadMetrics (outputFile )
201+ require .NoError (t , err )
202+ assert .NoError (t , pmetrictest .CompareMetrics (expectedMetrics , actual , pmetrictest .IgnoreMetricsOrder (), pmetrictest .IgnoreResourceMetricsOrder (), pmetrictest .IgnoreTimestamp ()))
203+ }
204+
128205// TestSkipEnrichmentLogs tests that logs are only enriched when skipEnrichment is false or when mapping mode is ecs
129206func TestSkipEnrichmentLogs (t * testing.T ) {
130207 testCases := []struct {
0 commit comments