Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions .chloggen/telemetry-defaultdroppedinstruments.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. receiver/otlp)
component: pkg/service

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Replace service/telemetry.MeterSettings.DefaultViews

# One or more tracking issues or pull requests related to the change
issues: [13809]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
The `service/telemetry.MeterSettings.DefaultViews` field has been removed, and replaced with
`service/telemetry.MeterSettings.DefaultDroppedInstruments`. This new field is a function that
returns a slice of `InstrumentSelector`s, each of which identifies a meter name and/or instrument
name to be dropped by default. These are directly translated to views by otelconftelemetry.

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
4 changes: 2 additions & 2 deletions internal/e2e/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ go 1.24.0

require (
github.com/google/go-cmp v0.7.0
github.com/google/uuid v1.6.0
github.com/prometheus/client_model v0.6.2
github.com/prometheus/common v0.67.1
github.com/stretchr/testify v1.11.1
go.opentelemetry.io/collector/component v1.48.0
Expand Down Expand Up @@ -68,7 +70,6 @@ require (
github.com/gobwas/glob v0.2.3 // indirect
github.com/golang/snappy v1.0.0 // indirect
github.com/google/go-tpm v0.9.7 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2 // indirect
github.com/hashicorp/go-version v1.8.0 // indirect
Expand All @@ -89,7 +90,6 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 // indirect
github.com/prometheus/client_golang v1.23.2 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/otlptranslator v0.0.2 // indirect
github.com/prometheus/procfs v0.17.0 // indirect
github.com/rs/cors v1.11.1 // indirect
Expand Down
292 changes: 292 additions & 0 deletions internal/e2e/internal_telemetry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,292 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package e2e

import (
"bytes"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"net/http/httptest"
"net/url"
"os"
"path/filepath"
"slices"
"sync"
"testing"
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/confmap/provider/fileprovider"
"go.opentelemetry.io/collector/confmap/provider/yamlprovider"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/otelcol"
"go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/otlpreceiver"
"go.opentelemetry.io/collector/service/telemetry/otelconftelemetry"
)

// TestInternalTelemetry_ServiceInstanceID verifies that the service.instance.id
// attribute is generated by default (unless overridden), and is is consistent
// across all internal telemetry providers.
func TestInternalTelemetry_ServiceInstanceID(t *testing.T) {
type testcase struct {
extraYamlConfig string
checkServiceInstanceID func(t *testing.T, serviceInstanceID string)
}

for name, tt := range map[string]testcase{
"default": {
checkServiceInstanceID: func(t *testing.T, serviceInstanceID string) {
// By default, service.instance.id should be a generated UUIDv4
_, err := uuid.Parse(serviceInstanceID)
require.NoError(t, err)
},
},
"service.instance.id set in config": {
extraYamlConfig: `
service:
telemetry:
resource:
service.instance.id: "my-custom-instance-id"`,
checkServiceInstanceID: func(t *testing.T, serviceInstanceID string) {
assert.Equal(t, "my-custom-instance-id", serviceInstanceID)
},
},
} {
t.Run(name, func(t *testing.T) {
// Set up HTTP server to capture traces from collector's internal telemetry
traceSink := new(consumertest.TracesSink)
traceServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
otlpReq := ptraceotlp.NewExportRequest()
if err := otlpReq.UnmarshalProto(body); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
_ = traceSink.ConsumeTraces(r.Context(), otlpReq.Traces())
}))
defer traceServer.Close()

logSink := registerTestLogSink(t)

// Create temporary directory for the config file
tempdir := t.TempDir()
configFile := filepath.Join(tempdir, "config.yaml")

// Create YAML config
otlphttpPort := getFreePort(t)
metricsPort := getFreePort(t)
require.NoError(t, os.WriteFile(configFile, []byte(fmt.Sprintf(`
receivers:
otlp:
protocols:
http:
endpoint: localhost:%s

exporters:
nop:

service:
telemetry:
logs:
level: info
encoding: json
output_paths: [%q]
metrics:
level: normal
readers:
- pull:
exporter:
prometheus:
host: localhost
port: %s
traces:
level: normal
processors:
- simple:
exporter:
otlp:
protocol: http/protobuf
endpoint: %s
pipelines:
traces:
receivers: [otlp]
exporters: [nop]
`, otlphttpPort, logSink.url, metricsPort, traceServer.URL)[1:]), 0o600))

// Create collector
configURIs := []string{configFile}
if tt.extraYamlConfig != "" {
configURIs = append(configURIs, "yaml:"+tt.extraYamlConfig)
}
collector, err := otelcol.NewCollector(otelcol.CollectorSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: func() (otelcol.Factories, error) {
otlpreceiverFactory := otlpreceiver.NewFactory()
return otelcol.Factories{
Receivers: map[component.Type]receiver.Factory{
otlpreceiverFactory.Type(): otlpreceiverFactory,
},
Exporters: map[component.Type]exporter.Factory{
nopType: exportertest.NewNopFactory(),
},
Telemetry: otelconftelemetry.NewFactory(),
}, nil
},
ConfigProviderSettings: otelcol.ConfigProviderSettings{
ResolverSettings: confmap.ResolverSettings{
URIs: configURIs,
ProviderFactories: []confmap.ProviderFactory{
fileprovider.NewFactory(),
yamlprovider.NewFactory(),
},
},
},
})
require.NoError(t, err)

// Start collector
go func() {
assert.NoError(t, collector.Run(t.Context()))
}()
waitMetricsReady(t, metricsPort)

// Send some data through the pipeline to trigger internal telemetry
err = sendTestTraces(otlphttpPort)
require.NoError(t, err)

// Capture service.instance.id from the Prometheus endpoint
var metricInstanceID string
parsed := readMetrics(t, metricsPort)
targetInfo := parsed["target_info"]
require.NotNil(t, targetInfo, "target_info metric not found")
require.Len(t, targetInfo.Metric, 1)
for _, label := range targetInfo.Metric[0].Label {
if label.GetName() == "service_instance_id" {
metricInstanceID = label.GetValue()
break
}
}
tt.checkServiceInstanceID(t, metricInstanceID)

// Wait for traces, verify service.instance.id matches the one from metrics
require.EventuallyWithT(t, func(t *assert.CollectT) {
allTraces := traceSink.AllTraces()
require.NotEmpty(t, allTraces)

// Find service.instance.id in resource attributes
for _, td := range allTraces {
for i := 0; i < td.ResourceSpans().Len(); i++ {
rs := td.ResourceSpans().At(i)
if attr, ok := rs.Resource().Attributes().Get("service.instance.id"); ok {
traceInstanceID := attr.AsString()
require.Equal(t, metricInstanceID, traceInstanceID)
}
}
}
}, 10*time.Second, 500*time.Millisecond)

// Check service.instance.id in logs matches the one from metrics
var logsCount int
logContent := logSink.Bytes()
for line := range bytes.Lines(bytes.TrimSpace(logContent)) {
var logEntry map[string]any
if err := json.Unmarshal(line, &logEntry); err != nil {
continue
}
// Check for resource field with service.instance.id
// Resource attributes are nested under "resource" key as a dictionary
resource, ok := logEntry["resource"].(map[string]any)
require.True(t, ok, "log entry should have resource field")
logInstanceID, ok := resource["service.instance.id"].(string)
require.True(t, ok, "resource should have service.instance.id")
require.Equal(t, metricInstanceID, logInstanceID)
logsCount++
}
assert.NotZero(t, logsCount)
})
}
}

// Test-specific zap sink to capture logs as close as possible to logs being written to file.
// The reason we don't actually write to files is because Zap provides no way of closing file
// sinks created by zap.Config.Build.

var (
testSinksMu sync.Mutex
testSinks = make(map[string]*testSink)
)

type testSink struct {
url string

mu sync.RWMutex
buf bytes.Buffer
}

func (s *testSink) Write(p []byte) (n int, err error) {
s.mu.Lock()
defer s.mu.Unlock()
return s.buf.Write(p)
}

func (s *testSink) Bytes() []byte {
s.mu.RLock()
defer s.mu.RUnlock()
return slices.Clone(s.buf.Bytes())
}

func (*testSink) Sync() error {
return nil
}

func (*testSink) Close() error {
return nil
}

func registerTestLogSink(tb testing.TB) *testSink {
sink := &testSink{}
sink.url = fmt.Sprintf("test://%s.%p", tb.Name(), sink)

testSinksMu.Lock()
defer testSinksMu.Unlock()

testSinks[sink.url] = sink
tb.Cleanup(func() {
testSinksMu.Lock()
defer testSinksMu.Unlock()
delete(testSinks, sink.url)
})
return sink
}

func init() {
if err := zap.RegisterSink("test", func(u *url.URL) (zap.Sink, error) {
testSinksMu.Lock()
defer testSinksMu.Unlock()
sink, ok := testSinks[u.String()]
if !ok {
return nil, fmt.Errorf("no test sink registered for URL %q", u.String())
}
return sink, nil
}); err != nil {
log.Fatal(err)
}
}
Loading
Loading