Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .chloggen/sqlquery-component-status.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
change_type: enhancement
component: receiver/sqlquery
note: Report component status on database connection and query failures for health check v2 integration.
issues: [43837]
subtext:
change_logs: [user]
6 changes: 5 additions & 1 deletion receiver/sqlqueryreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ require (
go.uber.org/zap v1.27.1
)

require (
go.opentelemetry.io/collector/component/componentstatus v0.150.0
go.opentelemetry.io/collector/scraper v0.150.0
)

require (
dario.cat/mergo v1.0.2 // indirect
filippo.io/edwards25519 v1.1.1 // indirect
Expand Down Expand Up @@ -165,7 +170,6 @@ require (
go.opentelemetry.io/collector/pipeline v1.56.0 // indirect
go.opentelemetry.io/collector/pipeline/xpipeline v0.150.0 // indirect
go.opentelemetry.io/collector/receiver/xreceiver v0.150.0 // indirect
go.opentelemetry.io/collector/scraper v0.150.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0 // indirect
go.opentelemetry.io/otel v1.43.0 // indirect
go.opentelemetry.io/otel/sdk v1.43.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions receiver/sqlqueryreceiver/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 25 additions & 4 deletions receiver/sqlqueryreceiver/logs_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componentstatus"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/extension/xextension/storage"
"go.opentelemetry.io/collector/pdata/pcommon"
Expand Down Expand Up @@ -38,6 +39,7 @@ type logsReceiver struct {
id component.ID
storageClient storage.Client
obsrecv *receiverhelper.ObsReport
host component.Host
}

func newLogsReceiver(
Expand Down Expand Up @@ -88,6 +90,7 @@ func (receiver *logsReceiver) Start(ctx context.Context, host component.Host) er
}
receiver.settings.Logger.Debug("starting...")
receiver.isStarted = true
receiver.host = host

var err error
receiver.storageClient, err = adapter.GetStorageClient(ctx, host, receiver.config.StorageID, receiver.settings.ID)
Expand Down Expand Up @@ -162,21 +165,39 @@ func (receiver *logsReceiver) startCollecting() {
}

func (receiver *logsReceiver) collect() {
logsChannel := make(chan plog.Logs)
type collectResult struct {
logs plog.Logs
err error
}
resultsChannel := make(chan collectResult, len(receiver.queryReceivers))
for _, queryReceiver := range receiver.queryReceivers {
go func(queryReceiver *logsQueryReceiver) {
logs, err := queryReceiver.collect(context.Background())
if err != nil {
receiver.settings.Logger.Error("error collecting logs", zap.Error(err), zap.String("query", queryReceiver.ID()))
}
logsChannel <- logs
resultsChannel <- collectResult{logs: logs, err: err}
}(queryReceiver)
}

allLogs := plog.NewLogs()
var collectErr error
for range receiver.queryReceivers {
logs := <-logsChannel
logs.ResourceLogs().MoveAndAppendTo(allLogs.ResourceLogs())
select {
case result := <-resultsChannel:
result.logs.ResourceLogs().MoveAndAppendTo(allLogs.ResourceLogs())
if result.err != nil {
collectErr = errors.Join(collectErr, result.err)
}
case <-receiver.shutdownRequested:
return
}
}

if collectErr != nil {
componentstatus.ReportStatus(receiver.host, componentstatus.NewRecoverableErrorEvent(collectErr))
} else {
componentstatus.ReportStatus(receiver.host, componentstatus.NewEvent(componentstatus.StatusOK))
}

logRecordCount := allLogs.LogRecordCount()
Expand Down
53 changes: 53 additions & 0 deletions receiver/sqlqueryreceiver/logs_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componentstatus"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/pcommon"
Expand Down Expand Up @@ -212,3 +213,55 @@ func TestLogsReceiver_InitialDelay(t *testing.T) {
return sink.LogRecordCount() >= 1
}, initialDelay+50*time.Millisecond, 5*time.Millisecond)
}

func TestStatusReportingLogs(t *testing.T) {
fakeClient := &sqlquery.FakeDBClient{
StringMaps: [][]sqlquery.StringMap{
{{"col1": "42"}},
},
}
createReceiver := createLogsReceiverFunc(fakeDBConnect, func(sqlquery.Db, string, *zap.Logger, sqlquery.TelemetryConfig) sqlquery.DbClient {
return fakeClient
})

ctx := t.Context()
statusEvents := make(chan *componentstatus.Event, 10)
host := &statusReporterHost{
Host: componenttest.NewNopHost(),
report: func(event *componentstatus.Event) {
statusEvents <- event
},
}

receiver, err := createReceiver(
ctx,
receivertest.NewNopSettings(metadata.Type),
&Config{
Config: sqlquery.Config{
ControllerConfig: scraperhelper.ControllerConfig{
CollectionInterval: 10 * time.Millisecond,
},
Driver: "postgres",
DataSource: "my-datasource",
Queries: []sqlquery.Query{{
SQL: "select * from foo",
Logs: []sqlquery.LogsCfg{{
BodyColumn: "col1",
}},
}},
},
},
consumertest.NewNop(),
)
require.NoError(t, err)
require.NoError(t, receiver.Start(ctx, host))

select {
case event := <-statusEvents:
require.Equal(t, componentstatus.StatusOK, event.Status())
case <-time.After(1 * time.Second):
t.Fatal("timed out waiting for status event")
}

require.NoError(t, receiver.Shutdown(ctx))
}
34 changes: 33 additions & 1 deletion receiver/sqlqueryreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ import (
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componentstatus"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/scraper"
"go.opentelemetry.io/collector/scraper/scraperhelper"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/sqlquery"
Expand Down Expand Up @@ -62,7 +65,10 @@ func createMetricsReceiverFunc(sqlOpenerFunc sqlquery.SQLOpenerFunc, clientProvi
scope.SetName(metadata.ScopeName)
mp := sqlquery.NewScraper(id, query, sqlCfg.ControllerConfig, settings.Logger, sqlCfg.Telemetry, pool.DB, clientProviderFunc, scope)

opt := scraperhelper.AddMetricsScraper(metadata.Type, mp)
wrapped := &statusReportingScraper{
delegate: mp,
}
opt := scraperhelper.AddMetricsScraper(metadata.Type, wrapped)
opts = append(opts, opt)
}

Expand All @@ -74,3 +80,29 @@ func createMetricsReceiverFunc(sqlOpenerFunc sqlquery.SQLOpenerFunc, clientProvi
)
}
}

type statusReportingScraper struct {
delegate *sqlquery.Scraper
host component.Host
}

func (s *statusReportingScraper) Start(ctx context.Context, host component.Host) error {
s.host = host
return s.delegate.Start(ctx, host)
}

func (s *statusReportingScraper) ScrapeMetrics(ctx context.Context) (pmetric.Metrics, error) {
metrics, err := s.delegate.ScrapeMetrics(ctx)
if err != nil {
componentstatus.ReportStatus(s.host, componentstatus.NewRecoverableErrorEvent(err))
} else {
componentstatus.ReportStatus(s.host, componentstatus.NewEvent(componentstatus.StatusOK))
}
return metrics, err
}

func (s *statusReportingScraper) Shutdown(ctx context.Context) error {
return s.delegate.Shutdown(ctx)
}

var _ scraper.Metrics = (*statusReportingScraper)(nil)
59 changes: 59 additions & 0 deletions receiver/sqlqueryreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"time"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componentstatus"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/receiver/receivertest"
Expand Down Expand Up @@ -183,6 +185,63 @@ func TestCreateMetricsBothDatasourceFields(t *testing.T) {
require.NoError(t, receiver.Shutdown(ctx))
}

func TestStatusReportingMetrics(t *testing.T) {
createReceiver := createMetricsReceiverFunc(fakeDBConnect, mkFakeClient)
ctx := t.Context()

statusEvents := make(chan *componentstatus.Event, 10)
host := &statusReporterHost{
Host: componenttest.NewNopHost(),
report: func(event *componentstatus.Event) {
statusEvents <- event
},
}

receiver, err := createReceiver(
ctx,
receivertest.NewNopSettings(metadata.Type),
&Config{
Config: sqlquery.Config{
ControllerConfig: scraperhelper.ControllerConfig{
CollectionInterval: 10 * time.Millisecond,
},
Driver: "postgres",
DataSource: "my-datasource",
Queries: []sqlquery.Query{{
SQL: "select * from foo",
Metrics: []sqlquery.MetricCfg{{
MetricName: "my-metric",
ValueColumn: "foo",
}},
}},
},
},
consumertest.NewNop(),
)
require.NoError(t, err)
require.NoError(t, receiver.Start(ctx, host))

select {
case event := <-statusEvents:
require.Equal(t, componentstatus.StatusOK, event.Status())
case <-time.After(1 * time.Second):
t.Fatal("timed out waiting for status event")
}

require.NoError(t, receiver.Shutdown(ctx))
}

type statusReporterHost struct {
component.Host
report func(*componentstatus.Event)
}

func (h *statusReporterHost) Report(event *componentstatus.Event) {
if h.report != nil {
h.report(event)
}
}

func fakeDBConnect(string, string) (*sql.DB, error) {
return nil, nil
}
Expand Down