Skip to content

Commit 07f58dc

Browse files
Exporter tinybird traces implementation (#41248)
#### Description Implement traces propagation for the new Tinybird Exporter. The exporter iterates over the ptraces data, extracts the required fields (service name, attributes, spanID, etc.), generates an NDJSON, and performs a request to the Tinybird [EventsAPI](https://www.tinybird.co/docs/forward/get-data-in/events-api) with all the data. It's the same implementation done in [logs](#40993) but this time focused on traces. #### Link to tracking issue Related to #40475 #### Testing Included traces conversion tests and HTTP request tests.
1 parent 6492845 commit 07f58dc

File tree

5 files changed

+566
-2
lines changed

5 files changed

+566
-2
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: tinybirdexporter
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add traces implementation
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [40475]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

exporter/tinybirdexporter/exporter.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,18 @@ func (e *tinybirdExporter) start(ctx context.Context, host component.Host) error
6060
return err
6161
}
6262

63-
func (e *tinybirdExporter) pushTraces(_ context.Context, _ ptrace.Traces) error {
64-
return errors.New("this component is under development and traces are not yet supported, see https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/40475 to track development progress")
63+
func (e *tinybirdExporter) pushTraces(ctx context.Context, td ptrace.Traces) error {
64+
buffer := bytes.NewBuffer(nil)
65+
encoder := json.NewEncoder(buffer)
66+
err := internal.ConvertTraces(td, encoder)
67+
if err != nil {
68+
return consumererror.NewPermanent(err)
69+
}
70+
71+
if buffer.Len() > 0 {
72+
return e.export(ctx, e.config.Traces.Datasource, buffer)
73+
}
74+
return nil
6575
}
6676

6777
func (e *tinybirdExporter) pushMetrics(_ context.Context, _ pmetric.Metrics) error {

exporter/tinybirdexporter/exporter_test.go

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"go.opentelemetry.io/collector/exporter/exportertest"
1818
"go.opentelemetry.io/collector/pdata/pcommon"
1919
"go.opentelemetry.io/collector/pdata/plog"
20+
"go.opentelemetry.io/collector/pdata/ptrace"
2021

2122
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/tinybirdexporter/internal/metadata"
2223
)
@@ -49,6 +50,140 @@ func TestNewExporter(t *testing.T) {
4950
}
5051
}
5152

53+
func TestExportTraces(t *testing.T) {
54+
type args struct {
55+
traces ptrace.Traces
56+
config Config
57+
}
58+
type want struct {
59+
requestQuery string
60+
requestBody string
61+
responseStatus int
62+
err error
63+
}
64+
tests := []struct {
65+
name string
66+
args args
67+
want want
68+
}{
69+
{
70+
name: "export without traces",
71+
args: args{
72+
traces: func() ptrace.Traces {
73+
traces := ptrace.NewTraces()
74+
rs := traces.ResourceSpans().AppendEmpty()
75+
rs.ScopeSpans().AppendEmpty()
76+
return traces
77+
}(),
78+
config: Config{
79+
ClientConfig: confighttp.ClientConfig{},
80+
Token: "test-token",
81+
Traces: SignalConfig{Datasource: "traces_test"},
82+
Wait: false,
83+
},
84+
},
85+
want: want{
86+
requestQuery: "name=traces_test",
87+
requestBody: "",
88+
responseStatus: http.StatusOK,
89+
err: nil,
90+
},
91+
},
92+
{
93+
name: "export with full trace",
94+
args: args{
95+
traces: func() ptrace.Traces {
96+
traces := ptrace.NewTraces()
97+
rs := traces.ResourceSpans().AppendEmpty()
98+
rs.SetSchemaUrl("https://opentelemetry.io/schemas/1.20.0")
99+
resource := rs.Resource()
100+
resource.Attributes().PutStr("service.name", "test-service")
101+
resource.Attributes().PutStr("environment", "production")
102+
103+
ss := rs.ScopeSpans().AppendEmpty()
104+
ss.SetSchemaUrl("https://opentelemetry.io/schemas/1.20.0")
105+
scope := ss.Scope()
106+
scope.SetName("test-scope")
107+
scope.SetVersion("1.0.0")
108+
scope.Attributes().PutStr("telemetry.sdk.name", "opentelemetry")
109+
110+
span := ss.Spans().AppendEmpty()
111+
span.SetTraceID(pcommon.TraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}))
112+
span.SetSpanID(pcommon.SpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8}))
113+
span.SetParentSpanID(pcommon.SpanID([8]byte{9, 10, 11, 12, 13, 14, 15, 16}))
114+
span.SetName("test-span")
115+
span.SetKind(ptrace.SpanKindServer)
116+
span.SetStartTimestamp(pcommon.Timestamp(1719158400000000000)) // 2024-06-23T16:00:00Z
117+
span.SetEndTimestamp(pcommon.Timestamp(1719158401000000000)) // 2024-06-23T16:00:01Z
118+
span.Status().SetCode(ptrace.StatusCodeOk)
119+
span.Status().SetMessage("success")
120+
span.Attributes().PutStr("http.method", "GET")
121+
span.Attributes().PutStr("http.url", "/api/users")
122+
span.Attributes().PutStr("user.id", "12345")
123+
124+
// Add span event
125+
event := span.Events().AppendEmpty()
126+
event.SetName("exception")
127+
event.SetTimestamp(pcommon.Timestamp(1719158400500000000)) // 2024-06-23T16:00:00.5Z
128+
event.Attributes().PutStr("exception.type", "RuntimeException")
129+
event.Attributes().PutStr("exception.message", "Something went wrong")
130+
131+
// Add span link
132+
link := span.Links().AppendEmpty()
133+
link.SetTraceID(pcommon.TraceID([16]byte{17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32}))
134+
link.SetSpanID(pcommon.SpanID([8]byte{17, 18, 19, 20, 21, 22, 23, 24}))
135+
link.TraceState().FromRaw("sampled=true")
136+
link.Attributes().PutStr("link.type", "child")
137+
138+
return traces
139+
}(),
140+
config: Config{
141+
ClientConfig: confighttp.ClientConfig{},
142+
Token: "test-token",
143+
Traces: SignalConfig{Datasource: "traces_test"},
144+
Wait: false,
145+
},
146+
},
147+
want: want{
148+
requestQuery: "name=traces_test",
149+
requestBody: `{"resource_schema_url":"https://opentelemetry.io/schemas/1.20.0","resource_attributes":{"service.name":"test-service","environment":"production"},"service_name":"test-service","scope_schema_url":"https://opentelemetry.io/schemas/1.20.0","scope_name":"test-scope","scope_version":"1.0.0","scope_attributes":{"telemetry.sdk.name":"opentelemetry"},"trace_id":"0102030405060708090a0b0c0d0e0f10","span_id":"0102030405060708","parent_span_id":"090a0b0c0d0e0f10","trace_state":"","trace_flags":0,"span_name":"test-span","span_kind":"Server","span_attributes":{"http.method":"GET","http.url":"/api/users","user.id":"12345"},"start_time":"2024-06-23T16:00:00Z","end_time":"2024-06-23T16:00:01Z","duration":1000000000,"status_code":"Ok","status_message":"success","events_timestamp":["2024-06-23T16:00:00.5Z"],"events_name":["exception"],"events_attributes":[{"exception.type":"RuntimeException","exception.message":"Something went wrong"}],"links_trace_id":["1112131415161718191a1b1c1d1e1f20"],"links_span_id":["1112131415161718"],"links_trace_state":["sampled=true"],"links_attributes":[{"link.type":"child"}]}`,
150+
responseStatus: http.StatusOK,
151+
err: nil,
152+
},
153+
},
154+
}
155+
156+
for _, tt := range tests {
157+
t.Run(tt.name, func(t *testing.T) {
158+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
159+
assert.Equal(t, "POST", r.Method)
160+
assert.Equal(t, "/v0/events", r.URL.Path)
161+
assert.Equal(t, tt.want.requestQuery, r.URL.RawQuery)
162+
assert.Equal(t, "application/x-ndjson", r.Header.Get("Content-Type"))
163+
assert.Equal(t, "Bearer "+string(tt.args.config.Token), r.Header.Get("Authorization"))
164+
gotBody, err := io.ReadAll(r.Body)
165+
assert.NoError(t, err)
166+
assert.JSONEq(t, tt.want.requestBody, string(gotBody))
167+
168+
w.WriteHeader(tt.want.responseStatus)
169+
}))
170+
defer server.Close()
171+
172+
tt.args.config.ClientConfig.Endpoint = server.URL
173+
174+
exp := newExporter(&tt.args.config, exportertest.NewNopSettings(metadata.Type))
175+
require.NoError(t, exp.start(context.Background(), componenttest.NewNopHost()))
176+
177+
err := exp.pushTraces(context.Background(), tt.args.traces)
178+
if tt.want.err != nil {
179+
assert.Error(t, err)
180+
} else {
181+
assert.NoError(t, err)
182+
}
183+
})
184+
}
185+
}
186+
52187
func TestExportLogs(t *testing.T) {
53188
type args struct {
54189
logs plog.Logs
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package internal // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/tinybirdexporter/internal"
5+
6+
import (
7+
"time"
8+
9+
"go.opentelemetry.io/collector/pdata/ptrace"
10+
11+
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/traceutil"
12+
)
13+
14+
type traceSignal struct {
15+
ResourceSchemaURL string `json:"resource_schema_url"`
16+
ResourceAttributes map[string]string `json:"resource_attributes"`
17+
ServiceName string `json:"service_name"`
18+
ScopeSchemaURL string `json:"scope_schema_url"`
19+
ScopeName string `json:"scope_name"`
20+
ScopeVersion string `json:"scope_version"`
21+
ScopeAttributes map[string]string `json:"scope_attributes"`
22+
TraceID string `json:"trace_id"`
23+
SpanID string `json:"span_id"`
24+
ParentSpanID string `json:"parent_span_id"`
25+
TraceState string `json:"trace_state"`
26+
TraceFlags uint32 `json:"trace_flags"`
27+
SpanName string `json:"span_name"`
28+
SpanKind string `json:"span_kind"`
29+
SpanAttributes map[string]string `json:"span_attributes"`
30+
StartTime string `json:"start_time"`
31+
// used when users choose the StartTime-to-EndTime approach
32+
EndTime string `json:"end_time,omitempty"`
33+
// used when users choose the StartTime-plus-Duration approach
34+
Duration int64 `json:"duration,omitempty"`
35+
StatusCode string `json:"status_code"`
36+
StatusMessage string `json:"status_message"`
37+
EventsTimestamp []string `json:"events_timestamp"`
38+
EventsName []string `json:"events_name"`
39+
EventsAttributes []map[string]string `json:"events_attributes"`
40+
LinksTraceID []string `json:"links_trace_id"`
41+
LinksSpanID []string `json:"links_span_id"`
42+
LinksTraceState []string `json:"links_trace_state"`
43+
LinksAttributes []map[string]string `json:"links_attributes"`
44+
}
45+
46+
func convertEvents(events ptrace.SpanEventSlice) (timestamps []string, names []string, attributes []map[string]string) {
47+
timestamps = make([]string, events.Len())
48+
names = make([]string, events.Len())
49+
attributes = make([]map[string]string, events.Len())
50+
for i := 0; i < events.Len(); i++ {
51+
event := events.At(i)
52+
timestamps[i] = event.Timestamp().AsTime().Format(time.RFC3339Nano)
53+
names[i] = event.Name()
54+
attributes[i] = convertAttributes(event.Attributes())
55+
}
56+
return timestamps, names, attributes
57+
}
58+
59+
func convertLinks(links ptrace.SpanLinkSlice) (traceIDs []string, spanIDs []string, states []string, attrs []map[string]string) {
60+
traceIDs = make([]string, links.Len())
61+
spanIDs = make([]string, links.Len())
62+
states = make([]string, links.Len())
63+
attrs = make([]map[string]string, links.Len())
64+
for i := 0; i < links.Len(); i++ {
65+
link := links.At(i)
66+
traceIDs[i] = traceutil.TraceIDToHexOrEmptyString(link.TraceID())
67+
spanIDs[i] = traceutil.SpanIDToHexOrEmptyString(link.SpanID())
68+
states[i] = link.TraceState().AsRaw()
69+
attrs[i] = convertAttributes(link.Attributes())
70+
}
71+
return traceIDs, spanIDs, states, attrs
72+
}
73+
74+
func ConvertTraces(td ptrace.Traces, encoder Encoder) error {
75+
for i := 0; i < td.ResourceSpans().Len(); i++ {
76+
rs := td.ResourceSpans().At(i)
77+
schemaURL := rs.SchemaUrl()
78+
resource := rs.Resource()
79+
resourceAttributesMap := resource.Attributes()
80+
resourceAttributes := convertAttributes(resourceAttributesMap)
81+
serviceName := getServiceName(resourceAttributesMap)
82+
for j := 0; j < rs.ScopeSpans().Len(); j++ {
83+
ss := rs.ScopeSpans().At(j)
84+
ScopeSchemaURL := ss.SchemaUrl()
85+
scope := ss.Scope()
86+
scopeAttributes := convertAttributes(scope.Attributes())
87+
for k := 0; k < ss.Spans().Len(); k++ {
88+
span := ss.Spans().At(k)
89+
attributes := span.Attributes()
90+
eventsTimestamp, eventsName, eventsAttributes := convertEvents(span.Events())
91+
linksTraceID, linksSpanID, linksTraceState, linksAttributes := convertLinks(span.Links())
92+
traceEntry := traceSignal{
93+
ResourceSchemaURL: schemaURL,
94+
ResourceAttributes: resourceAttributes,
95+
ServiceName: serviceName,
96+
ScopeSchemaURL: ScopeSchemaURL,
97+
ScopeName: scope.Name(),
98+
ScopeVersion: scope.Version(),
99+
ScopeAttributes: scopeAttributes,
100+
TraceID: traceutil.TraceIDToHexOrEmptyString(span.TraceID()),
101+
SpanID: traceutil.SpanIDToHexOrEmptyString(span.SpanID()),
102+
ParentSpanID: traceutil.SpanIDToHexOrEmptyString(span.ParentSpanID()),
103+
TraceState: span.TraceState().AsRaw(),
104+
TraceFlags: span.Flags(),
105+
SpanName: span.Name(),
106+
SpanKind: span.Kind().String(),
107+
SpanAttributes: convertAttributes(attributes),
108+
StartTime: span.StartTimestamp().AsTime().Format(time.RFC3339Nano),
109+
EndTime: span.EndTimestamp().AsTime().Format(time.RFC3339Nano),
110+
Duration: span.EndTimestamp().AsTime().Sub(span.StartTimestamp().AsTime()).Nanoseconds(),
111+
StatusCode: span.Status().Code().String(),
112+
StatusMessage: span.Status().Message(),
113+
EventsTimestamp: eventsTimestamp,
114+
EventsName: eventsName,
115+
EventsAttributes: eventsAttributes,
116+
LinksTraceID: linksTraceID,
117+
LinksSpanID: linksSpanID,
118+
LinksTraceState: linksTraceState,
119+
LinksAttributes: linksAttributes,
120+
}
121+
if err := encoder.Encode(traceEntry); err != nil {
122+
return err
123+
}
124+
}
125+
}
126+
}
127+
return nil
128+
}

0 commit comments

Comments
 (0)