Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"fmt"
"strings"
"sync"
"time"

Expand All @@ -25,9 +26,6 @@ const (
cpuTimeField = `, statements.CPU_TIME`
maxControlledMemoryField = `, statements.MAX_CONTROLLED_MEMORY`
maxTotalMemoryField = `, statements.MAX_TOTAL_MEMORY`
sqlTextField = `, statements.SQL_TEXT`
sqlTextNotNullClause = ` AND statements.SQL_TEXT IS NOT NULL`
digestTextNotNullClause = ` AND statements.DIGEST_TEXT IS NOT NULL`
endOfTimeline = ` AND statements.TIMER_END > ? AND statements.TIMER_END <= ?`
beginningAndEndOfTimeline = ` AND statements.TIMER_END > ? OR statements.TIMER_END <= ?`
)
Expand All @@ -47,6 +45,7 @@ SELECT
statements.EVENT_ID,
statements.END_EVENT_ID,
statements.DIGEST,
statements.SQL_TEXT,
statements.TIMER_END,
statements.TIMER_WAIT,
statements.ROWS_EXAMINED,
Expand Down Expand Up @@ -74,8 +73,9 @@ LEFT JOIN
ON statements.THREAD_ID = threads.THREAD_ID
WHERE
statements.DIGEST IS NOT NULL
AND statements.DIGEST_TEXT IS NOT NULL
AND statements.CURRENT_SCHEMA NOT IN %s
%s %s %s`
%s %s`

const updateSetupConsumers = `
UPDATE performance_schema.setup_consumers
Expand Down Expand Up @@ -241,15 +241,6 @@ func (c *QuerySamples) fetchQuerySamples(ctx context.Context) error {

timerClause, limit := c.determineTimerClauseAndLimit(uptime)

var textField, textNotNullClause string
if c.disableQueryRedaction {
textField = sqlTextField
textNotNullClause = sqlTextNotNullClause
} else {
textField = ""
textNotNullClause = digestTextNotNullClause
}

excludedSchemasClause := buildExcludedSchemasClause(c.excludeSchemas)

var waitDurationClause string
Expand All @@ -264,13 +255,12 @@ func (c *QuerySamples) fetchQuerySamples(ctx context.Context) error {

query := ""
if semver.MustParseRange("<8.0.28")(c.engineVersion) {
query = fmt.Sprintf(selectQuerySamples, textField, waitDurationClause, excludedSchemasClause, textNotNullClause, sampleDurationClause, timerClause)
query = fmt.Sprintf(selectQuerySamples, "", waitDurationClause, excludedSchemasClause, sampleDurationClause, timerClause)
} else if semver.MustParseRange("<8.0.31")(c.engineVersion) {
additionalFields := cpuTimeField + textField
query = fmt.Sprintf(selectQuerySamples, additionalFields, waitDurationClause, excludedSchemasClause, textNotNullClause, sampleDurationClause, timerClause)
query = fmt.Sprintf(selectQuerySamples, cpuTimeField, waitDurationClause, excludedSchemasClause, sampleDurationClause, timerClause)
} else {
additionalFields := cpuTimeField + maxControlledMemoryField + maxTotalMemoryField + textField
query = fmt.Sprintf(selectQuerySamples, additionalFields, waitDurationClause, excludedSchemasClause, textNotNullClause, sampleDurationClause, timerClause)
additionalFields := cpuTimeField + maxControlledMemoryField + maxTotalMemoryField
query = fmt.Sprintf(selectQuerySamples, additionalFields, waitDurationClause, excludedSchemasClause, sampleDurationClause, timerClause)
}

rs, err := c.dbConnection.QueryContext(ctx, query, c.timerBookmark, limit)
Expand Down Expand Up @@ -331,6 +321,7 @@ func (c *QuerySamples) fetchQuerySamples(ctx context.Context) error {
&row.StatementEventID,
&row.StatementEndEventID,
&row.Digest,
&row.SQLText,
&row.TimerEndPicoseconds,
&row.ElapsedTimePicoseconds,
&row.RowsExamined,
Expand All @@ -355,10 +346,6 @@ func (c *QuerySamples) fetchQuerySamples(ctx context.Context) error {
scanArgs = append(scanArgs, &row.MaxTotalMemory)
}

if c.disableQueryRedaction {
scanArgs = append(scanArgs, &row.SQLText)
}

err := rs.Scan(scanArgs...)
if err != nil {
level.Error(c.logger).Log("msg", "failed to scan history table samples", "err", err)
Expand All @@ -374,9 +361,10 @@ func (c *QuerySamples) fetchQuerySamples(ctx context.Context) error {
row.TimestampMilliseconds = calculateWallTime(serverStartTime, row.TimerEndPicoseconds.Float64, uptime)
cpuTime := picosecondsToMilliseconds(row.CPUTime)
elapsedTime := picosecondsToMilliseconds(row.ElapsedTimePicoseconds.Float64)
traceParent := tryExtractTraceParent(row.SQLText.String)

logMessage := fmt.Sprintf(
`schema="%s" user="%s" client_host="%s" thread_id="%s" event_id="%s" end_event_id="%s" digest="%s" rows_examined="%d" rows_sent="%d" rows_affected="%d" errors="%d" max_controlled_memory="%db" max_total_memory="%db" cpu_time="%fms" elapsed_time="%fms" elapsed_time_ms="%fms"`,
`schema="%s" user="%s" client_host="%s" thread_id="%s" event_id="%s" end_event_id="%s" digest="%s" rows_examined="%d" rows_sent="%d" rows_affected="%d" errors="%d" max_controlled_memory="%db" max_total_memory="%db" cpu_time="%fms" elapsed_time="%fms" elapsed_time_ms="%fms" traceparent="%s"`,
row.Schema.String, row.User.String, row.Host.String, row.ThreadID.String,
row.StatementEventID.String, row.StatementEndEventID.String,
row.Digest.String,
Expand All @@ -389,6 +377,7 @@ func (c *QuerySamples) fetchQuerySamples(ctx context.Context) error {
cpuTime,
elapsedTime,
elapsedTime,
traceParent,
)
if c.disableQueryRedaction && row.SQLText.Valid {
logMessage += fmt.Sprintf(` sql_text="%s"`, row.SQLText.String)
Expand All @@ -409,7 +398,7 @@ func (c *QuerySamples) fetchQuerySamples(ctx context.Context) error {
if row.WaitEventID.Valid && row.WaitTime.Valid {
waitTime := picosecondsToMilliseconds(row.WaitTime.Float64)
waitLogMessage := fmt.Sprintf(
`schema="%s" user="%s" client_host="%s" thread_id="%s" digest="%s" event_id="%s" wait_event_id="%s" wait_end_event_id="%s" wait_event_name="%s" wait_object_name="%s" wait_object_type="%s" wait_time="%fms"`,
`schema="%s" user="%s" client_host="%s" thread_id="%s" digest="%s" event_id="%s" wait_event_id="%s" wait_end_event_id="%s" wait_event_name="%s" wait_object_name="%s" wait_object_type="%s" wait_time="%fms" traceparent="%s"`,
row.Schema.String,
row.User.String,
row.Host.String,
Expand All @@ -422,6 +411,7 @@ func (c *QuerySamples) fetchQuerySamples(ctx context.Context) error {
row.WaitObjectName.String,
row.WaitObjectType.String,
waitTime,
traceParent,
)

c.entryHandler.Chan() <- database_observability.BuildLokiEntryWithTimestamp(
Expand Down Expand Up @@ -472,3 +462,34 @@ func (c *QuerySamples) determineTimerClauseAndLimit(uptime float64) (string, flo

return timerClause, limit
}

// tryExtractTraceParent attempts to extract a W3C traceparent value added at the end of SQL text as a trailing
// block comment, e.g. "/*traceparent='00-<traceid>-<spanid>-<flags>'*/".
// It returns the traceparent string when matched, otherwise an empty string.
func tryExtractTraceParent(sqlText string) string {
if strings.HasSuffix(sqlText, "...") {
return ""
}

idx := strings.LastIndex(strings.ToLower(sqlText), "traceparent=")
if idx < 0 {
return ""
}
tp := sqlText[idx+len("traceparent="):]

quote := tp[0]
if quote == '\'' || quote == '"' {
tp = tp[1:]
end := strings.IndexByte(tp, quote)
if end < 0 {
end = len(tp)
}
return strings.TrimSpace(tp[:end])
}

if end := strings.Index(tp, "*/"); end > 0 {
tp = tp[:end]
}

return tp
}
Loading
Loading