Skip to content

Commit 0d1e6c4

Browse files
jordivilasecaDylan-M
authored andcommitted
[exporter/tinybird] implement logs propagation (open-telemetry#40993)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description Implement logs propagation for the new Tinybird Exporter. The exporter iterates over the plog data, extracts the required fields (service name, attributes, log severity, etc.), generates an NDJSON, and performs a request to the Tinybird [EventsAPI](https://www.tinybird.co/docs/forward/get-data-in/events-api) with all the data. The implementation is inspired in the `otlphttp` exporter (both perform HTTP requests). - Exporter config has been modified to include - `confighttp.ClientConfig`: allow the configuration of the HTTP client - `configretry.BackOffConfig`: allow the configuration of retries, - `exporterhelper.QueueBatchConfig`: allow the configuration of sending queue and batching strategy. - Factories have been updated to propagate these new configs <!-- Issue number (e.g. open-telemetry#1234) or full URL to issue, if applicable. --> #### Link to tracking issue Related to open-telemetry#40475 <!--Describe what testing was performed and which tests were added.--> #### Testing Included logs conversion tests and HTTP request tests.
1 parent 0af684f commit 0d1e6c4

File tree

13 files changed

+858
-62
lines changed

13 files changed

+858
-62
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: new_component
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: tinybird
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Implement logs propagation for Tinybird exporter
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [40475]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

exporter/tinybirdexporter/config.go

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,16 @@
44
package tinybirdexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/tinybirdexporter"
55

66
import (
7+
"errors"
78
"fmt"
89
"net/url"
910
"regexp"
1011

1112
"go.opentelemetry.io/collector/component"
13+
"go.opentelemetry.io/collector/config/confighttp"
1214
"go.opentelemetry.io/collector/config/configopaque"
15+
"go.opentelemetry.io/collector/config/configretry"
16+
"go.opentelemetry.io/collector/exporter/exporterhelper"
1317
)
1418

1519
var datasourceRegex = regexp.MustCompile(`^[\w_]+$`)
@@ -21,6 +25,9 @@ type SignalConfig struct {
2125
}
2226

2327
func (cfg SignalConfig) Validate() error {
28+
if cfg.Datasource == "" {
29+
return errors.New("datasource cannot be empty")
30+
}
2431
if !datasourceRegex.MatchString(cfg.Datasource) {
2532
return fmt.Errorf("invalid datasource %q: only letters, numbers, and underscores are allowed", cfg.Datasource)
2633
}
@@ -29,32 +36,38 @@ func (cfg SignalConfig) Validate() error {
2936

3037
// Config defines configuration for the Tinybird exporter.
3138
type Config struct {
32-
Endpoint string `mapstructure:"endpoint"`
33-
Token configopaque.String `mapstructure:"token"`
34-
Metrics SignalConfig `mapstructure:"metrics"`
35-
Traces SignalConfig `mapstructure:"traces"`
36-
Logs SignalConfig `mapstructure:"logs"`
39+
ClientConfig confighttp.ClientConfig `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
40+
RetryConfig configretry.BackOffConfig `mapstructure:"retry_on_failure"`
41+
QueueConfig exporterhelper.QueueBatchConfig `mapstructure:"sending_queue"`
42+
43+
// Tinybird API token.
44+
Token configopaque.String `mapstructure:"token"`
45+
Metrics SignalConfig `mapstructure:"metrics"`
46+
Traces SignalConfig `mapstructure:"traces"`
47+
Logs SignalConfig `mapstructure:"logs"`
48+
// Wait for data to be ingested before returning a response.
49+
Wait bool `mapstructure:"wait"`
3750
}
3851

3952
var _ component.Config = (*Config)(nil)
4053

4154
// Validate checks if the exporter configuration is valid
4255
func (cfg *Config) Validate() error {
43-
if cfg.Token == "" {
44-
return errMissingToken
45-
}
46-
if cfg.Endpoint == "" {
56+
if cfg.ClientConfig.Endpoint == "" {
4757
return errMissingEndpoint
4858
}
49-
u, err := url.Parse(cfg.Endpoint)
59+
u, err := url.Parse(cfg.ClientConfig.Endpoint)
5060
if err != nil {
5161
return fmt.Errorf("endpoint must be a valid URL: %w", err)
5262
}
5363
if u.Scheme != "http" && u.Scheme != "https" {
54-
return fmt.Errorf("endpoint must have http or https scheme: %q", cfg.Endpoint)
64+
return fmt.Errorf("endpoint must have http or https scheme: %q", cfg.ClientConfig.Endpoint)
5565
}
5666
if u.Host == "" {
57-
return fmt.Errorf("endpoint must have a host: %q", cfg.Endpoint)
67+
return fmt.Errorf("endpoint must have a host: %q", cfg.ClientConfig.Endpoint)
68+
}
69+
if cfg.Token == "" {
70+
return errMissingToken
5871
}
5972
return nil
6073
}

exporter/tinybirdexporter/config_test.go

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,12 @@ import (
1010
"github.com/stretchr/testify/assert"
1111
"github.com/stretchr/testify/require"
1212
"go.opentelemetry.io/collector/component"
13+
"go.opentelemetry.io/collector/config/configcompression"
14+
"go.opentelemetry.io/collector/config/confighttp"
15+
"go.opentelemetry.io/collector/config/configretry"
1316
"go.opentelemetry.io/collector/confmap/confmaptest"
1417
"go.opentelemetry.io/collector/confmap/xconfmap"
18+
"go.opentelemetry.io/collector/exporter/exporterhelper"
1519
)
1620

1721
func TestLoadConfig(t *testing.T) {
@@ -30,22 +34,44 @@ func TestLoadConfig(t *testing.T) {
3034
id: component.NewIDWithName(component.MustNewType(typeStr), ""),
3135
subName: "tinybird",
3236
expected: &Config{
33-
Endpoint: "https://api.tinybird.co",
34-
Token: "test-token",
35-
Metrics: SignalConfig{Datasource: "metrics"},
36-
Traces: SignalConfig{Datasource: "traces"},
37-
Logs: SignalConfig{Datasource: "logs"},
37+
ClientConfig: func() confighttp.ClientConfig {
38+
cfg := createDefaultConfig().(*Config).ClientConfig
39+
cfg.Endpoint = "https://api.tinybird.co"
40+
return cfg
41+
}(),
42+
RetryConfig: configretry.NewDefaultBackOffConfig(),
43+
QueueConfig: exporterhelper.NewDefaultQueueConfig(),
44+
Token: "test-token",
45+
Metrics: SignalConfig{Datasource: "metrics"},
46+
Traces: SignalConfig{Datasource: "traces"},
47+
Logs: SignalConfig{Datasource: "logs"},
3848
},
3949
},
4050
{
4151
id: component.NewIDWithName(component.MustNewType(typeStr), "full"),
4252
subName: "tinybird/full",
4353
expected: &Config{
44-
Endpoint: "https://api.tinybird.co",
45-
Token: "test-token",
46-
Metrics: SignalConfig{Datasource: "metrics"},
47-
Traces: SignalConfig{Datasource: "traces"},
48-
Logs: SignalConfig{Datasource: "logs"},
54+
ClientConfig: func() confighttp.ClientConfig {
55+
cfg := createDefaultConfig().(*Config).ClientConfig
56+
cfg.Endpoint = "https://api.tinybird.co"
57+
cfg.Compression = configcompression.TypeZstd
58+
return cfg
59+
}(),
60+
RetryConfig: func() configretry.BackOffConfig {
61+
cfg := createDefaultConfig().(*Config).RetryConfig
62+
cfg.Enabled = false
63+
return cfg
64+
}(),
65+
QueueConfig: func() exporterhelper.QueueBatchConfig {
66+
cfg := createDefaultConfig().(*Config).QueueConfig
67+
cfg.Enabled = false
68+
return cfg
69+
}(),
70+
Token: "test-token",
71+
Metrics: SignalConfig{Datasource: "metrics"},
72+
Traces: SignalConfig{Datasource: "traces"},
73+
Logs: SignalConfig{Datasource: "logs"},
74+
Wait: true,
4975
},
5076
},
5177
{

exporter/tinybirdexporter/exporter.go

Lines changed: 140 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,24 +4,60 @@
44
package tinybirdexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/tinybirdexporter"
55

66
import (
7+
"bytes"
78
"context"
9+
"encoding/json"
810
"errors"
11+
"fmt"
12+
"io"
13+
"net/http"
14+
"runtime"
15+
"strconv"
16+
"time"
917

1018
"go.opentelemetry.io/collector/component"
19+
"go.opentelemetry.io/collector/consumer/consumererror"
1120
"go.opentelemetry.io/collector/exporter"
21+
"go.opentelemetry.io/collector/exporter/exporterhelper"
1222
"go.opentelemetry.io/collector/pdata/plog"
1323
"go.opentelemetry.io/collector/pdata/pmetric"
1424
"go.opentelemetry.io/collector/pdata/ptrace"
25+
"go.uber.org/zap"
26+
27+
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/tinybirdexporter/internal"
1528
)
1629

17-
type tinybirdExporter struct{}
30+
const (
31+
headerRetryAfter = "Retry-After"
32+
contentTypeNDJSON = "application/x-ndjson"
33+
)
1834

19-
func newExporter(_ component.Config, _ exporter.Settings) (*tinybirdExporter, error) {
20-
return &tinybirdExporter{}, nil
35+
type tinybirdExporter struct {
36+
config *Config
37+
client *http.Client
38+
logger *zap.Logger
39+
settings component.TelemetrySettings
40+
userAgent string
2141
}
2242

23-
func (e *tinybirdExporter) start(_ context.Context, _ component.Host) error {
24-
return nil
43+
func newExporter(cfg component.Config, set exporter.Settings) *tinybirdExporter {
44+
oCfg := cfg.(*Config)
45+
46+
userAgent := fmt.Sprintf("%s/%s (%s/%s)",
47+
set.BuildInfo.Description, set.BuildInfo.Version, runtime.GOOS, runtime.GOARCH)
48+
49+
return &tinybirdExporter{
50+
config: oCfg,
51+
logger: set.Logger,
52+
userAgent: userAgent,
53+
settings: set.TelemetrySettings,
54+
}
55+
}
56+
57+
func (e *tinybirdExporter) start(ctx context.Context, host component.Host) error {
58+
var err error
59+
e.client, err = e.config.ClientConfig.ToClient(ctx, host, e.settings)
60+
return err
2561
}
2662

2763
func (e *tinybirdExporter) pushTraces(_ context.Context, _ ptrace.Traces) error {
@@ -32,6 +68,103 @@ func (e *tinybirdExporter) pushMetrics(_ context.Context, _ pmetric.Metrics) err
3268
return errors.New("this component is under development and metrics are not yet supported, see https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/40475 to track development progress")
3369
}
3470

35-
func (e *tinybirdExporter) pushLogs(_ context.Context, _ plog.Logs) error {
36-
return errors.New("this component is under development and logs are not yet supported, see https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/40475 to track development progress")
71+
func (e *tinybirdExporter) pushLogs(ctx context.Context, ld plog.Logs) error {
72+
buffer := bytes.NewBuffer(nil)
73+
encoder := json.NewEncoder(buffer)
74+
err := internal.ConvertLogs(ld, encoder)
75+
if err != nil {
76+
return consumererror.NewPermanent(err)
77+
}
78+
79+
if buffer.Len() > 0 {
80+
return e.export(ctx, e.config.Logs.Datasource, buffer)
81+
}
82+
return nil
83+
}
84+
85+
func (e *tinybirdExporter) export(ctx context.Context, dataSource string, buffer *bytes.Buffer) error {
86+
// Create request and add query parameters
87+
url := e.config.ClientConfig.Endpoint + "/v0/events"
88+
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, buffer)
89+
if err != nil {
90+
return consumererror.NewPermanent(err)
91+
}
92+
q := req.URL.Query()
93+
q.Set("name", dataSource)
94+
if e.config.Wait {
95+
q.Set("wait", "true")
96+
}
97+
req.URL.RawQuery = q.Encode()
98+
99+
// Set headers
100+
req.Header.Set("Content-Type", contentTypeNDJSON)
101+
req.Header.Set("Authorization", "Bearer "+string(e.config.Token))
102+
req.Header.Set("User-Agent", e.userAgent)
103+
104+
// Send request
105+
resp, err := e.client.Do(req)
106+
if err != nil {
107+
return err
108+
}
109+
defer func() {
110+
// Drain the response body to avoid leaking resources.
111+
_, _ = io.Copy(io.Discard, resp.Body)
112+
resp.Body.Close()
113+
}()
114+
115+
// Check if the request was successful.
116+
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
117+
return nil
118+
}
119+
120+
// Read error response
121+
respBody, err := io.ReadAll(resp.Body)
122+
if err != nil {
123+
return fmt.Errorf("failed to read response body: %w", err)
124+
}
125+
formattedErr := fmt.Errorf("error exporting items, request to %s responded with HTTP Status Code %d, Message=%s",
126+
url, resp.StatusCode, string(respBody))
127+
128+
// If the status code is not retryable, return a permanent error.
129+
if !isRetryableStatusCode(resp.StatusCode) {
130+
return consumererror.NewPermanent(formattedErr)
131+
}
132+
133+
// Check if the server is overwhelmed.
134+
isThrottleError := resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode == http.StatusServiceUnavailable
135+
if isThrottleError {
136+
values := resp.Header.Values(headerRetryAfter)
137+
if len(values) == 0 {
138+
return formattedErr
139+
}
140+
// The value of Retry-After field can be either an HTTP-date or a number of
141+
// seconds to delay after the response is received. See https://datatracker.ietf.org/doc/html/rfc7231#section-7.1.3
142+
//
143+
// Tinybird Events API returns the delay-seconds in the Retry-After header.
144+
// https://www.tinybird.co/docs/forward/get-data-in/events-api#rate-limit-headers
145+
if seconds, err := strconv.Atoi(values[0]); err == nil {
146+
return exporterhelper.NewThrottleRetry(formattedErr, time.Duration(seconds)*time.Second)
147+
}
148+
}
149+
150+
return formattedErr
151+
}
152+
153+
// Determine if the status code is retryable according to Tinybird Events API.
154+
// See https://www.tinybird.co/docs/api-reference/events-api#return-http-status-codes
155+
func isRetryableStatusCode(code int) bool {
156+
switch code {
157+
case http.StatusTooManyRequests:
158+
return true
159+
case http.StatusInternalServerError:
160+
return true
161+
case http.StatusBadGateway:
162+
return true
163+
case http.StatusServiceUnavailable:
164+
return true
165+
case http.StatusGatewayTimeout:
166+
return true
167+
default:
168+
return false
169+
}
37170
}

0 commit comments

Comments
 (0)