Skip to content

Commit fcb3d3e

Browse files
feat(database_observability.postgres): Throttle schema details create_statement logs (grafana#6260)
### Brief description of Pull Request Refactor the collector to log at most one OP_CREATE_STATEMENT per table every 30m, while still checking for new objects at every collect interval tick. Remove the internal cache (settings are ignored/deprecated) and replace it with an in-memory map to throttle logging of OP_CREATE_STATEMENT to at most once per 30 minutes per database/schema/table. ### Pull Request Details <!-- Detailed descripion of the Pull Request, if needed. --> ### Issue(s) fixed by this Pull Request Followup of grafana#6239 Relates to grafana/grafana-dbo11y-app#2694 ### Notes to the Reviewer <!-- Relevant notes for reviewers/testers. --> ### PR Checklist <!-- Remove items that do not apply. For completed items, change [ ] to [x]. --> - [ ] Documentation added - [ ] Tests updated - [ ] Config converters updated
1 parent 94664d3 commit fcb3d3e

7 files changed

Lines changed: 502 additions & 308 deletions

File tree

docs/sources/reference/components/database_observability/database_observability.postgres.md

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -145,12 +145,14 @@ The `gcp` block supplies the identifying information for the GCP Cloud SQL datab
145145

146146
### `schema_details`
147147

148-
| Name | Type | Description | Default | Required |
149-
|--------------------|------------|-----------------------------------------------------------------------|---------|----------|
150-
| `collect_interval` | `duration` | How frequently to collect information from database. | `"1m"` | no |
151-
| `cache_enabled` | `boolean` | Whether to enable caching of table definitions. | `true` | no |
152-
| `cache_size` | `integer` | Cache size. | `256` | no |
153-
| `cache_ttl` | `duration` | Cache TTL. | `"10m"` | no |
148+
| Name | Type | Description | Default | Required |
149+
|--------------------|------------|-------------------------------------------------------------|---------|----------|
150+
| `collect_interval` | `duration` | How frequently to collect information from database. | `"1m"` | no |
151+
| `cache_enabled` | `boolean` | Deprecated. Whether to enable caching of table definitions. | `true` | no |
152+
| `cache_size` | `integer` | Deprecated. Cache size. | `256` | no |
153+
| `cache_ttl` | `duration` | Deprecated. Cache TTL. | `"10m"` | no |
154+
155+
The `cache_enabled`, `cache_size`, and `cache_ttl` settings are deprecated: they are accepted for backward compatibility, but ignored.
154156

155157
### `explain_plans`
156158

integration-tests/docker/tests/database-observability-postgres/postgres_test.go

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,11 @@ package main
55
import (
66
"testing"
77

8-
"github.com/grafana/alloy/integration-tests/docker/common"
9-
"github.com/grafana/alloy/integration-tests/internal/lokihttp"
108
"github.com/stretchr/testify/assert"
119
"github.com/stretchr/testify/require"
10+
11+
"github.com/grafana/alloy/integration-tests/docker/common"
12+
"github.com/grafana/alloy/integration-tests/internal/lokihttp"
1213
)
1314

1415
const testName = "database_observability_postgres"
@@ -31,21 +32,16 @@ func TestDatabaseObservabilityPostgresLogs(t *testing.T) {
3132
"create_statement",
3233
}
3334

34-
var logResponse lokihttp.LogResponse
35-
3635
require.EventuallyWithT(t, func(c *assert.CollectT) {
37-
_, err := common.FetchDataFromURL(common.LogQuery(testName, 100), &logResponse)
38-
assert.NoError(c, err)
39-
40-
ops := make(map[string]bool)
41-
for _, result := range logResponse.Data.Result {
42-
if op, ok := result.Stream["op"]; ok {
43-
ops[op] = true
44-
}
45-
}
46-
4736
for _, op := range expectedOps {
48-
assert.True(c, ops[op], "expected %s logs", op)
37+
var resp lokihttp.LogResponse
38+
_, err := common.FetchDataFromURL(
39+
// fetch one log with exact match for op label
40+
common.LogQuery(testName, 1, common.LabelMatcher{Name: "op", Value: op}),
41+
&resp,
42+
)
43+
assert.NoError(c, err)
44+
assert.NotEmpty(c, resp.Data.Result, "expected at least one log with op=%s", op)
4945
}
5046
}, common.TestTimeoutEnv(t), common.DefaultRetryInterval)
5147
}

internal/component/database_observability/mysql/collector/schema_details_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -525,7 +525,7 @@ func TestSchemaDetails(t *testing.T) {
525525
AddRow("some_schema.some_table", "CREATE TABLE some_table (id INT)"))
526526

527527
// Second scrape: only the tables list. The scrape is throttled (still
528-
// within EmitInterval) so it must not trigger any create-statement
528+
// within emitInterval) so it must not trigger any create-statement
529529
// related queries.
530530
mock.ExpectQuery(fmt.Sprintf(selectTablesTemplate, exclusionClause)).WithoutArgs().RowsWillBeClosed().
531531
WillReturnRows(sqlmock.NewRows([]string{
@@ -537,7 +537,7 @@ func TestSchemaDetails(t *testing.T) {
537537
))
538538

539539
require.NoError(t, collector.extractSchema(t.Context()))
540-
fakeNow = fakeNow.Add(time.Minute) // well within EmitInterval
540+
fakeNow = fakeNow.Add(time.Minute) // well within emitInterval
541541
require.NoError(t, collector.extractSchema(t.Context()))
542542

543543
// First scrape emits OP_TABLE_DETECTION + OP_CREATE_STATEMENT; second
@@ -710,7 +710,7 @@ func TestSchemaDetails(t *testing.T) {
710710

711711
// Second scrape: only table_a remains. table_b should be evicted from
712712
// the throttle map by housekeeping. Since table_a was already emitted
713-
// less than EmitInterval ago, no further fetch queries are expected.
713+
// less than emitInterval ago, no further fetch queries are expected.
714714
fakeNow = fakeNow.Add(time.Minute)
715715
mock.ExpectQuery(fmt.Sprintf(selectTablesTemplate, exclusionClause)).WithoutArgs().RowsWillBeClosed().
716716
WillReturnRows(sqlmock.NewRows([]string{

internal/component/database_observability/postgres/collector/schema_details.go

Lines changed: 120 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"sync"
1212
"time"
1313

14-
"github.com/hashicorp/golang-lru/v2/expirable"
1514
"github.com/lib/pq"
1615
"go.uber.org/atomic"
1716

@@ -26,6 +25,11 @@ const (
2625
OP_CREATE_STATEMENT = "create_statement"
2726
)
2827

28+
// emitInterval is the minimum amount of time that must elapse between
29+
// successive OP_CREATE_STATEMENT emissions for the same table, regardless of
30+
// the configured collect_interval.
31+
const emitInterval = 30 * time.Minute
32+
2933
const (
3034
// selectAllDatabases makes use of the initial DB connection to discover other databases on the same Postgres instance
3135
selectAllDatabases = `
@@ -174,7 +178,6 @@ type tableInfo struct {
174178
database database
175179
schema schema
176180
tableName table
177-
updateTime time.Time
178181
b64TableSpec string
179182
}
180183

@@ -312,10 +315,6 @@ type SchemaDetailsArguments struct {
312315
ExcludeDatabases []string
313316
EntryHandler loki.EntryHandler
314317

315-
CacheEnabled bool
316-
CacheSize int
317-
CacheTTL time.Duration
318-
319318
Logger *slog.Logger
320319

321320
dbConnectionFactory databaseConnectionFactory
@@ -329,10 +328,14 @@ type SchemaDetails struct {
329328
excludeDatabases []string
330329
entryHandler loki.EntryHandler
331330

332-
// Cache of table definitions. Entries are removed after a configurable TTL.
333-
// Key is a string of the form "database.schema.table".
334-
// (unlike MySQL) no create/update timestamp available for detecting immediately when a table schema is changed; relying on TTL only
335-
cache *expirable.LRU[string, *tableInfo]
331+
// lastEmittedAt records the wall-clock time at which OP_CREATE_STATEMENT
332+
// was last emitted for a table. The outer key is the database name and
333+
// the inner key is "schema.table". Used to throttle logging to at most
334+
// one per emitInterval per table.
335+
lastEmittedAt map[database]map[string]time.Time
336+
337+
// now allows overriding time.Now() in tests
338+
now func() time.Time
336339

337340
tableRegistry *TableRegistry
338341

@@ -356,15 +359,13 @@ func NewSchemaDetails(args SchemaDetailsArguments) (*SchemaDetails, error) {
356359
collectInterval: args.CollectInterval,
357360
excludeDatabases: args.ExcludeDatabases,
358361
entryHandler: args.EntryHandler,
362+
lastEmittedAt: map[database]map[string]time.Time{},
363+
now: time.Now,
359364
tableRegistry: NewTableRegistry(),
360365
logger: args.Logger.With("collector", SchemaDetailsCollector),
361366
running: &atomic.Bool{},
362367
}
363368

364-
if args.CacheEnabled {
365-
c.cache = expirable.NewLRU[string, *tableInfo](args.CacheSize, nil, args.CacheTTL)
366-
}
367-
368369
return c, nil
369370
}
370371

@@ -446,6 +447,9 @@ func (c *SchemaDetails) getAllDatabases(ctx context.Context) ([]string, error) {
446447
}
447448

448449
func (c *SchemaDetails) extractSchemas(ctx context.Context, dbName string, dbConnection *sql.DB) error {
450+
now := c.now()
451+
db := database(dbName)
452+
449453
schemaRs, err := dbConnection.QueryContext(ctx, selectSchemaNames)
450454
if err != nil {
451455
return fmt.Errorf("failed to query pg_namespace for database %s: %w", dbName, err)
@@ -468,71 +472,82 @@ func (c *SchemaDetails) extractSchemas(ctx context.Context, dbName string, dbCon
468472

469473
if len(schemas) == 0 {
470474
c.logger.Info("no schema detected from pg_namespace", "datname", dbName)
475+
c.pruneDatabaseThrottle(db, nil)
471476
return nil
472477
}
473478

474479
tables := []*tableInfo{}
480+
// scanComplete tracks whether we managed to iterate every schema's
481+
// tables without bailing out early. If false, we have a partial view of
482+
// the database and must skip cleanup to avoid evicting throttle entries
483+
// for tables we simply didn't get to scan this round.
484+
scanComplete := true
475485

476486
for _, schemaName := range schemas {
477-
rs, err := dbConnection.QueryContext(ctx, selectTableNames, schemaName)
478-
if err != nil {
479-
c.logger.Error("failed to query tables", "datname", dbName, "schema", schemaName, "err", err)
480-
break
481-
}
482-
defer rs.Close()
487+
complete := func() bool {
488+
rs, err := dbConnection.QueryContext(ctx, selectTableNames, schemaName)
489+
if err != nil {
490+
c.logger.Error("failed to query tables", "datname", dbName, "schema", schemaName, "err", err)
491+
return false
492+
}
493+
defer rs.Close()
483494

484-
for rs.Next() {
485-
var tableName string
486-
if err := rs.Scan(&tableName); err != nil {
487-
c.logger.Error("failed to scan tables", "datname", dbName, "schema", schemaName, "err", err)
488-
break
495+
for rs.Next() {
496+
var tableName string
497+
if err := rs.Scan(&tableName); err != nil {
498+
c.logger.Error("failed to scan tables", "datname", dbName, "schema", schemaName, "err", err)
499+
return false
500+
}
501+
tables = append(tables, &tableInfo{
502+
database: db,
503+
schema: schema(schemaName),
504+
tableName: table(tableName),
505+
})
506+
507+
c.entryHandler.Chan() <- database_observability.BuildLokiEntry(
508+
logging.LevelInfo,
509+
OP_TABLE_DETECTION,
510+
fmt.Sprintf(`datname="%s" schema="%s" table="%s"`, dbName, schemaName, tableName),
511+
)
489512
}
490-
tables = append(tables, &tableInfo{
491-
database: database(dbName),
492-
schema: schema(schemaName),
493-
tableName: table(tableName),
494-
updateTime: time.Now(),
495-
})
496-
497-
c.entryHandler.Chan() <- database_observability.BuildLokiEntry(
498-
logging.LevelInfo,
499-
OP_TABLE_DETECTION,
500-
fmt.Sprintf(`datname="%s" schema="%s" table="%s"`, dbName, schemaName, tableName),
501-
)
502-
}
503513

504-
if err := rs.Err(); err != nil {
505-
return fmt.Errorf("failed to iterate over tables result set for database %q schema %q: %w", dbName, schemaName, err)
514+
if err := rs.Err(); err != nil {
515+
c.logger.Error("failed to iterate over tables result set", "datname", dbName, "schema", schemaName, "err", err)
516+
return false
517+
}
518+
return true
519+
}()
520+
521+
if !complete {
522+
scanComplete = false
523+
break
506524
}
507525
}
508526

509-
c.tableRegistry.SetTablesForDatabase(database(dbName), tables)
527+
if scanComplete {
528+
c.tableRegistry.SetTablesForDatabase(db, tables)
529+
}
510530

511531
if len(tables) == 0 {
512532
c.logger.Info("no tables detected from pg_tables", "datname", dbName)
533+
if scanComplete {
534+
c.pruneDatabaseThrottle(db, nil)
535+
}
513536
return nil
514537
}
515538

516539
for _, table := range tables {
517-
cacheKey := fmt.Sprintf("%s.%s.%s", table.database, table.schema, table.tableName)
518-
519-
cacheHit := false
520-
if c.cache != nil {
521-
if cached, ok := c.cache.Get(cacheKey); ok {
522-
table = cached
523-
cacheHit = true
540+
key := schemaTableKey(table.schema, table.tableName)
541+
if inner := c.lastEmittedAt[db]; inner != nil {
542+
if last, ok := inner[key]; ok && now.Sub(last) < emitInterval {
543+
continue
524544
}
525545
}
526546

527-
if !cacheHit {
528-
table, err = c.fetchTableDefinitions(ctx, table, dbConnection)
529-
if err != nil {
530-
c.logger.Error("failed to get table definitions", "datname", dbName, "schema", table.schema, "table", table.tableName, "err", err)
531-
continue
532-
}
533-
if c.cache != nil {
534-
c.cache.Add(cacheKey, table)
535-
}
547+
table, err = c.fetchTableDefinitions(ctx, table, dbConnection)
548+
if err != nil {
549+
c.logger.Error("failed to get table definitions", "datname", dbName, "schema", table.schema, "table", table.tableName, "err", err)
550+
continue
536551
}
537552

538553
c.entryHandler.Chan() <- database_observability.BuildLokiEntry(
@@ -543,11 +558,42 @@ func (c *SchemaDetails) extractSchemas(ctx context.Context, dbName string, dbCon
543558
dbName, table.schema, table.tableName, table.b64TableSpec,
544559
),
545560
)
561+
if c.lastEmittedAt[db] == nil {
562+
c.lastEmittedAt[db] = map[string]time.Time{}
563+
}
564+
c.lastEmittedAt[db][key] = now
565+
}
566+
567+
if scanComplete {
568+
c.pruneDatabaseThrottle(db, tables)
546569
}
547570

548571
return nil
549572
}
550573

574+
// pruneDatabaseThrottle removes entries in c.lastEmittedAt[db] whose
575+
// schema.table key is not present in the given tables. If the resulting
576+
// inner map is empty, the outer entry is also deleted. Must only be called
577+
// after a complete scan of the database's tables.
578+
func (c *SchemaDetails) pruneDatabaseThrottle(db database, tables []*tableInfo) {
579+
if _, ok := c.lastEmittedAt[db]; !ok {
580+
return
581+
}
582+
583+
currentKeys := make(map[string]struct{}, len(tables))
584+
for _, t := range tables {
585+
currentKeys[schemaTableKey(t.schema, t.tableName)] = struct{}{}
586+
}
587+
for k := range c.lastEmittedAt[db] {
588+
if _, ok := currentKeys[k]; !ok {
589+
delete(c.lastEmittedAt[db], k)
590+
}
591+
}
592+
if len(c.lastEmittedAt[db]) == 0 {
593+
delete(c.lastEmittedAt, db)
594+
}
595+
}
596+
551597
func (c *SchemaDetails) extractNames(ctx context.Context) error {
552598
databases, err := c.getAllDatabases(ctx)
553599
if err != nil {
@@ -583,6 +629,19 @@ func (c *SchemaDetails) extractNames(ctx context.Context) error {
583629
}
584630
}
585631

632+
// Drop throttle entries for databases that getAllDatabases no longer
633+
// returns (dropped, CONNECT revoked, added to exclude list). Per-table
634+
// cleanup within a still-present database is handled by extractSchemas.
635+
seenDatabases := make(map[database]struct{}, len(databases))
636+
for _, dbName := range databases {
637+
seenDatabases[database(dbName)] = struct{}{}
638+
}
639+
for db := range c.lastEmittedAt {
640+
if _, ok := seenDatabases[db]; !ok {
641+
delete(c.lastEmittedAt, db)
642+
}
643+
}
644+
586645
return nil
587646
}
588647

@@ -711,3 +770,7 @@ func (c *SchemaDetails) fetchColumnsDefinitions(ctx context.Context, databaseNam
711770

712771
return tblSpec, nil
713772
}
773+
774+
func schemaTableKey(sch schema, tbl table) string {
775+
return fmt.Sprintf("%s.%s", sch, tbl)
776+
}

0 commit comments

Comments
 (0)