Skip to content

Commit 7e94036

Browse files
authored
Merge branch 'main' into mn/mysql-health-check-exclude-schemas
2 parents 2cb7af4 + bf2b436 commit 7e94036

2 files changed

Lines changed: 348 additions & 61 deletions

File tree

internal/component/database_observability/mysql/collector/query_samples.go

Lines changed: 63 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,6 @@ const (
2727
cpuTimeField = `, statements.CPU_TIME`
2828
maxControlledMemoryField = `, statements.MAX_CONTROLLED_MEMORY`
2929
maxTotalMemoryField = `, statements.MAX_TOTAL_MEMORY`
30-
sqlTextField = `, statements.SQL_TEXT`
31-
sqlTextNotNullClause = ` AND statements.SQL_TEXT IS NOT NULL`
32-
digestTextNotNullClause = ` AND statements.DIGEST_TEXT IS NOT NULL`
3330
endOfTimeline = ` AND statements.TIMER_END > ? AND statements.TIMER_END <= ?`
3431
beginningAndEndOfTimeline = ` AND statements.TIMER_END > ? OR statements.TIMER_END <= ?`
3532
)
@@ -49,6 +46,7 @@ SELECT
4946
statements.EVENT_ID,
5047
statements.END_EVENT_ID,
5148
statements.DIGEST,
49+
statements.SQL_TEXT,
5250
statements.TIMER_END,
5351
statements.TIMER_WAIT,
5452
statements.ROWS_EXAMINED,
@@ -76,8 +74,9 @@ LEFT JOIN
7674
ON statements.THREAD_ID = threads.THREAD_ID
7775
WHERE
7876
statements.DIGEST IS NOT NULL
77+
AND statements.SQL_TEXT IS NOT NULL
7978
AND statements.CURRENT_SCHEMA NOT IN %s
80-
%s %s %s`
79+
%s %s`
8180

8281
const updateSetupConsumers = `
8382
UPDATE performance_schema.setup_consumers
@@ -246,15 +245,6 @@ func (c *QuerySamples) fetchQuerySamples(ctx context.Context) error {
246245

247246
timerClause, limit := c.determineTimerClauseAndLimit(uptime)
248247

249-
var textField, textNotNullClause string
250-
if c.disableQueryRedaction {
251-
textField = sqlTextField
252-
textNotNullClause = sqlTextNotNullClause
253-
} else {
254-
textField = ""
255-
textNotNullClause = digestTextNotNullClause
256-
}
257-
258248
excludedSchemasClause := buildExcludedSchemasClause(c.excludeSchemas)
259249

260250
var waitDurationClause string
@@ -269,13 +259,12 @@ func (c *QuerySamples) fetchQuerySamples(ctx context.Context) error {
269259

270260
query := ""
271261
if semver.MustParseRange("<8.0.28")(c.engineVersion) {
272-
query = fmt.Sprintf(selectQuerySamples, textField, waitDurationClause, excludedSchemasClause, textNotNullClause, sampleDurationClause, timerClause)
262+
query = fmt.Sprintf(selectQuerySamples, "", waitDurationClause, excludedSchemasClause, sampleDurationClause, timerClause)
273263
} else if semver.MustParseRange("<8.0.31")(c.engineVersion) {
274-
additionalFields := cpuTimeField + textField
275-
query = fmt.Sprintf(selectQuerySamples, additionalFields, waitDurationClause, excludedSchemasClause, textNotNullClause, sampleDurationClause, timerClause)
264+
query = fmt.Sprintf(selectQuerySamples, cpuTimeField, waitDurationClause, excludedSchemasClause, sampleDurationClause, timerClause)
276265
} else {
277-
additionalFields := cpuTimeField + maxControlledMemoryField + maxTotalMemoryField + textField
278-
query = fmt.Sprintf(selectQuerySamples, additionalFields, waitDurationClause, excludedSchemasClause, textNotNullClause, sampleDurationClause, timerClause)
266+
additionalFields := cpuTimeField + maxControlledMemoryField + maxTotalMemoryField
267+
query = fmt.Sprintf(selectQuerySamples, additionalFields, waitDurationClause, excludedSchemasClause, sampleDurationClause, timerClause)
279268
}
280269

281270
rs, err := c.dbConnection.QueryContext(ctx, query, c.timerBookmark, limit)
@@ -336,6 +325,7 @@ func (c *QuerySamples) fetchQuerySamples(ctx context.Context) error {
336325
&row.StatementEventID,
337326
&row.StatementEndEventID,
338327
&row.Digest,
328+
&row.SQLText,
339329
&row.TimerEndPicoseconds,
340330
&row.ElapsedTimePicoseconds,
341331
&row.RowsExamined,
@@ -360,10 +350,6 @@ func (c *QuerySamples) fetchQuerySamples(ctx context.Context) error {
360350
scanArgs = append(scanArgs, &row.MaxTotalMemory)
361351
}
362352

363-
if c.disableQueryRedaction {
364-
scanArgs = append(scanArgs, &row.SQLText)
365-
}
366-
367353
err := rs.Scan(scanArgs...)
368354
if err != nil {
369355
level.Error(c.logger).Log("msg", "failed to scan history table samples", "err", err)
@@ -379,6 +365,7 @@ func (c *QuerySamples) fetchQuerySamples(ctx context.Context) error {
379365
row.TimestampMilliseconds = calculateWallTime(serverStartTime, row.TimerEndPicoseconds.Float64, uptime)
380366
cpuTime := picosecondsToMilliseconds(row.CPUTime)
381367
elapsedTime := picosecondsToMilliseconds(row.ElapsedTimePicoseconds.Float64)
368+
traceParent := tryExtractTraceParent(row.SQLText.String)
382369

383370
logMessage := fmt.Sprintf(
384371
`schema="%s" user="%s" client_host="%s" thread_id="%s" event_id="%s" end_event_id="%s" digest="%s" rows_examined="%d" rows_sent="%d" rows_affected="%d" errors="%d" max_controlled_memory="%db" max_total_memory="%db" cpu_time="%fms" elapsed_time="%fms" elapsed_time_ms="%fms"`,
@@ -395,6 +382,9 @@ func (c *QuerySamples) fetchQuerySamples(ctx context.Context) error {
395382
elapsedTime,
396383
elapsedTime,
397384
)
385+
if traceParent != "" {
386+
logMessage += fmt.Sprintf(` traceparent="%s"`, traceParent)
387+
}
398388
if c.disableQueryRedaction && row.SQLText.Valid {
399389
logMessage += fmt.Sprintf(` sql_text="%s"`, row.SQLText.String)
400390
}
@@ -526,3 +516,54 @@ func classifyMySQLWaitEventType(waitEventName string) string {
526516
}
527517
return "Other Wait"
528518
}
519+
520+
// tryExtractTraceParent attempts to extract a W3C traceparent value added at the end of SQL text as a trailing
521+
// block comment, e.g. "/*traceparent='00-<traceid>-<spanid>-<flags>'*/".
522+
// It returns the traceparent string when matched, otherwise an empty string.
523+
func tryExtractTraceParent(sqlText string) string {
524+
if strings.HasSuffix(sqlText, "...") {
525+
return ""
526+
}
527+
528+
// Find the last comment: strip out /* and */
529+
start := strings.LastIndex(sqlText, "/*")
530+
if start < 0 {
531+
return ""
532+
}
533+
body := sqlText[start+2:]
534+
end := strings.Index(body, "*/")
535+
if end < 0 {
536+
return ""
537+
}
538+
539+
body = body[:end]
540+
body = strings.TrimSpace(body)
541+
if body == "" {
542+
return ""
543+
}
544+
545+
// Split the comment by comma into key value pairs
546+
pairs := strings.Split(body, ",")
547+
for _, pair := range pairs {
548+
pair = strings.TrimSpace(pair)
549+
key, val, ok := strings.Cut(pair, "=")
550+
if !ok {
551+
continue
552+
}
553+
554+
if !strings.EqualFold(strings.TrimSpace(key), "traceparent") {
555+
continue
556+
}
557+
558+
// SQL unescape: trim ' or " at beginning and end of value
559+
if strings.HasPrefix(val, "'") || strings.HasPrefix(val, `"`) {
560+
quote := string(val[0])
561+
val = strings.TrimPrefix(val, quote)
562+
val = strings.TrimSuffix(val, quote)
563+
}
564+
565+
return strings.TrimSpace(val)
566+
}
567+
568+
return ""
569+
}

0 commit comments

Comments
 (0)