Skip to content

Commit 1ca12fe

Browse files
committed
feat(database_observability): add wait_event_v2 op with pre-classified wait_event_type
Introduces wait_event_v2 as an alternative to the baseline wait_event op. When enable_pre_classified_wait_events is set in the query_samples block, Alloy emits wait_event_v2 instead of wait_event, with wait_event_type pre-classified at emit time (IO Wait / Lock Wait / Network Wait / Other Wait) rather than computed via label_replace at query time. MySQL classification: wait/io/file/* and wait/io/table/* -> IO Wait, wait/io/socket/* -> Network Wait, wait/io/lock/*, wait/lock/* and wait/synch/* -> Lock Wait, anything else -> Other Wait. Postgres classification: IO -> IO Wait, Lock/LWLock/... -> Lock Wait, Client -> Network Wait, default -> Other Wait.
1 parent 19df108 commit 1ca12fe

8 files changed

Lines changed: 511 additions & 105 deletions

File tree

docs/sources/reference/components/database_observability/database_observability.mysql.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ The `gcp` block supplies the identifying information for the GCP Cloud SQL datab
168168
| `setup_consumers_check_interval` | `duration` | How frequently to check if `setup_consumers` are correctly enabled. | `"1h"` | no |
169169
| `sample_min_duration` | `duration` | Minimum duration for query samples to be collected. Set to "0s" to disable filtering and collect all samples regardless of their duration.| `"0s"` | no |
170170
| `wait_event_min_duration` | `duration` | Minimum duration for a wait event to be collected. Set to "0s" to disable filtering and collect all wait events regardless of their duration. | `"1us"` | no |
171+
| `enable_pre_classified_wait_events` | `bool` | When `true`, emits `wait_event_v2` entries with `wait_event_type` pre-classified as `IO Wait`, `Lock Wait`, `Network Wait`, or `Other Wait` in the log body, instead of the baseline `wait_event` entries. | `false` | no |
171172

172173
### `setup_actors`
173174

docs/sources/reference/components/database_observability/database_observability.postgres.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ The `gcp` block supplies the identifying information for the GCP Cloud SQL datab
140140
| `collect_interval` | `duration` | How frequently to collect information from database. | `"15s"` | no |
141141
| `disable_query_redaction` | `bool` | Collect unredacted SQL query text (might include parameters). | `false` | no |
142142
| `exclude_current_user` | `bool` | Do not collect query samples for current database user. | `true` | no |
143+
| `enable_pre_classified_wait_events` | `bool` | When `true`, emits `wait_event_v2` entries with `wait_event_type` pre-classified as `IO Wait`, `Lock Wait`, `Network Wait`, or `Other Wait` in the log body, instead of the baseline `wait_event` entries. | `false` | no |
143144

144145
### `schema_details`
145146

internal/component/database_observability/mysql/collector/query_samples.go

Lines changed: 87 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"database/sql"
66
"fmt"
7+
"strings"
78
"sync"
89
"time"
910

@@ -21,6 +22,7 @@ const (
2122
QuerySamplesCollector = "query_samples"
2223
OP_QUERY_SAMPLE = "query_sample"
2324
OP_WAIT_EVENT = "wait_event"
25+
OP_WAIT_EVENT_V2 = "wait_event_v2"
2426

2527
cpuTimeField = `, statements.CPU_TIME`
2628
maxControlledMemoryField = `, statements.MAX_CONTROLLED_MEMORY`
@@ -83,31 +85,33 @@ const updateSetupConsumers = `
8385
WHERE name in ('events_statements_cpu', 'events_waits_current', 'events_waits_history')`
8486

8587
type QuerySamplesArguments struct {
86-
DB *sql.DB
87-
EngineVersion semver.Version
88-
CollectInterval time.Duration
89-
ExcludeSchemas []string
90-
EntryHandler loki.EntryHandler
91-
DisableQueryRedaction bool
92-
AutoEnableSetupConsumers bool
93-
SetupConsumersCheckInterval time.Duration
94-
SampleMinDuration time.Duration
95-
WaitEventMinDuration time.Duration
88+
DB *sql.DB
89+
EngineVersion semver.Version
90+
CollectInterval time.Duration
91+
ExcludeSchemas []string
92+
EntryHandler loki.EntryHandler
93+
DisableQueryRedaction bool
94+
AutoEnableSetupConsumers bool
95+
SetupConsumersCheckInterval time.Duration
96+
SampleMinDuration time.Duration
97+
WaitEventMinDuration time.Duration
98+
EnablePreClassifiedWaitEvents bool
9699

97100
Logger log.Logger
98101
}
99102

100103
type QuerySamples struct {
101-
dbConnection *sql.DB
102-
engineVersion semver.Version
103-
collectInterval time.Duration
104-
excludeSchemas []string
105-
entryHandler loki.EntryHandler
106-
disableQueryRedaction bool
107-
autoEnableSetupConsumers bool
108-
setupConsumersCheckInterval time.Duration
109-
sampleMinDuration time.Duration
110-
waitEventMinDuration time.Duration
104+
dbConnection *sql.DB
105+
engineVersion semver.Version
106+
collectInterval time.Duration
107+
excludeSchemas []string
108+
entryHandler loki.EntryHandler
109+
disableQueryRedaction bool
110+
autoEnableSetupConsumers bool
111+
setupConsumersCheckInterval time.Duration
112+
sampleMinDuration time.Duration
113+
waitEventMinDuration time.Duration
114+
enablePreClassifiedWaitEvents bool
111115

112116
logger log.Logger
113117
running *atomic.Bool
@@ -121,18 +125,19 @@ type QuerySamples struct {
121125

122126
func NewQuerySamples(args QuerySamplesArguments) (*QuerySamples, error) {
123127
c := &QuerySamples{
124-
dbConnection: args.DB,
125-
engineVersion: args.EngineVersion,
126-
collectInterval: args.CollectInterval,
127-
excludeSchemas: args.ExcludeSchemas,
128-
entryHandler: args.EntryHandler,
129-
disableQueryRedaction: args.DisableQueryRedaction,
130-
autoEnableSetupConsumers: args.AutoEnableSetupConsumers,
131-
setupConsumersCheckInterval: args.SetupConsumersCheckInterval,
132-
sampleMinDuration: args.SampleMinDuration,
133-
waitEventMinDuration: args.WaitEventMinDuration,
134-
logger: log.With(args.Logger, "collector", QuerySamplesCollector),
135-
running: &atomic.Bool{},
128+
dbConnection: args.DB,
129+
engineVersion: args.EngineVersion,
130+
collectInterval: args.CollectInterval,
131+
excludeSchemas: args.ExcludeSchemas,
132+
entryHandler: args.EntryHandler,
133+
disableQueryRedaction: args.DisableQueryRedaction,
134+
autoEnableSetupConsumers: args.AutoEnableSetupConsumers,
135+
setupConsumersCheckInterval: args.SetupConsumersCheckInterval,
136+
sampleMinDuration: args.SampleMinDuration,
137+
waitEventMinDuration: args.WaitEventMinDuration,
138+
enablePreClassifiedWaitEvents: args.EnablePreClassifiedWaitEvents,
139+
logger: log.With(args.Logger, "collector", QuerySamplesCollector),
140+
running: &atomic.Bool{},
136141
}
137142

138143
return c, nil
@@ -408,28 +413,33 @@ func (c *QuerySamples) fetchQuerySamples(ctx context.Context) error {
408413

409414
if row.WaitEventID.Valid && row.WaitTime.Valid {
410415
waitTime := picosecondsToMilliseconds(row.WaitTime.Float64)
411-
waitLogMessage := fmt.Sprintf(
412-
`schema="%s" user="%s" client_host="%s" thread_id="%s" digest="%s" event_id="%s" wait_event_id="%s" wait_end_event_id="%s" wait_event_name="%s" wait_object_name="%s" wait_object_type="%s" wait_time="%fms"`,
413-
row.Schema.String,
414-
row.User.String,
415-
row.Host.String,
416-
row.ThreadID.String,
417-
row.Digest.String,
418-
row.StatementEventID.String,
419-
row.WaitEventID.String,
420-
row.WaitEndEventID.String,
421-
row.WaitEventName.String,
422-
row.WaitObjectName.String,
423-
row.WaitObjectType.String,
424-
waitTime,
425-
)
426416

427-
c.entryHandler.Chan() <- database_observability.BuildLokiEntryWithTimestamp(
428-
logging.LevelInfo,
429-
OP_WAIT_EVENT,
430-
waitLogMessage,
431-
int64(millisecondsToNanoseconds(row.TimestampMilliseconds)),
432-
)
417+
if c.enablePreClassifiedWaitEvents {
418+
waitV2LogMessage := fmt.Sprintf(
419+
`schema="%s" user="%s" client_host="%s" thread_id="%s" digest="%s" event_id="%s" wait_event_id="%s" wait_end_event_id="%s" wait_event_name="%s" wait_event_type="%s" wait_object_name="%s" wait_object_type="%s" wait_time="%fms"`,
420+
row.Schema.String, row.User.String, row.Host.String,
421+
row.ThreadID.String, row.Digest.String,
422+
row.StatementEventID.String, row.WaitEventID.String, row.WaitEndEventID.String,
423+
row.WaitEventName.String, classifyMySQLWaitEventType(row.WaitEventName.String),
424+
row.WaitObjectName.String, row.WaitObjectType.String, waitTime,
425+
)
426+
c.entryHandler.Chan() <- database_observability.BuildLokiEntryWithTimestamp(
427+
logging.LevelInfo, OP_WAIT_EVENT_V2, waitV2LogMessage,
428+
int64(millisecondsToNanoseconds(row.TimestampMilliseconds)),
429+
)
430+
} else {
431+
waitLogMessage := fmt.Sprintf(
432+
`schema="%s" user="%s" client_host="%s" thread_id="%s" digest="%s" event_id="%s" wait_event_id="%s" wait_end_event_id="%s" wait_event_name="%s" wait_object_name="%s" wait_object_type="%s" wait_time="%fms"`,
433+
row.Schema.String, row.User.String, row.Host.String,
434+
row.ThreadID.String, row.Digest.String,
435+
row.StatementEventID.String, row.WaitEventID.String, row.WaitEndEventID.String,
436+
row.WaitEventName.String, row.WaitObjectName.String, row.WaitObjectType.String, waitTime,
437+
)
438+
c.entryHandler.Chan() <- database_observability.BuildLokiEntryWithTimestamp(
439+
logging.LevelInfo, OP_WAIT_EVENT, waitLogMessage,
440+
int64(millisecondsToNanoseconds(row.TimestampMilliseconds)),
441+
)
442+
}
433443
}
434444
}
435445

@@ -472,3 +482,27 @@ func (c *QuerySamples) determineTimerClauseAndLimit(uptime float64) (string, flo
472482

473483
return timerClause, limit
474484
}
485+
486+
// classifyMySQLWaitEventType maps a raw MySQL performance_schema wait event name
487+
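// to a standardized category, aligned with the wait taxonomy used elsewhere. Names that match none of the patterns below, or that lack the "wait/" prefix, are classified as Other Wait: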
488+
//
489+
// IO Wait = wait/io/(file|table).+
490+
// Network Wait = wait/io/socket.+
491+
// Lock Wait = wait/(io/lock|synch|lock).+
492+
func classifyMySQLWaitEventType(waitEventName string) string {
493+
rest, ok := strings.CutPrefix(waitEventName, "wait/")
494+
if !ok {
495+
return "Other Wait"
496+
}
497+
switch {
498+
case strings.HasPrefix(rest, "io/file/"), strings.HasPrefix(rest, "io/table/"):
499+
return "IO Wait"
500+
case strings.HasPrefix(rest, "io/socket/"):
501+
return "Network Wait"
502+
case strings.HasPrefix(rest, "io/lock/"),
503+
strings.HasPrefix(rest, "synch/"),
504+
strings.HasPrefix(rest, "lock/"):
505+
return "Lock Wait"
506+
}
507+
return "Other Wait"
508+
}

internal/component/database_observability/mysql/collector/query_samples_test.go

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2849,3 +2849,161 @@ func TestQuerySamplesExcludeSchemas(t *testing.T) {
28492849
c.fetchQuerySamples(t.Context())
28502850
require.NoError(t, mock.ExpectationsWereMet())
28512851
}
2852+
2853+
func TestQuerySamples_WaitEvents_PreClassified(t *testing.T) {
2854+
t.Run("flag OFF emits only OP_WAIT_EVENT without wait_event_type", func(t *testing.T) {
2855+
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
2856+
require.NoError(t, err)
2857+
defer db.Close()
2858+
2859+
lokiClient := loki.NewCollectingHandler()
2860+
2861+
collector, err := NewQuerySamples(QuerySamplesArguments{
2862+
DB: db,
2863+
EngineVersion: latestCompatibleVersion,
2864+
CollectInterval: time.Second,
2865+
EntryHandler: lokiClient,
2866+
Logger: log.NewLogfmtLogger(os.Stderr),
2867+
EnablePreClassifiedWaitEvents: false,
2868+
})
2869+
require.NoError(t, err)
2870+
require.NotNil(t, collector)
2871+
2872+
mock.ExpectQuery(selectUptime).WithoutArgs().RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{"uptime"}).AddRow("1"))
2873+
mock.ExpectQuery(selectNowAndUptime).WithoutArgs().WillReturnRows(sqlmock.NewRows([]string{"now", "uptime"}).AddRow(5, 1))
2874+
mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", exclusionClause, digestTextNotNullClause, "", endOfTimeline)).WithArgs(
2875+
1e12, 1e12,
2876+
).RowsWillBeClosed().WillReturnRows(
2877+
sqlmock.NewRows([]string{
2878+
"statements.CURRENT_SCHEMA", "statements.THREAD_ID", "statements.EVENT_ID",
2879+
"statements.END_EVENT_ID", "statements.DIGEST", "statements.TIMER_END",
2880+
"statements.TIMER_WAIT", "statements.ROWS_EXAMINED", "statements.ROWS_SENT",
2881+
"statements.ROWS_AFFECTED", "statements.ERRORS",
2882+
"waits.event_id", "waits.end_event_id", "waits.event_name",
2883+
"waits.object_name", "waits.object_type", "waits.timer_wait",
2884+
"threads.PROCESSLIST_USER", "threads.PROCESSLIST_HOST",
2885+
"statements.CPU_TIME", "statements.MAX_CONTROLLED_MEMORY", "statements.MAX_TOTAL_MEMORY",
2886+
}).AddRow(
2887+
"some_schema", "890", "123", "234", "some_digest",
2888+
"70000000", "20000000", "5", "5", "0", "0",
2889+
"124", "124", "wait/io/file/innodb/innodb_data_file",
2890+
"wait_object_name", "wait_object_type", "100000000",
2891+
"some_user", "some_host",
2892+
"10000000", "456", "457",
2893+
),
2894+
)
2895+
2896+
err = collector.Start(t.Context())
2897+
require.NoError(t, err)
2898+
2899+
require.Eventually(t, func() bool {
2900+
return len(lokiClient.Received()) == 2
2901+
}, 5*time.Second, 100*time.Millisecond)
2902+
2903+
collector.Stop()
2904+
lokiClient.Stop()
2905+
2906+
require.Eventually(t, func() bool {
2907+
return collector.Stopped()
2908+
}, 5*time.Second, 100*time.Millisecond)
2909+
2910+
require.NoError(t, mock.ExpectationsWereMet())
2911+
2912+
lokiEntries := lokiClient.Received()
2913+
require.Len(t, lokiEntries, 2)
2914+
assert.Equal(t, model.LabelSet{"op": OP_WAIT_EVENT}, lokiEntries[1].Labels)
2915+
assert.NotContains(t, lokiEntries[1].Line, "wait_event_type=")
2916+
})
2917+
2918+
t.Run("flag ON emits only OP_WAIT_EVENT_V2 with wait_event_type classified", func(t *testing.T) {
2919+
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
2920+
require.NoError(t, err)
2921+
defer db.Close()
2922+
2923+
lokiClient := loki.NewCollectingHandler()
2924+
2925+
collector, err := NewQuerySamples(QuerySamplesArguments{
2926+
DB: db,
2927+
EngineVersion: latestCompatibleVersion,
2928+
CollectInterval: time.Second,
2929+
EntryHandler: lokiClient,
2930+
Logger: log.NewLogfmtLogger(os.Stderr),
2931+
EnablePreClassifiedWaitEvents: true,
2932+
})
2933+
require.NoError(t, err)
2934+
require.NotNil(t, collector)
2935+
2936+
mock.ExpectQuery(selectUptime).WithoutArgs().RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{"uptime"}).AddRow("1"))
2937+
mock.ExpectQuery(selectNowAndUptime).WithoutArgs().WillReturnRows(sqlmock.NewRows([]string{"now", "uptime"}).AddRow(5, 1))
2938+
mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", exclusionClause, digestTextNotNullClause, "", endOfTimeline)).WithArgs(
2939+
1e12, 1e12,
2940+
).RowsWillBeClosed().WillReturnRows(
2941+
sqlmock.NewRows([]string{
2942+
"statements.CURRENT_SCHEMA", "statements.THREAD_ID", "statements.EVENT_ID",
2943+
"statements.END_EVENT_ID", "statements.DIGEST", "statements.TIMER_END",
2944+
"statements.TIMER_WAIT", "statements.ROWS_EXAMINED", "statements.ROWS_SENT",
2945+
"statements.ROWS_AFFECTED", "statements.ERRORS",
2946+
"waits.event_id", "waits.end_event_id", "waits.event_name",
2947+
"waits.object_name", "waits.object_type", "waits.timer_wait",
2948+
"threads.PROCESSLIST_USER", "threads.PROCESSLIST_HOST",
2949+
"statements.CPU_TIME", "statements.MAX_CONTROLLED_MEMORY", "statements.MAX_TOTAL_MEMORY",
2950+
}).AddRow(
2951+
"some_schema", "890", "123", "234", "some_digest",
2952+
"70000000", "20000000", "5", "5", "0", "0",
2953+
"124", "124", "wait/io/file/innodb/innodb_data_file",
2954+
"wait_object_name", "wait_object_type", "100000000",
2955+
"some_user", "some_host",
2956+
"10000000", "456", "457",
2957+
),
2958+
)
2959+
2960+
err = collector.Start(t.Context())
2961+
require.NoError(t, err)
2962+
2963+
require.Eventually(t, func() bool {
2964+
return len(lokiClient.Received()) == 2
2965+
}, 5*time.Second, 100*time.Millisecond)
2966+
2967+
collector.Stop()
2968+
lokiClient.Stop()
2969+
2970+
require.Eventually(t, func() bool {
2971+
return collector.Stopped()
2972+
}, 5*time.Second, 100*time.Millisecond)
2973+
2974+
require.NoError(t, mock.ExpectationsWereMet())
2975+
2976+
lokiEntries := lokiClient.Received()
2977+
require.Len(t, lokiEntries, 2)
2978+
assert.Equal(t, model.LabelSet{"op": OP_WAIT_EVENT_V2}, lokiEntries[1].Labels)
2979+
assert.Contains(t, lokiEntries[1].Line, `wait_event_type="IO Wait"`)
2980+
// wait_event_type must only appear in the log line, never in the entry's label set
2981+
assert.NotContains(t, string(lokiEntries[1].Labels["wait_event_type"]), "IO Wait")
2982+
})
2983+
}
2984+
2985+
func TestClassifyMySQLWaitEventType(t *testing.T) {
2986+
tests := []struct {
2987+
input string
2988+
expected string
2989+
}{
2990+
{"wait/io/file/innodb/innodb_data_file", "IO Wait"},
2991+
{"wait/io/table/sql/handler", "IO Wait"},
2992+
{"wait/io/socket/sql/client_connection", "Network Wait"},
2993+
{"wait/io/lock/table/handler", "Lock Wait"},
2994+
{"wait/lock/table/sql/handler", "Lock Wait"},
2995+
{"wait/lock/metadata/sql/mdl", "Lock Wait"},
2996+
{"wait/synch/mutex/sql/LOCK_open", "Lock Wait"},
2997+
{"wait/synch/rwlock/sql/LOCK_system_variables", "Lock Wait"},
2998+
{"wait/unknown/something", "Other Wait"},
2999+
{"not_a_wait_event", "Other Wait"},
3000+
{"", "Other Wait"},
3001+
}
3002+
3003+
for _, tc := range tests {
3004+
t.Run(tc.input, func(t *testing.T) {
3005+
result := classifyMySQLWaitEventType(tc.input)
3006+
assert.Equal(t, tc.expected, result)
3007+
})
3008+
}
3009+
}

0 commit comments

Comments
 (0)