Skip to content

Commit 5ee4816

Browse files
authored
Propagate telemetry.resource to the internal logs (#12583)
#### Description Propagate `telemetry.resource` to the internal logs. #### Link to tracking issue Fixes #12582 Signed-off-by: Israel Blancas <[email protected]>
1 parent c85ebbe commit 5ee4816

File tree

3 files changed

+204
-3
lines changed

3 files changed

+204
-3
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: "internal telemetry"
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: "Add resource attributes from telemetry.resource to the logger"
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [12582]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
Resource attributes from telemetry.resource were not added to the internal
20+
console logs.
21+
22+
Now, they are added to the logger as part of the "resource" field.
23+
24+
# Optional: The change log or logs in which this entry should be included.
25+
# e.g. '[user]' or '[user, api]'
26+
# Include 'user' if the change is relevant to end users.
27+
# Include 'api' if there is a change to a library API.
28+
# Default: '[user]'
29+
change_logs: []

service/telemetry/logger.go

+16
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,22 @@ func newLogger(set Settings, cfg Config) (*zap.Logger, log.LoggerProvider, error
3939
return nil, nil, err
4040
}
4141

42+
// The attributes in cfg.Resource are added as resource attributes for logs exported through the LoggerProvider instantiated below.
43+
// To make sure they are also exposed in logs written to stdout, we add them as fields to the Zap core created above using WrapCore.
44+
// We do NOT add them to the logger using With, because that would apply to all logs, even ones exported through the core that wraps
45+
// the LoggerProvider, meaning that the attributes would be exported twice.
46+
logger = logger.WithOptions(zap.WrapCore(func(c zapcore.Core) zapcore.Core {
47+
fields := []zap.Field{}
48+
for k, v := range cfg.Resource {
49+
if v != nil {
50+
f := zap.Stringp(k, v)
51+
fields = append(fields, f)
52+
}
53+
}
54+
r := zap.Dict("resource", fields...)
55+
return c.With([]zapcore.Field{r})
56+
}))
57+
4258
var lp log.LoggerProvider
4359

4460
logger = logger.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core {

service/telemetry/logger_test.go

+159-3
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,28 @@ package telemetry // import "go.opentelemetry.io/collector/service/telemetry"
66
import (
77
"context"
88
"errors"
9+
"io"
10+
"net/http"
11+
"net/http/httptest"
912
"reflect"
1013
"testing"
1114
"time"
1215

16+
"github.com/stretchr/testify/assert"
1317
"github.com/stretchr/testify/require"
1418
config "go.opentelemetry.io/contrib/otelconf/v0.3.0"
19+
"go.uber.org/zap"
1520
"go.uber.org/zap/zapcore"
21+
"go.uber.org/zap/zaptest/observer"
22+
23+
"go.opentelemetry.io/collector/pdata/plog/plogotlp"
24+
semconv "go.opentelemetry.io/collector/semconv/v1.18.0"
1625
)
1726

27+
type shutdownable interface {
28+
Shutdown(context.Context) error
29+
}
30+
1831
func TestNewLogger(t *testing.T) {
1932
tests := []struct {
2033
name string
@@ -102,9 +115,6 @@ func TestNewLogger(t *testing.T) {
102115
require.NoError(t, err)
103116
gotType := reflect.TypeOf(l.Core()).String()
104117
require.Equal(t, wantCoreType, gotType)
105-
type shutdownable interface {
106-
Shutdown(context.Context) error
107-
}
108118
if prov, ok := lp.(shutdownable); ok {
109119
require.NoError(t, prov.Shutdown(context.Background()))
110120
}
@@ -114,3 +124,149 @@ func TestNewLogger(t *testing.T) {
114124
testCoreType(t, tt.wantCoreType)
115125
}
116126
}
127+
128+
func TestNewLoggerWithResource(t *testing.T) {
129+
observerCore, observedLogs := observer.New(zap.InfoLevel)
130+
131+
set := Settings{
132+
ZapOptions: []zap.Option{
133+
zap.WrapCore(func(core zapcore.Core) zapcore.Core {
134+
// Combine original core and observer core to capture everything
135+
return zapcore.NewTee(core, observerCore)
136+
}),
137+
},
138+
}
139+
140+
cfg := Config{
141+
Logs: LogsConfig{
142+
Level: zapcore.InfoLevel,
143+
Encoding: "json",
144+
},
145+
Resource: map[string]*string{
146+
"myfield": ptr("myvalue"),
147+
},
148+
}
149+
150+
mylogger, _, _ := newLogger(set, cfg)
151+
152+
mylogger.Info("Test log message")
153+
require.Len(t, observedLogs.All(), 1)
154+
155+
entry := observedLogs.All()[0]
156+
assert.Equal(t, "resource", entry.Context[0].Key)
157+
dict := entry.Context[0].Interface.(zapcore.ObjectMarshaler)
158+
enc := zapcore.NewMapObjectEncoder()
159+
require.NoError(t, dict.MarshalLogObject(enc))
160+
require.Equal(t, "myvalue", enc.Fields["myfield"])
161+
}
162+
163+
func TestOTLPLogExport(t *testing.T) {
164+
version := "1.2.3"
165+
service := "test-service"
166+
testAttribute := "test-attribute"
167+
testValue := "test-value"
168+
receivedLogs := 0
169+
totalLogs := 10
170+
171+
// Create a backend to receive the logs and assert the content
172+
srv := createBackend("/v1/logs", func(writer http.ResponseWriter, request *http.Request) {
173+
body, err := io.ReadAll(request.Body)
174+
assert.NoError(t, err)
175+
defer request.Body.Close()
176+
177+
// Unmarshal the protobuf body into logs
178+
req := plogotlp.NewExportRequest()
179+
err = req.UnmarshalProto(body)
180+
assert.NoError(t, err)
181+
182+
logs := req.Logs()
183+
rl := logs.ResourceLogs().At(0)
184+
185+
resourceAttrs := rl.Resource().Attributes().AsRaw()
186+
assert.Equal(t, resourceAttrs[semconv.AttributeServiceName], service)
187+
assert.Equal(t, resourceAttrs[semconv.AttributeServiceVersion], version)
188+
assert.Equal(t, resourceAttrs[testAttribute], testValue)
189+
190+
// Check that the resource attributes are not duplicated in the log records
191+
sl := rl.ScopeLogs().At(0)
192+
logRecord := sl.LogRecords().At(0)
193+
attrs := logRecord.Attributes().AsRaw()
194+
assert.NotContains(t, attrs, semconv.AttributeServiceName)
195+
assert.NotContains(t, attrs, semconv.AttributeServiceVersion)
196+
assert.NotContains(t, attrs, testAttribute)
197+
198+
receivedLogs++
199+
200+
writer.WriteHeader(http.StatusOK)
201+
})
202+
defer srv.Close()
203+
204+
processors := []config.LogRecordProcessor{
205+
{
206+
Simple: &config.SimpleLogRecordProcessor{
207+
Exporter: config.LogRecordExporter{
208+
OTLP: &config.OTLP{
209+
Endpoint: ptr(srv.URL),
210+
Protocol: ptr("http/protobuf"),
211+
Insecure: ptr(true),
212+
},
213+
},
214+
},
215+
},
216+
}
217+
218+
cfg := Config{
219+
Logs: LogsConfig{
220+
Level: zapcore.DebugLevel,
221+
Development: true,
222+
Encoding: "json",
223+
Processors: processors,
224+
},
225+
}
226+
227+
sdk, _ := config.NewSDK(
228+
config.WithOpenTelemetryConfiguration(
229+
config.OpenTelemetryConfiguration{
230+
LoggerProvider: &config.LoggerProvider{
231+
Processors: processors,
232+
},
233+
Resource: &config.Resource{
234+
SchemaUrl: ptr(""),
235+
Attributes: []config.AttributeNameValue{
236+
{Name: semconv.AttributeServiceName, Value: service},
237+
{Name: semconv.AttributeServiceVersion, Value: version},
238+
{Name: testAttribute, Value: testValue},
239+
},
240+
},
241+
},
242+
),
243+
)
244+
245+
l, lp, err := newLogger(Settings{SDK: &sdk}, cfg)
246+
require.NoError(t, err)
247+
require.NotNil(t, l)
248+
require.NotNil(t, lp)
249+
250+
defer func() {
251+
if prov, ok := lp.(shutdownable); ok {
252+
require.NoError(t, prov.Shutdown(context.Background()))
253+
}
254+
}()
255+
256+
// Reset counter for each test case
257+
receivedLogs = 0
258+
259+
// Generate some logs to send to the backend
260+
for i := 0; i < totalLogs; i++ {
261+
l.Info("Test log message")
262+
}
263+
264+
// Ensure the correct number of logs were received
265+
require.Equal(t, totalLogs, receivedLogs)
266+
}
267+
268+
func createBackend(endpoint string, handler func(writer http.ResponseWriter, request *http.Request)) *httptest.Server {
269+
mux := http.NewServeMux()
270+
mux.HandleFunc(endpoint, handler)
271+
return httptest.NewServer(mux)
272+
}

0 commit comments

Comments
 (0)