4 changes: 2 additions & 2 deletions internal/e2e/go.mod
@@ -4,6 +4,8 @@ go 1.24.0
 
 require (
 	github.com/google/go-cmp v0.7.0
+	github.com/google/uuid v1.6.0
+	github.com/prometheus/client_model v0.6.2
 	github.com/prometheus/common v0.67.1
 	github.com/stretchr/testify v1.11.1
 	go.opentelemetry.io/collector/component v1.49.0
@@ -68,7 +70,6 @@ require (
 	github.com/gobwas/glob v0.2.3 // indirect
 	github.com/golang/snappy v1.0.0 // indirect
 	github.com/google/go-tpm v0.9.8 // indirect
-	github.com/google/uuid v1.6.0 // indirect
 	github.com/grafana/regexp v0.0.0-20250905093917-f7b3be9d1853 // indirect
 	github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.4 // indirect
 	github.com/hashicorp/go-version v1.8.0 // indirect
@@ -89,7 +90,6 @@
 	github.com/pmezard/go-difflib v1.0.0 // indirect
 	github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 // indirect
 	github.com/prometheus/client_golang v1.23.2 // indirect
-	github.com/prometheus/client_model v0.6.2 // indirect
 	github.com/prometheus/otlptranslator v0.0.2 // indirect
 	github.com/prometheus/procfs v0.17.0 // indirect
 	github.com/rs/cors v1.11.1 // indirect
292 changes: 292 additions & 0 deletions internal/e2e/internal_telemetry_test.go
@@ -0,0 +1,292 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package e2e

import (
"bytes"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"net/http/httptest"
"net/url"
"os"
"path/filepath"
"slices"
"sync"
"testing"
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/confmap/provider/fileprovider"
"go.opentelemetry.io/collector/confmap/provider/yamlprovider"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/otelcol"
"go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/otlpreceiver"
"go.opentelemetry.io/collector/service/telemetry/otelconftelemetry"
)

// TestInternalTelemetry_ServiceInstanceID verifies that the service.instance.id
// attribute is generated by default (unless overridden), and is consistent
// across all internal telemetry providers.
func TestInternalTelemetry_ServiceInstanceID(t *testing.T) {
	type testcase struct {
		extraYamlConfig        string
		checkServiceInstanceID func(t *testing.T, serviceInstanceID string)
	}

	for name, tt := range map[string]testcase{
		"default": {
			checkServiceInstanceID: func(t *testing.T, serviceInstanceID string) {
				// By default, service.instance.id should be a generated UUIDv4
				_, err := uuid.Parse(serviceInstanceID)
				require.NoError(t, err)
			},
		},
		"service.instance.id set in config": {
			extraYamlConfig: `
service:
  telemetry:
    resource:
      service.instance.id: "my-custom-instance-id"`,
			checkServiceInstanceID: func(t *testing.T, serviceInstanceID string) {
				assert.Equal(t, "my-custom-instance-id", serviceInstanceID)
			},
		},
	} {
		t.Run(name, func(t *testing.T) {
			// Set up HTTP server to capture traces from collector's internal telemetry
			traceSink := new(consumertest.TracesSink)
			traceServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
				body, err := io.ReadAll(r.Body)
				if err != nil {
					http.Error(w, err.Error(), http.StatusBadRequest)
					return
				}
				otlpReq := ptraceotlp.NewExportRequest()
				if err := otlpReq.UnmarshalProto(body); err != nil {
					http.Error(w, err.Error(), http.StatusBadRequest)
					return
				}
				_ = traceSink.ConsumeTraces(r.Context(), otlpReq.Traces())
			}))
			defer traceServer.Close()

			logSink := registerTestLogSink(t)

			// Create temporary directory for the config file
			tempdir := t.TempDir()
			configFile := filepath.Join(tempdir, "config.yaml")

			// Create YAML config
			otlphttpPort := getFreePort(t)
			metricsPort := getFreePort(t)
			require.NoError(t, os.WriteFile(configFile, []byte(fmt.Sprintf(`
receivers:
  otlp:
    protocols:
      http:
        endpoint: localhost:%s

exporters:
  nop:

service:
  telemetry:
    logs:
      level: info
      encoding: json
      output_paths: [%q]
    metrics:
      level: normal
      readers:
        - pull:
            exporter:
              prometheus:
                host: localhost
                port: %s
    traces:
      level: normal
      processors:
        - simple:
            exporter:
              otlp:
                protocol: http/protobuf
                endpoint: %s
  pipelines:
    traces:
      receivers: [otlp]
      exporters: [nop]
`, otlphttpPort, logSink.url, metricsPort, traceServer.URL)[1:]), 0o600))

			// Create collector
			configURIs := []string{configFile}
			if tt.extraYamlConfig != "" {
				configURIs = append(configURIs, "yaml:"+tt.extraYamlConfig)
			}
			collector, err := otelcol.NewCollector(otelcol.CollectorSettings{
				BuildInfo: component.NewDefaultBuildInfo(),
				Factories: func() (otelcol.Factories, error) {
					otlpreceiverFactory := otlpreceiver.NewFactory()
					return otelcol.Factories{
						Receivers: map[component.Type]receiver.Factory{
							otlpreceiverFactory.Type(): otlpreceiverFactory,
						},
						Exporters: map[component.Type]exporter.Factory{
							nopType: exportertest.NewNopFactory(),
						},
						Telemetry: otelconftelemetry.NewFactory(),
					}, nil
				},
				ConfigProviderSettings: otelcol.ConfigProviderSettings{
					ResolverSettings: confmap.ResolverSettings{
						URIs: configURIs,
						ProviderFactories: []confmap.ProviderFactory{
							fileprovider.NewFactory(),
							yamlprovider.NewFactory(),
						},
					},
				},
			})
			require.NoError(t, err)

			// Start collector
			go func() {
				assert.NoError(t, collector.Run(t.Context()))
			}()
			waitMetricsReady(t, metricsPort)

			// Send some data through the pipeline to trigger internal telemetry
			err = sendTestTraces(otlphttpPort)
			require.NoError(t, err)

			// Capture service.instance.id from the Prometheus endpoint
			var metricInstanceID string
			parsed := readMetrics(t, metricsPort)
			targetInfo := parsed["target_info"]
			require.NotNil(t, targetInfo, "target_info metric not found")
			require.Len(t, targetInfo.Metric, 1)
			for _, label := range targetInfo.Metric[0].Label {
				if label.GetName() == "service_instance_id" {
					metricInstanceID = label.GetValue()
					break
				}
			}
			tt.checkServiceInstanceID(t, metricInstanceID)

			// Wait for traces, verify service.instance.id matches the one from metrics
			require.EventuallyWithT(t, func(t *assert.CollectT) {
				allTraces := traceSink.AllTraces()
				require.NotEmpty(t, allTraces)

				// Find service.instance.id in resource attributes
				for _, td := range allTraces {
					for i := 0; i < td.ResourceSpans().Len(); i++ {
						rs := td.ResourceSpans().At(i)
						if attr, ok := rs.Resource().Attributes().Get("service.instance.id"); ok {
							traceInstanceID := attr.AsString()
							require.Equal(t, metricInstanceID, traceInstanceID)
						}
					}
				}
			}, 10*time.Second, 500*time.Millisecond)

			// Check service.instance.id in logs matches the one from metrics
			var logsCount int
			logContent := logSink.Bytes()
			for line := range bytes.Lines(bytes.TrimSpace(logContent)) {
				var logEntry map[string]any
				if err := json.Unmarshal(line, &logEntry); err != nil {
					continue
				}
				// Check for resource field with service.instance.id
				// Resource attributes are nested under "resource" key as a dictionary
				resource, ok := logEntry["resource"].(map[string]any)
				require.True(t, ok, "log entry should have resource field")
				logInstanceID, ok := resource["service.instance.id"].(string)
				require.True(t, ok, "resource should have service.instance.id")
				require.Equal(t, metricInstanceID, logInstanceID)
				logsCount++
			}
			assert.NotZero(t, logsCount)
		})
	}
}

// Test-specific zap sink used to capture logs as close as possible to the point where
// they would be written to file. We don't actually write to files because zap provides
// no way of closing file sinks created by zap.Config.Build.

var (
	testSinksMu sync.Mutex
	testSinks   = make(map[string]*testSink)
)

type testSink struct {
	url string

	mu  sync.RWMutex
	buf bytes.Buffer
}

func (s *testSink) Write(p []byte) (n int, err error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.buf.Write(p)
}

func (s *testSink) Bytes() []byte {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return slices.Clone(s.buf.Bytes())
}

func (*testSink) Sync() error {
	return nil
}

func (*testSink) Close() error {
	return nil
}

func registerTestLogSink(tb testing.TB) *testSink {
	sink := &testSink{}
	sink.url = fmt.Sprintf("test://%s.%p", tb.Name(), sink)

	testSinksMu.Lock()
	defer testSinksMu.Unlock()

	testSinks[sink.url] = sink
	tb.Cleanup(func() {
		testSinksMu.Lock()
		defer testSinksMu.Unlock()
		delete(testSinks, sink.url)
	})
	return sink
}

func init() {
	if err := zap.RegisterSink("test", func(u *url.URL) (zap.Sink, error) {
		testSinksMu.Lock()
		defer testSinksMu.Unlock()
		sink, ok := testSinks[u.String()]
		if !ok {
			return nil, fmt.Errorf("no test sink registered for URL %q", u.String())
		}
		return sink, nil
	}); err != nil {
		log.Fatal(err)
	}
}
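
The test calls a few helpers that live elsewhere in the e2e package and are not part of this diff: getFreePort, waitMetricsReady, sendTestTraces, readMetrics, and the nopType component type. The sketch below is a hypothetical illustration of what such helpers might look like, not the repository's actual implementations. It also shows one plausible connection to the go.mod change: the test imports github.com/google/uuid directly, and a readMetrics helper that parses the Prometheus exposition format would pull in github.com/prometheus/client_model types, consistent with both modules moving from indirect to direct requirements.

// Hypothetical sketches only. Names match the calls in the test above; the
// bodies are assumptions, not the code in the repository.
package e2e

import (
	"bytes"
	"fmt"
	"net"
	"net/http"
	"testing"
	"time"

	dto "github.com/prometheus/client_model/go"
	"github.com/prometheus/common/expfmt"
	"github.com/stretchr/testify/require"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/pdata/ptrace"
	"go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp"
)

// nopType could be the component type for the nop exporter used in the factories map.
var nopType = component.MustNewType("nop")

// getFreePort could ask the kernel for an unused TCP port and return it as a string.
func getFreePort(t *testing.T) string {
	ln, err := net.Listen("tcp", "localhost:0")
	require.NoError(t, err)
	defer ln.Close()
	_, port, err := net.SplitHostPort(ln.Addr().String())
	require.NoError(t, err)
	return port
}

// waitMetricsReady could poll the Prometheus endpoint until it starts serving.
func waitMetricsReady(t *testing.T, port string) {
	require.Eventually(t, func() bool {
		resp, err := http.Get(fmt.Sprintf("http://localhost:%s/metrics", port))
		if err != nil {
			return false
		}
		defer resp.Body.Close()
		return resp.StatusCode == http.StatusOK
	}, 10*time.Second, 100*time.Millisecond)
}

// readMetrics could scrape the Prometheus endpoint and parse the exposition
// format into metric families, keyed by metric name (e.g. "target_info").
func readMetrics(t *testing.T, port string) map[string]*dto.MetricFamily {
	resp, err := http.Get(fmt.Sprintf("http://localhost:%s/metrics", port))
	require.NoError(t, err)
	defer resp.Body.Close()
	var parser expfmt.TextParser
	families, err := parser.TextToMetricFamilies(resp.Body)
	require.NoError(t, err)
	return families
}

// sendTestTraces could POST a minimal OTLP/HTTP export request to the
// collector's OTLP receiver so that the pipeline produces internal telemetry.
func sendTestTraces(port string) error {
	td := ptrace.NewTraces()
	span := td.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty()
	span.SetName("test-span")
	body, err := ptraceotlp.NewExportRequestFromTraces(td).MarshalProto()
	if err != nil {
		return err
	}
	resp, err := http.Post(fmt.Sprintf("http://localhost:%s/v1/traces", port),
		"application/x-protobuf", bytes.NewReader(body))
	if err != nil {
		return err
	}
	return resp.Body.Close()
}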