Skip to content

Commit 018fda1

Browse files
committed
fix(database_observability.postgres): cleanup embedded exporter collectors on reconnection
This is a port of #6021
1 parent b0105cb commit 018fda1

File tree

2 files changed

+126
-52
lines changed

2 files changed

+126
-52
lines changed

internal/component/database_observability/postgres/component.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,8 @@ func (c *Component) Run(ctx context.Context) error {
291291
for _, collector := range c.collectors {
292292
collector.Stop()
293293
}
294+
c.cleanupExporterCollectors()
295+
294296
if c.dbConnection != nil {
295297
c.dbConnection.Close()
296298
}
@@ -367,11 +369,24 @@ func (c *Component) tryReconnect(ctx context.Context) error {
367369
return nil
368370
}
369371

372+
// cleanupExporterCollectors releases resources held by embedded exporter collectors.
373+
// Callers must hold c.mut.
374+
func (c *Component) cleanupExporterCollectors() {
375+
for _, col := range c.exporterCollectors {
376+
if closable, ok := col.(interface{ CloseServers() }); ok {
377+
closable.CloseServers()
378+
}
379+
c.registry.Unregister(col)
380+
}
381+
c.exporterCollectors = nil
382+
}
383+
370384
func (c *Component) connectAndStartCollectors(ctx context.Context) error {
371385
if c.dbConnection != nil {
372386
c.dbConnection.Close()
373387
c.dbConnection = nil
374388
}
389+
c.cleanupExporterCollectors()
375390

376391
dbConnection, err := c.openSQL("postgres", string(c.args.DataSourceName))
377392
if err != nil {
@@ -415,11 +430,6 @@ func (c *Component) connectAndStartCollectors(ctx context.Context) error {
415430
cp = cloudProvider
416431
}
417432

418-
for _, col := range c.exporterCollectors {
419-
c.registry.Unregister(col)
420-
}
421-
c.exporterCollectors = nil
422-
423433
if len(c.args.Targets) == 0 {
424434
if c.args.PrometheusExporter == nil {
425435
d := PrometheusExporterArguments(exporter_postgres.DefaultArguments)

internal/component/database_observability/postgres/component_test.go

Lines changed: 111 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,43 @@ import (
2626
"github.com/grafana/loki/pkg/push"
2727
)
2828

29+
func newTestComponent(t *testing.T, openSQL func(string, string) (*sql.DB, error)) *Component {
30+
t.Helper()
31+
opts := cmp.Options{
32+
ID: "test",
33+
Logger: kitlog.NewNopLogger(),
34+
OnStateChange: func(e cmp.Exports) {},
35+
GetServiceData: func(name string) (any, error) {
36+
return http_service.Data{MemoryListenAddr: "127.0.0.1:0", BaseHTTPPath: "/"}, nil
37+
},
38+
}
39+
args := Arguments{
40+
DataSourceName: alloytypes.Secret("postgres://user:pass@127.0.0.1:5432/db?sslmode=disable"),
41+
ForwardTo: []loki.LogsReceiver{},
42+
Targets: []discovery.Target{},
43+
DisableCollectors: []string{"query_details", "schema_details", "query_samples", "explain_plans"},
44+
HealthCheckArguments: HealthCheckArguments{
45+
CollectInterval: 1 * time.Hour,
46+
},
47+
}
48+
c := &Component{
49+
opts: opts,
50+
args: args,
51+
fanout: loki.NewFanout(args.ForwardTo),
52+
handler: loki.NewLogsReceiver(),
53+
registry: prometheus.NewRegistry(),
54+
healthErr: atomic.NewString(""),
55+
openSQL: openSQL,
56+
logsReceiver: loki.NewLogsReceiver(),
57+
}
58+
c.instanceKey = "test-instance"
59+
c.baseTarget = discovery.NewTargetFromMap(map[string]string{
60+
"instance": c.instanceKey,
61+
"job": "database_observability",
62+
})
63+
return c
64+
}
65+
2966
func Test_enableOrDisableCollectors(t *testing.T) {
3067
t.Run("nothing specified (default behavior)", func(t *testing.T) {
3168
exampleDBO11yAlloyConfig := `
@@ -643,6 +680,62 @@ func Test_connectAndStartCollectors(t *testing.T) {
643680
})
644681
}
645682

683+
type fakeClosableCollector struct {
684+
prometheus.Collector
685+
closeCalls int
686+
}
687+
688+
func newFakeClosableCollector(name string) *fakeClosableCollector {
689+
gauge := prometheus.NewGauge(prometheus.GaugeOpts{
690+
Name: name,
691+
Help: name,
692+
})
693+
return &fakeClosableCollector{Collector: gauge}
694+
}
695+
696+
func (c *fakeClosableCollector) CloseServers() {
697+
c.closeCalls++
698+
}
699+
700+
func TestComponent_cleanupExporterCollectors(t *testing.T) {
701+
t.Run("closes closable exporters and unregisters them", func(t *testing.T) {
702+
registry := prometheus.NewRegistry()
703+
collector := newFakeClosableCollector("test_cleanup_exporter_collectors_closable")
704+
require.NoError(t, registry.Register(collector))
705+
706+
c := &Component{
707+
registry: registry,
708+
exporterCollectors: []prometheus.Collector{collector},
709+
}
710+
711+
c.cleanupExporterCollectors()
712+
713+
assert.Equal(t, 1, collector.closeCalls)
714+
assert.Nil(t, c.exporterCollectors)
715+
assert.False(t, registry.Unregister(collector))
716+
})
717+
718+
t.Run("unregisters non-closable collectors without panicking", func(t *testing.T) {
719+
registry := prometheus.NewRegistry()
720+
collector := prometheus.NewGauge(prometheus.GaugeOpts{
721+
Name: "test_cleanup_exporter_collectors_plain",
722+
Help: "test",
723+
})
724+
collector.Set(1)
725+
require.NoError(t, registry.Register(collector))
726+
727+
c := &Component{
728+
registry: registry,
729+
exporterCollectors: []prometheus.Collector{collector},
730+
}
731+
732+
c.cleanupExporterCollectors()
733+
734+
assert.Nil(t, c.exporterCollectors)
735+
assert.False(t, registry.Unregister(collector))
736+
})
737+
}
738+
646739
func TestPostgres_Reconnection(t *testing.T) {
647740
t.Run("tryReconnect fails and maintains health error", func(t *testing.T) {
648741
opts := cmp.Options{
@@ -671,47 +764,14 @@ func TestPostgres_Reconnection(t *testing.T) {
671764
})
672765

673766
t.Run("tryReconnect succeeds and clears health error", func(t *testing.T) {
674-
opts := cmp.Options{
675-
ID: "test",
676-
Logger: kitlog.NewNopLogger(),
677-
OnStateChange: func(e cmp.Exports) {},
678-
GetServiceData: func(name string) (any, error) {
679-
return http_service.Data{MemoryListenAddr: "127.0.0.1:0", BaseHTTPPath: "/"}, nil
680-
},
681-
}
682-
683-
args := Arguments{
684-
DataSourceName: alloytypes.Secret("postgres://user:pass@127.0.0.1:5432/db?sslmode=disable"),
685-
ForwardTo: []loki.LogsReceiver{},
686-
Targets: []discovery.Target{},
687-
DisableCollectors: []string{"query_details", "schema_details", "query_samples", "explain_plans"},
688-
HealthCheckArguments: HealthCheckArguments{
689-
CollectInterval: 1 * time.Hour,
690-
},
691-
}
692-
693767
// First mock: will fail
694768
db1, mock1, err := sqlmock.New(sqlmock.MonitorPingsOption(true))
695769
require.NoError(t, err)
696770
defer db1.Close()
697771

698772
mock1.ExpectPing().WillReturnError(assert.AnError)
699773

700-
c := &Component{
701-
opts: opts,
702-
args: args,
703-
fanout: loki.NewFanout(args.ForwardTo),
704-
handler: loki.NewLogsReceiver(),
705-
registry: prometheus.NewRegistry(),
706-
healthErr: atomic.NewString(""),
707-
openSQL: func(_ string, _ string) (*sql.DB, error) { return db1, nil },
708-
logsReceiver: loki.NewLogsReceiver(),
709-
}
710-
c.instanceKey = "test-instance"
711-
c.baseTarget = discovery.NewTargetFromMap(map[string]string{
712-
"instance": c.instanceKey,
713-
"job": "database_observability",
714-
})
774+
c := newTestComponent(t, func(_, _ string) (*sql.DB, error) { return db1, nil })
715775

716776
// First attempt: connection fails
717777
err = c.tryReconnect(context.Background())
@@ -722,12 +782,10 @@ func TestPostgres_Reconnection(t *testing.T) {
722782
db2, mock2, err := sqlmock.New(sqlmock.MonitorPingsOption(true))
723783
require.NoError(t, err)
724784
defer db2.Close()
725-
726785
mock2.ExpectPing()
727786
mock2.ExpectQuery(`SELECT.*system_identifier.*inet_server_addr.*inet_server_port.*version`).
728787
WillReturnRows(sqlmock.NewRows([]string{"system_identifier", "inet_server_addr", "inet_server_port", "version"}).
729788
AddRow("1234567890", "127.0.0.1", "5432", "14.0"))
730-
731789
c.openSQL = func(_ string, _ string) (*sql.DB, error) { return db2, nil }
732790

733791
// Second attempt: connection succeeds and clears error
@@ -760,21 +818,27 @@ func TestPostgres_Reconnection(t *testing.T) {
760818
require.NoError(t, err)
761819

762820
ctx, cancel := context.WithCancel(context.Background())
821+
cancel()
763822

764-
runErr := make(chan error, 1)
765-
go func() {
766-
runErr <- c.Run(ctx)
767-
}()
823+
err = c.Run(ctx)
824+
assert.NoError(t, err)
825+
})
826+
827+
t.Run("Run cleans up embedded exporter collectors on shutdown", func(t *testing.T) {
828+
c := newTestComponent(t, func(_, _ string) (*sql.DB, error) { return nil, assert.AnError })
829+
oldCollector := newFakeClosableCollector("test_run_cleanup_old_exporter")
830+
require.NoError(t, c.registry.Register(oldCollector))
831+
c.exporterCollectors = []prometheus.Collector{oldCollector}
768832

769-
time.Sleep(100 * time.Millisecond)
833+
ctx, cancel := context.WithCancel(context.Background())
770834
cancel()
771835

772-
select {
773-
case err := <-runErr:
774-
assert.NoError(t, err)
775-
case <-time.After(5 * time.Second):
776-
t.Fatal("Run did not exit after context cancellation")
777-
}
836+
err := c.Run(ctx)
837+
assert.NoError(t, err)
838+
839+
assert.Equal(t, 1, oldCollector.closeCalls)
840+
assert.Nil(t, c.exporterCollectors)
841+
assert.False(t, c.registry.Unregister(oldCollector))
778842
})
779843
}
780844

0 commit comments

Comments
 (0)