Skip to content

Commit 98b1fb5

Browse files
committed
receiver/sqlquery: report component status on failures
1 parent 76572fc commit 98b1fb5

7 files changed

Lines changed: 183 additions & 6 deletions

File tree

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
change_type: enhancement
2+
component: receiver/sqlquery
3+
note: Report component status on database connection and query failures for health check v2 integration.
4+
issues: [43837]
5+
subtext:
6+
change_logs: [user]

receiver/sqlqueryreceiver/go.mod

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,11 @@ require (
4040
go.uber.org/zap v1.27.1
4141
)
4242

43+
require (
44+
go.opentelemetry.io/collector/component/componentstatus v0.149.1-0.20260408002112-999af6320692
45+
go.opentelemetry.io/collector/scraper v0.149.1-0.20260408002112-999af6320692
46+
)
47+
4348
require (
4449
dario.cat/mergo v1.0.2 // indirect
4550
filippo.io/edwards25519 v1.1.1 // indirect
@@ -165,7 +170,6 @@ require (
165170
go.opentelemetry.io/collector/pipeline v1.55.1-0.20260408002112-999af6320692 // indirect
166171
go.opentelemetry.io/collector/pipeline/xpipeline v0.149.1-0.20260408002112-999af6320692 // indirect
167172
go.opentelemetry.io/collector/receiver/xreceiver v0.149.1-0.20260408002112-999af6320692 // indirect
168-
go.opentelemetry.io/collector/scraper v0.149.1-0.20260408002112-999af6320692 // indirect
169173
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0 // indirect
170174
go.opentelemetry.io/otel v1.43.0 // indirect
171175
go.opentelemetry.io/otel/sdk v1.43.0 // indirect

receiver/sqlqueryreceiver/go.sum

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

receiver/sqlqueryreceiver/logs_receiver.go

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"time"
1212

1313
"go.opentelemetry.io/collector/component"
14+
"go.opentelemetry.io/collector/component/componentstatus"
1415
"go.opentelemetry.io/collector/consumer"
1516
"go.opentelemetry.io/collector/extension/xextension/storage"
1617
"go.opentelemetry.io/collector/pdata/pcommon"
@@ -38,6 +39,7 @@ type logsReceiver struct {
3839
id component.ID
3940
storageClient storage.Client
4041
obsrecv *receiverhelper.ObsReport
42+
host component.Host
4143
}
4244

4345
func newLogsReceiver(
@@ -88,6 +90,7 @@ func (receiver *logsReceiver) Start(ctx context.Context, host component.Host) er
8890
}
8991
receiver.settings.Logger.Debug("starting...")
9092
receiver.isStarted = true
93+
receiver.host = host
9194

9295
var err error
9396
receiver.storageClient, err = adapter.GetStorageClient(ctx, host, receiver.config.StorageID, receiver.settings.ID)
@@ -162,21 +165,39 @@ func (receiver *logsReceiver) startCollecting() {
162165
}
163166

164167
func (receiver *logsReceiver) collect() {
165-
logsChannel := make(chan plog.Logs)
168+
type collectResult struct {
169+
logs plog.Logs
170+
err error
171+
}
172+
resultsChannel := make(chan collectResult, len(receiver.queryReceivers))
166173
for _, queryReceiver := range receiver.queryReceivers {
167174
go func(queryReceiver *logsQueryReceiver) {
168175
logs, err := queryReceiver.collect(context.Background())
169176
if err != nil {
170177
receiver.settings.Logger.Error("error collecting logs", zap.Error(err), zap.String("query", queryReceiver.ID()))
171178
}
172-
logsChannel <- logs
179+
resultsChannel <- collectResult{logs: logs, err: err}
173180
}(queryReceiver)
174181
}
175182

176183
allLogs := plog.NewLogs()
184+
var collectErr error
177185
for range receiver.queryReceivers {
178-
logs := <-logsChannel
179-
logs.ResourceLogs().MoveAndAppendTo(allLogs.ResourceLogs())
186+
select {
187+
case result := <-resultsChannel:
188+
result.logs.ResourceLogs().MoveAndAppendTo(allLogs.ResourceLogs())
189+
if result.err != nil {
190+
collectErr = errors.Join(collectErr, result.err)
191+
}
192+
case <-receiver.shutdownRequested:
193+
return
194+
}
195+
}
196+
197+
if collectErr != nil {
198+
componentstatus.ReportStatus(receiver.host, componentstatus.NewRecoverableErrorEvent(collectErr))
199+
} else {
200+
componentstatus.ReportStatus(receiver.host, componentstatus.NewEvent(componentstatus.StatusOK))
180201
}
181202

182203
logRecordCount := allLogs.LogRecordCount()

receiver/sqlqueryreceiver/logs_receiver_test.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
"github.com/stretchr/testify/assert"
1111
"github.com/stretchr/testify/require"
12+
"go.opentelemetry.io/collector/component/componentstatus"
1213
"go.opentelemetry.io/collector/component/componenttest"
1314
"go.opentelemetry.io/collector/consumer/consumertest"
1415
"go.opentelemetry.io/collector/pdata/pcommon"
@@ -212,3 +213,55 @@ func TestLogsReceiver_InitialDelay(t *testing.T) {
212213
return sink.LogRecordCount() >= 1
213214
}, initialDelay+50*time.Millisecond, 5*time.Millisecond)
214215
}
216+
217+
func TestStatusReportingLogs(t *testing.T) {
218+
fakeClient := &sqlquery.FakeDBClient{
219+
StringMaps: [][]sqlquery.StringMap{
220+
{{"col1": "42"}},
221+
},
222+
}
223+
createReceiver := createLogsReceiverFunc(fakeDBConnect, func(sqlquery.Db, string, *zap.Logger, sqlquery.TelemetryConfig) sqlquery.DbClient {
224+
return fakeClient
225+
})
226+
227+
ctx := t.Context()
228+
statusEvents := make(chan *componentstatus.Event, 10)
229+
host := &statusReporterHost{
230+
Host: componenttest.NewNopHost(),
231+
report: func(event *componentstatus.Event) {
232+
statusEvents <- event
233+
},
234+
}
235+
236+
receiver, err := createReceiver(
237+
ctx,
238+
receivertest.NewNopSettings(metadata.Type),
239+
&Config{
240+
Config: sqlquery.Config{
241+
ControllerConfig: scraperhelper.ControllerConfig{
242+
CollectionInterval: 10 * time.Millisecond,
243+
},
244+
Driver: "postgres",
245+
DataSource: "my-datasource",
246+
Queries: []sqlquery.Query{{
247+
SQL: "select * from foo",
248+
Logs: []sqlquery.LogsCfg{{
249+
BodyColumn: "col1",
250+
}},
251+
}},
252+
},
253+
},
254+
consumertest.NewNop(),
255+
)
256+
require.NoError(t, err)
257+
require.NoError(t, receiver.Start(ctx, host))
258+
259+
select {
260+
case event := <-statusEvents:
261+
require.Equal(t, componentstatus.StatusOK, event.Status())
262+
case <-time.After(1 * time.Second):
263+
t.Fatal("timed out waiting for status event")
264+
}
265+
266+
require.NoError(t, receiver.Shutdown(ctx))
267+
}

receiver/sqlqueryreceiver/receiver.go

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,12 @@ import (
88
"fmt"
99

1010
"go.opentelemetry.io/collector/component"
11+
"go.opentelemetry.io/collector/component/componentstatus"
1112
"go.opentelemetry.io/collector/consumer"
1213
"go.opentelemetry.io/collector/pdata/pcommon"
14+
"go.opentelemetry.io/collector/pdata/pmetric"
1315
"go.opentelemetry.io/collector/receiver"
16+
"go.opentelemetry.io/collector/scraper"
1417
"go.opentelemetry.io/collector/scraper/scraperhelper"
1518

1619
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/sqlquery"
@@ -62,7 +65,10 @@ func createMetricsReceiverFunc(sqlOpenerFunc sqlquery.SQLOpenerFunc, clientProvi
6265
scope.SetName(metadata.ScopeName)
6366
mp := sqlquery.NewScraper(id, query, sqlCfg.ControllerConfig, settings.Logger, sqlCfg.Telemetry, pool.DB, clientProviderFunc, scope)
6467

65-
opt := scraperhelper.AddMetricsScraper(metadata.Type, mp)
68+
wrapped := &statusReportingScraper{
69+
delegate: mp,
70+
}
71+
opt := scraperhelper.AddMetricsScraper(metadata.Type, wrapped)
6672
opts = append(opts, opt)
6773
}
6874

@@ -74,3 +80,29 @@ func createMetricsReceiverFunc(sqlOpenerFunc sqlquery.SQLOpenerFunc, clientProvi
7480
)
7581
}
7682
}
83+
84+
type statusReportingScraper struct {
85+
delegate *sqlquery.Scraper
86+
host component.Host
87+
}
88+
89+
func (s *statusReportingScraper) Start(ctx context.Context, host component.Host) error {
90+
s.host = host
91+
return s.delegate.Start(ctx, host)
92+
}
93+
94+
func (s *statusReportingScraper) ScrapeMetrics(ctx context.Context) (pmetric.Metrics, error) {
95+
metrics, err := s.delegate.ScrapeMetrics(ctx)
96+
if err != nil {
97+
componentstatus.ReportStatus(s.host, componentstatus.NewRecoverableErrorEvent(err))
98+
} else {
99+
componentstatus.ReportStatus(s.host, componentstatus.NewEvent(componentstatus.StatusOK))
100+
}
101+
return metrics, err
102+
}
103+
104+
func (s *statusReportingScraper) Shutdown(ctx context.Context) error {
105+
return s.delegate.Shutdown(ctx)
106+
}
107+
108+
var _ scraper.Metrics = (*statusReportingScraper)(nil)

receiver/sqlqueryreceiver/receiver_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import (
99
"time"
1010

1111
"github.com/stretchr/testify/require"
12+
"go.opentelemetry.io/collector/component"
13+
"go.opentelemetry.io/collector/component/componentstatus"
1214
"go.opentelemetry.io/collector/component/componenttest"
1315
"go.opentelemetry.io/collector/consumer/consumertest"
1416
"go.opentelemetry.io/collector/receiver/receivertest"
@@ -183,6 +185,63 @@ func TestCreateMetricsBothDatasourceFields(t *testing.T) {
183185
require.NoError(t, receiver.Shutdown(ctx))
184186
}
185187

188+
func TestStatusReportingMetrics(t *testing.T) {
189+
createReceiver := createMetricsReceiverFunc(fakeDBConnect, mkFakeClient)
190+
ctx := t.Context()
191+
192+
statusEvents := make(chan *componentstatus.Event, 10)
193+
host := &statusReporterHost{
194+
Host: componenttest.NewNopHost(),
195+
report: func(event *componentstatus.Event) {
196+
statusEvents <- event
197+
},
198+
}
199+
200+
receiver, err := createReceiver(
201+
ctx,
202+
receivertest.NewNopSettings(metadata.Type),
203+
&Config{
204+
Config: sqlquery.Config{
205+
ControllerConfig: scraperhelper.ControllerConfig{
206+
CollectionInterval: 10 * time.Millisecond,
207+
},
208+
Driver: "postgres",
209+
DataSource: "my-datasource",
210+
Queries: []sqlquery.Query{{
211+
SQL: "select * from foo",
212+
Metrics: []sqlquery.MetricCfg{{
213+
MetricName: "my-metric",
214+
ValueColumn: "foo",
215+
}},
216+
}},
217+
},
218+
},
219+
consumertest.NewNop(),
220+
)
221+
require.NoError(t, err)
222+
require.NoError(t, receiver.Start(ctx, host))
223+
224+
select {
225+
case event := <-statusEvents:
226+
require.Equal(t, componentstatus.StatusOK, event.Status())
227+
case <-time.After(1 * time.Second):
228+
t.Fatal("timed out waiting for status event")
229+
}
230+
231+
require.NoError(t, receiver.Shutdown(ctx))
232+
}
233+
234+
type statusReporterHost struct {
235+
component.Host
236+
report func(*componentstatus.Event)
237+
}
238+
239+
func (h *statusReporterHost) Report(event *componentstatus.Event) {
240+
if h.report != nil {
241+
h.report(event)
242+
}
243+
}
244+
186245
func fakeDBConnect(string, string) (*sql.DB, error) {
187246
return nil, nil
188247
}

0 commit comments

Comments
 (0)