diff --git a/internal/component/database_observability/mysql/collector/query_samples.go b/internal/component/database_observability/mysql/collector/query_samples.go index d52cd730d48..a6cb4cb885c 100644 --- a/internal/component/database_observability/mysql/collector/query_samples.go +++ b/internal/component/database_observability/mysql/collector/query_samples.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "fmt" + "strings" "sync" "time" @@ -25,9 +26,6 @@ const ( cpuTimeField = `, statements.CPU_TIME` maxControlledMemoryField = `, statements.MAX_CONTROLLED_MEMORY` maxTotalMemoryField = `, statements.MAX_TOTAL_MEMORY` - sqlTextField = `, statements.SQL_TEXT` - sqlTextNotNullClause = ` AND statements.SQL_TEXT IS NOT NULL` - digestTextNotNullClause = ` AND statements.DIGEST_TEXT IS NOT NULL` endOfTimeline = ` AND statements.TIMER_END > ? AND statements.TIMER_END <= ?` beginningAndEndOfTimeline = ` AND statements.TIMER_END > ? OR statements.TIMER_END <= ?` ) @@ -47,6 +45,7 @@ SELECT statements.EVENT_ID, statements.END_EVENT_ID, statements.DIGEST, + statements.SQL_TEXT, statements.TIMER_END, statements.TIMER_WAIT, statements.ROWS_EXAMINED, @@ -74,8 +73,9 @@ LEFT JOIN ON statements.THREAD_ID = threads.THREAD_ID WHERE statements.DIGEST IS NOT NULL + AND statements.DIGEST_TEXT IS NOT NULL AND statements.CURRENT_SCHEMA NOT IN %s - %s %s %s` + %s %s` const updateSetupConsumers = ` UPDATE performance_schema.setup_consumers @@ -241,15 +241,6 @@ func (c *QuerySamples) fetchQuerySamples(ctx context.Context) error { timerClause, limit := c.determineTimerClauseAndLimit(uptime) - var textField, textNotNullClause string - if c.disableQueryRedaction { - textField = sqlTextField - textNotNullClause = sqlTextNotNullClause - } else { - textField = "" - textNotNullClause = digestTextNotNullClause - } - excludedSchemasClause := buildExcludedSchemasClause(c.excludeSchemas) var waitDurationClause string @@ -264,13 +255,12 @@ func (c *QuerySamples) fetchQuerySamples(ctx context.Context) error { query := "" if semver.MustParseRange("<8.0.28")(c.engineVersion) { - query = fmt.Sprintf(selectQuerySamples, textField, waitDurationClause, excludedSchemasClause, textNotNullClause, sampleDurationClause, timerClause) + query = fmt.Sprintf(selectQuerySamples, "", waitDurationClause, excludedSchemasClause, sampleDurationClause, timerClause) } else if semver.MustParseRange("<8.0.31")(c.engineVersion) { - additionalFields := cpuTimeField + textField - query = fmt.Sprintf(selectQuerySamples, additionalFields, waitDurationClause, excludedSchemasClause, textNotNullClause, sampleDurationClause, timerClause) + query = fmt.Sprintf(selectQuerySamples, cpuTimeField, waitDurationClause, excludedSchemasClause, sampleDurationClause, timerClause) } else { - additionalFields := cpuTimeField + maxControlledMemoryField + maxTotalMemoryField + textField - query = fmt.Sprintf(selectQuerySamples, additionalFields, waitDurationClause, excludedSchemasClause, textNotNullClause, sampleDurationClause, timerClause) + additionalFields := cpuTimeField + maxControlledMemoryField + maxTotalMemoryField + query = fmt.Sprintf(selectQuerySamples, additionalFields, waitDurationClause, excludedSchemasClause, sampleDurationClause, timerClause) } rs, err := c.dbConnection.QueryContext(ctx, query, c.timerBookmark, limit) @@ -331,6 +321,7 @@ func (c *QuerySamples) fetchQuerySamples(ctx context.Context) error { &row.StatementEventID, &row.StatementEndEventID, &row.Digest, + &row.SQLText, &row.TimerEndPicoseconds, &row.ElapsedTimePicoseconds, &row.RowsExamined, @@ -355,10 +346,6 @@ func (c *QuerySamples) fetchQuerySamples(ctx context.Context) error { scanArgs = append(scanArgs, &row.MaxTotalMemory) } - if c.disableQueryRedaction { - scanArgs = append(scanArgs, &row.SQLText) - } - err := rs.Scan(scanArgs...) if err != nil { level.Error(c.logger).Log("msg", "failed to scan history table samples", "err", err) @@ -374,9 +361,10 @@ func (c *QuerySamples) fetchQuerySamples(ctx context.Context) error { row.TimestampMilliseconds = calculateWallTime(serverStartTime, row.TimerEndPicoseconds.Float64, uptime) cpuTime := picosecondsToMilliseconds(row.CPUTime) elapsedTime := picosecondsToMilliseconds(row.ElapsedTimePicoseconds.Float64) + traceParent := tryExtractTraceParent(row.SQLText.String) logMessage := fmt.Sprintf( - `schema="%s" user="%s" client_host="%s" thread_id="%s" event_id="%s" end_event_id="%s" digest="%s" rows_examined="%d" rows_sent="%d" rows_affected="%d" errors="%d" max_controlled_memory="%db" max_total_memory="%db" cpu_time="%fms" elapsed_time="%fms" elapsed_time_ms="%fms"`, + `schema="%s" user="%s" client_host="%s" thread_id="%s" event_id="%s" end_event_id="%s" digest="%s" rows_examined="%d" rows_sent="%d" rows_affected="%d" errors="%d" max_controlled_memory="%db" max_total_memory="%db" cpu_time="%fms" elapsed_time="%fms" elapsed_time_ms="%fms" traceparent="%s"`, row.Schema.String, row.User.String, row.Host.String, row.ThreadID.String, row.StatementEventID.String, row.StatementEndEventID.String, row.Digest.String, @@ -389,6 +377,7 @@ func (c *QuerySamples) fetchQuerySamples(ctx context.Context) error { cpuTime, elapsedTime, elapsedTime, + traceParent, ) if c.disableQueryRedaction && row.SQLText.Valid { logMessage += fmt.Sprintf(` sql_text="%s"`, row.SQLText.String) @@ -409,7 +398,7 @@ func (c *QuerySamples) fetchQuerySamples(ctx context.Context) error { if row.WaitEventID.Valid && row.WaitTime.Valid { waitTime := picosecondsToMilliseconds(row.WaitTime.Float64) waitLogMessage := fmt.Sprintf( - `schema="%s" user="%s" client_host="%s" thread_id="%s" digest="%s" event_id="%s" wait_event_id="%s" wait_end_event_id="%s" wait_event_name="%s" wait_object_name="%s" wait_object_type="%s" wait_time="%fms"`, + `schema="%s" user="%s" client_host="%s" thread_id="%s" digest="%s" event_id="%s" wait_event_id="%s" wait_end_event_id="%s" wait_event_name="%s" wait_object_name="%s" wait_object_type="%s" wait_time="%fms" traceparent="%s"`, row.Schema.String, row.User.String, row.Host.String, @@ -422,6 +411,7 @@ func (c *QuerySamples) fetchQuerySamples(ctx context.Context) error { row.WaitObjectName.String, row.WaitObjectType.String, waitTime, + traceParent, ) c.entryHandler.Chan() <- database_observability.BuildLokiEntryWithTimestamp( @@ -472,3 +462,34 @@ func (c *QuerySamples) determineTimerClauseAndLimit(uptime float64) (string, flo return timerClause, limit } + +// tryExtractTraceParent attempts to extract a W3C traceparent value added at the end of SQL text as a trailing +// block comment, e.g. "/*traceparent='00---'*/". +// It returns the traceparent string when matched, otherwise an empty string. +func tryExtractTraceParent(sqlText string) string { + if strings.HasSuffix(sqlText, "...") { + return "" + } + + idx := strings.LastIndex(strings.ToLower(sqlText), "traceparent=") + if idx < 0 { + return "" + } + tp := sqlText[idx+len("traceparent="):] + + quote := tp[0] + if quote == '\'' || quote == '"' { + tp = tp[1:] + end := strings.IndexByte(tp, quote) + if end < 0 { + end = len(tp) + } + return strings.TrimSpace(tp[:end]) + } + + if end := strings.Index(tp, "*/"); end > 0 { + tp = tp[:end] + } + + return tp +} diff --git a/internal/component/database_observability/mysql/collector/query_samples_test.go b/internal/component/database_observability/mysql/collector/query_samples_test.go index e5f0940b389..a070feec30c 100644 --- a/internal/component/database_observability/mysql/collector/query_samples_test.go +++ b/internal/component/database_observability/mysql/collector/query_samples_test.go @@ -41,6 +41,7 @@ func TestQuerySamples(t *testing.T) { "123", "234", "some_digest", + "select * from some_table where id = ?", "70000000", "20000000", "5", @@ -63,7 +64,109 @@ func TestQuerySamples(t *testing.T) { {"op": OP_QUERY_SAMPLE}, }, logsLines: []string{ - "level=\"info\" schema=\"some_schema\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\"", + "level=\"info\" schema=\"some_schema\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\" traceparent=\"\"", + }, + }, + { + name: "select query with traceparent", + rows: [][]driver.Value{{ + "some_schema", + "890", + "123", + "234", + "some_digest", + "select * from some_table where id = 1 /*traceparent='00-00bd5199fe2a4c8506368b55ef212cf1-d49c5e2fb232379b-01'*/", + "70000000", + "20000000", + "5", + "5", + "0", + "0", + nil, + nil, + nil, + nil, + nil, + nil, + "some_user", + "some_host", + "10000000", + "456", + "457", + }}, + logsLabels: []model.LabelSet{ + {"op": OP_QUERY_SAMPLE}, + }, + logsLines: []string{ + "level=\"info\" schema=\"some_schema\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\" traceparent=\"00-00bd5199fe2a4c8506368b55ef212cf1-d49c5e2fb232379b-01\"", + }, + }, + { + name: "SQL_TEXT is NULL", + rows: [][]driver.Value{{ + "some_schema", + "890", + "123", + "234", + "some_digest", + nil, + "70000000", + "20000000", + "5", + "5", + "0", + "0", + nil, + nil, + nil, + nil, + nil, + nil, + "some_user", + "some_host", + "10000000", + "456", + "457", + }}, + logsLabels: []model.LabelSet{ + {"op": OP_QUERY_SAMPLE}, + }, + logsLines: []string{ + "level=\"info\" schema=\"some_schema\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\" traceparent=\"\"", + }, + }, + { + name: "select query with truncated traceparent", + rows: [][]driver.Value{{ + "some_schema", + "890", + "123", + "234", + "some_digest", + "select * from some_table where id = 1 /*traceparent='00-abc...", + "70000000", + "20000000", + "5", + "5", + "0", + "0", + nil, + nil, + nil, + nil, + nil, + nil, + "some_user", + "some_host", + "10000000", + "456", + "457", + }}, + logsLabels: []model.LabelSet{ + {"op": OP_QUERY_SAMPLE}, + }, + logsLines: []string{ + "level=\"info\" schema=\"some_schema\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\" traceparent=\"\"", }, }, { @@ -74,6 +177,7 @@ func TestQuerySamples(t *testing.T) { "123", "234", "some_digest", + "select * from some_table where id = ?", "70000000", "20000000", "5", @@ -97,6 +201,7 @@ func TestQuerySamples(t *testing.T) { "124", "235", "some_digest", + "select * from some_table where id = ?", "70000000", "20000000", "5", @@ -120,8 +225,8 @@ func TestQuerySamples(t *testing.T) { {"op": OP_QUERY_SAMPLE}, }, logsLines: []string{ - "level=\"info\" schema=\"some_schema\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\"", - "level=\"info\" schema=\"some_other_schema\" user=\"some_user\" client_host=\"some_host\" thread_id=\"891\" event_id=\"124\" end_event_id=\"235\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\"", + "level=\"info\" schema=\"some_schema\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\" traceparent=\"\"", + "level=\"info\" schema=\"some_other_schema\" user=\"some_user\" client_host=\"some_host\" thread_id=\"891\" event_id=\"124\" end_event_id=\"235\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\" traceparent=\"\"", }, }, } @@ -165,7 +270,7 @@ func TestQuerySamples(t *testing.T) { 1, )) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", exclusionClause, digestTextNotNullClause, "", endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", exclusionClause, "", endOfTimeline)).WithArgs( 1e12, // initial timerBookmark 1e12, ).RowsWillBeClosed(). @@ -176,6 +281,7 @@ func TestQuerySamples(t *testing.T) { "statements.EVENT_ID", "statements.END_EVENT_ID", "statements.DIGEST", + "statements.SQL_TEXT", "statements.TIMER_END", "statements.TIMER_WAIT", "statements.ROWS_EXAMINED", @@ -251,7 +357,7 @@ func TestQuerySamples_WaitEvents(t *testing.T) { mock.ExpectQuery(selectUptime).WithoutArgs().RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{"uptime"}).AddRow("1")) mock.ExpectQuery(selectNowAndUptime).WithoutArgs().WillReturnRows(sqlmock.NewRows([]string{"now", "uptime"}).AddRow(5, 1)) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", exclusionClause, digestTextNotNullClause, "", endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", exclusionClause, "", endOfTimeline)).WithArgs( 1e12, 1e12, ).RowsWillBeClosed(). @@ -262,6 +368,7 @@ func TestQuerySamples_WaitEvents(t *testing.T) { "statements.EVENT_ID", "statements.END_EVENT_ID", "statements.DIGEST", + "statements.SQL_TEXT", "statements.TIMER_END", "statements.TIMER_WAIT", "statements.ROWS_EXAMINED", @@ -286,6 +393,7 @@ func TestQuerySamples_WaitEvents(t *testing.T) { "123", "234", "some_digest", + nil, "70000000", "20000000", "5", @@ -326,9 +434,9 @@ func TestQuerySamples_WaitEvents(t *testing.T) { lokiEntries := lokiClient.Received() assert.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, lokiEntries[0].Labels) - assert.Equal(t, "level=\"info\" schema=\"some_schema\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\"", lokiEntries[0].Line) + assert.Equal(t, "level=\"info\" schema=\"some_schema\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\" traceparent=\"\"", lokiEntries[0].Line) assert.Equal(t, model.LabelSet{"op": OP_WAIT_EVENT}, lokiEntries[1].Labels) - assert.Equal(t, "level=\"info\" schema=\"some_schema\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" digest=\"some_digest\" event_id=\"123\" wait_event_id=\"124\" wait_end_event_id=\"124\" wait_event_name=\"wait/io/file/innodb/innodb_data_file\" wait_object_name=\"wait_object_name\" wait_object_type=\"wait_object_type\" wait_time=\"0.100000ms\"", lokiEntries[1].Line) + assert.Equal(t, "level=\"info\" schema=\"some_schema\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" digest=\"some_digest\" event_id=\"123\" wait_event_id=\"124\" wait_end_event_id=\"124\" wait_event_name=\"wait/io/file/innodb/innodb_data_file\" wait_object_name=\"wait_object_name\" wait_object_type=\"wait_object_type\" wait_time=\"0.100000ms\" traceparent=\"\"", lokiEntries[1].Line) }) t.Run("wait event with NULL timer_wait is skipped", func(t *testing.T) { @@ -350,7 +458,7 @@ func TestQuerySamples_WaitEvents(t *testing.T) { mock.ExpectQuery(selectUptime).WithoutArgs().RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{"uptime"}).AddRow("1")) mock.ExpectQuery(selectNowAndUptime).WithoutArgs().WillReturnRows(sqlmock.NewRows([]string{"now", "uptime"}).AddRow(5, 1)) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", exclusionClause, digestTextNotNullClause, "", endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", exclusionClause, "", endOfTimeline)).WithArgs( 1e12, 1e12, ).RowsWillBeClosed(). @@ -361,6 +469,7 @@ func TestQuerySamples_WaitEvents(t *testing.T) { "statements.EVENT_ID", "statements.END_EVENT_ID", "statements.DIGEST", + "statements.SQL_TEXT", "statements.TIMER_END", "statements.TIMER_WAIT", "statements.ROWS_EXAMINED", @@ -385,6 +494,7 @@ func TestQuerySamples_WaitEvents(t *testing.T) { "123", "234", "some_digest", + nil, "70000000", "20000000", "5", @@ -427,7 +537,7 @@ func TestQuerySamples_WaitEvents(t *testing.T) { lokiEntries := lokiClient.Received() require.Len(t, lokiEntries, 1) assert.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, lokiEntries[0].Labels) - assert.Equal(t, "level=\"info\" schema=\"some_schema\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\"", lokiEntries[0].Line) + assert.Equal(t, "level=\"info\" schema=\"some_schema\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\" traceparent=\"\"", lokiEntries[0].Line) }) t.Run("query sample and multiple wait events are collected", func(t *testing.T) { @@ -449,7 +559,7 @@ func TestQuerySamples_WaitEvents(t *testing.T) { mock.ExpectQuery(selectUptime).WithoutArgs().RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{"uptime"}).AddRow("1")) mock.ExpectQuery(selectNowAndUptime).WithoutArgs().WillReturnRows(sqlmock.NewRows([]string{"now", "uptime"}).AddRow(5, 1)) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", exclusionClause, digestTextNotNullClause, "", endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", exclusionClause, "", endOfTimeline)).WithArgs( 1e12, 1e12, ).RowsWillBeClosed(). @@ -460,6 +570,7 @@ func TestQuerySamples_WaitEvents(t *testing.T) { "statements.EVENT_ID", "statements.END_EVENT_ID", "statements.DIGEST", + "statements.SQL_TEXT", "statements.TIMER_END", "statements.TIMER_WAIT", "statements.ROWS_EXAMINED", @@ -484,6 +595,7 @@ func TestQuerySamples_WaitEvents(t *testing.T) { "123", "234", "some_digest", + nil, "70000000", "20000000", "5", @@ -508,6 +620,7 @@ func TestQuerySamples_WaitEvents(t *testing.T) { "123", "234", "some_digest", + nil, "70000000", "20000000", "5", @@ -532,6 +645,7 @@ func TestQuerySamples_WaitEvents(t *testing.T) { "123", "234", "some_digest", + nil, "70000000", "20000000", "5", @@ -556,6 +670,7 @@ func TestQuerySamples_WaitEvents(t *testing.T) { "123", "234", "some_digest", + nil, "70000000", "20000000", "5", @@ -596,15 +711,15 @@ func TestQuerySamples_WaitEvents(t *testing.T) { lokiEntries := lokiClient.Received() assert.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, lokiEntries[0].Labels) - assert.Equal(t, "level=\"info\" schema=\"books_store\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\"", lokiEntries[0].Line) + assert.Equal(t, "level=\"info\" schema=\"books_store\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\" traceparent=\"\"", lokiEntries[0].Line) assert.Equal(t, model.LabelSet{"op": OP_WAIT_EVENT}, lokiEntries[1].Labels) - assert.Equal(t, "level=\"info\" schema=\"books_store\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" digest=\"some_digest\" event_id=\"123\" wait_event_id=\"124\" wait_end_event_id=\"125\" wait_event_name=\"wait/lock/table/sql/handler\" wait_object_name=\"books\" wait_object_type=\"TABLE\" wait_time=\"0.000150ms\"", lokiEntries[1].Line) + assert.Equal(t, "level=\"info\" schema=\"books_store\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" digest=\"some_digest\" event_id=\"123\" wait_event_id=\"124\" wait_end_event_id=\"125\" wait_event_name=\"wait/lock/table/sql/handler\" wait_object_name=\"books\" wait_object_type=\"TABLE\" wait_time=\"0.000150ms\" traceparent=\"\"", lokiEntries[1].Line) assert.Equal(t, model.LabelSet{"op": OP_WAIT_EVENT}, lokiEntries[2].Labels) - assert.Equal(t, "level=\"info\" schema=\"books_store\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" digest=\"some_digest\" event_id=\"123\" wait_event_id=\"126\" wait_end_event_id=\"126\" wait_event_name=\"wait/lock/table/sql/handler\" wait_object_name=\"categories\" wait_object_type=\"TABLE\" wait_time=\"0.000350ms\"", lokiEntries[2].Line) + assert.Equal(t, "level=\"info\" schema=\"books_store\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" digest=\"some_digest\" event_id=\"123\" wait_event_id=\"126\" wait_end_event_id=\"126\" wait_event_name=\"wait/lock/table/sql/handler\" wait_object_name=\"categories\" wait_object_type=\"TABLE\" wait_time=\"0.000350ms\" traceparent=\"\"", lokiEntries[2].Line) assert.Equal(t, model.LabelSet{"op": OP_WAIT_EVENT}, lokiEntries[3].Labels) - assert.Equal(t, "level=\"info\" schema=\"books_store\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" digest=\"some_digest\" event_id=\"123\" wait_event_id=\"127\" wait_end_event_id=\"127\" wait_event_name=\"wait/io/table/sql/handler\" wait_object_name=\"books\" wait_object_type=\"TABLE\" wait_time=\"0.000500ms\"", lokiEntries[3].Line) + assert.Equal(t, "level=\"info\" schema=\"books_store\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" digest=\"some_digest\" event_id=\"123\" wait_event_id=\"127\" wait_end_event_id=\"127\" wait_event_name=\"wait/io/table/sql/handler\" wait_object_name=\"books\" wait_object_type=\"TABLE\" wait_time=\"0.000500ms\" traceparent=\"\"", lokiEntries[3].Line) assert.Equal(t, model.LabelSet{"op": OP_WAIT_EVENT}, lokiEntries[4].Labels) - assert.Equal(t, "level=\"info\" schema=\"books_store\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" digest=\"some_digest\" event_id=\"123\" wait_event_id=\"128\" wait_end_event_id=\"128\" wait_event_name=\"wait/io/table/sql/handler\" wait_object_name=\"categories\" wait_object_type=\"TABLE\" wait_time=\"0.000700ms\"", lokiEntries[4].Line) + assert.Equal(t, "level=\"info\" schema=\"books_store\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" digest=\"some_digest\" event_id=\"123\" wait_event_id=\"128\" wait_end_event_id=\"128\" wait_event_name=\"wait/io/table/sql/handler\" wait_object_name=\"categories\" wait_object_type=\"TABLE\" wait_time=\"0.000700ms\" traceparent=\"\"", lokiEntries[4].Line) }) t.Run("query sample and its wait event and another query sample are collected", func(t *testing.T) { @@ -626,7 +741,7 @@ func TestQuerySamples_WaitEvents(t *testing.T) { mock.ExpectQuery(selectUptime).WithoutArgs().RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{"uptime"}).AddRow("1")) mock.ExpectQuery(selectNowAndUptime).WithoutArgs().WillReturnRows(sqlmock.NewRows([]string{"now", "uptime"}).AddRow(5, 1)) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", exclusionClause, digestTextNotNullClause, "", endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", exclusionClause, "", endOfTimeline)).WithArgs( 1e12, 1e12, ).RowsWillBeClosed(). @@ -637,6 +752,7 @@ func TestQuerySamples_WaitEvents(t *testing.T) { "statements.EVENT_ID", "statements.END_EVENT_ID", "statements.DIGEST", + "statements.SQL_TEXT", "statements.TIMER_END", "statements.TIMER_WAIT", "statements.ROWS_EXAMINED", @@ -661,6 +777,7 @@ func TestQuerySamples_WaitEvents(t *testing.T) { "123", "234", "some_digest", + nil, "70000000", "20000000", "5", @@ -685,6 +802,7 @@ func TestQuerySamples_WaitEvents(t *testing.T) { "126", "234", "another_digest", + nil, "70000000", "20000000", "5", @@ -725,11 +843,11 @@ func TestQuerySamples_WaitEvents(t *testing.T) { lokiEntries := lokiClient.Received() assert.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, lokiEntries[0].Labels) - assert.Equal(t, "level=\"info\" schema=\"books_store\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\"", lokiEntries[0].Line) + assert.Equal(t, "level=\"info\" schema=\"books_store\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\" traceparent=\"\"", lokiEntries[0].Line) assert.Equal(t, model.LabelSet{"op": OP_WAIT_EVENT}, lokiEntries[1].Labels) - assert.Equal(t, "level=\"info\" schema=\"books_store\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" digest=\"some_digest\" event_id=\"123\" wait_event_id=\"124\" wait_end_event_id=\"125\" wait_event_name=\"wait/lock/table/sql/handler\" wait_object_name=\"books\" wait_object_type=\"TABLE\" wait_time=\"0.000150ms\"", lokiEntries[1].Line) + assert.Equal(t, "level=\"info\" schema=\"books_store\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" digest=\"some_digest\" event_id=\"123\" wait_event_id=\"124\" wait_end_event_id=\"125\" wait_event_name=\"wait/lock/table/sql/handler\" wait_object_name=\"books\" wait_object_type=\"TABLE\" wait_time=\"0.000150ms\" traceparent=\"\"", lokiEntries[1].Line) assert.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, lokiEntries[2].Labels) - assert.Equal(t, "level=\"info\" schema=\"books_store\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" event_id=\"126\" end_event_id=\"234\" digest=\"another_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\"", lokiEntries[2].Line) + assert.Equal(t, "level=\"info\" schema=\"books_store\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" event_id=\"126\" end_event_id=\"234\" digest=\"another_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\" traceparent=\"\"", lokiEntries[2].Line) }) t.Run("wait event with disabled sql redaction", func(t *testing.T) { @@ -768,7 +886,7 @@ func TestQuerySamples_WaitEvents(t *testing.T) { 1, )) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField+sqlTextField, "", exclusionClause, sqlTextNotNullClause, "", endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", exclusionClause, "", endOfTimeline)).WithArgs( 1e12, // initial timerBookmark 1e12, ).RowsWillBeClosed(). @@ -779,6 +897,7 @@ func TestQuerySamples_WaitEvents(t *testing.T) { "statements.EVENT_ID", "statements.END_EVENT_ID", "statements.DIGEST", + "statements.SQL_TEXT", "statements.TIMER_END", "statements.TIMER_WAIT", "statements.ROWS_EXAMINED", @@ -796,13 +915,13 @@ func TestQuerySamples_WaitEvents(t *testing.T) { "statements.CPU_TIME", "statements.MAX_CONTROLLED_MEMORY", "statements.MAX_TOTAL_MEMORY", - "statements.SQL_TEXT", }).AddRow( "some_schema", "890", "123", "234", "some_digest", + "select * from some_table where id = 1", "70000000", "20000000", "5", @@ -820,7 +939,6 @@ func TestQuerySamples_WaitEvents(t *testing.T) { "10000000", "456", "457", - "select * from some_table where id = 1", ), ) @@ -843,9 +961,9 @@ func TestQuerySamples_WaitEvents(t *testing.T) { lokiEntries := lokiClient.Received() assert.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, lokiEntries[0].Labels) - assert.Equal(t, "level=\"info\" schema=\"some_schema\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\" sql_text=\"select * from some_table where id = 1\"", lokiEntries[0].Line) + assert.Equal(t, "level=\"info\" schema=\"some_schema\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\" traceparent=\"\" sql_text=\"select * from some_table where id = 1\"", lokiEntries[0].Line) assert.Equal(t, model.LabelSet{"op": OP_WAIT_EVENT}, lokiEntries[1].Labels) - assert.Equal(t, "level=\"info\" schema=\"some_schema\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" digest=\"some_digest\" event_id=\"123\" wait_event_id=\"124\" wait_end_event_id=\"125\" wait_event_name=\"wait/io/file/innodb/innodb_data_file\" wait_object_name=\"wait_object_name\" wait_object_type=\"wait_object_type\" wait_time=\"0.100000ms\"", lokiEntries[1].Line) + assert.Equal(t, "level=\"info\" schema=\"some_schema\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" digest=\"some_digest\" event_id=\"123\" wait_event_id=\"124\" wait_end_event_id=\"125\" wait_event_name=\"wait/io/file/innodb/innodb_data_file\" wait_object_name=\"wait_object_name\" wait_object_type=\"wait_object_type\" wait_time=\"0.100000ms\" traceparent=\"\"", lokiEntries[1].Line) }) t.Run("wait event below wait_min_duration is filtered by SQL", func(t *testing.T) { @@ -868,7 +986,7 @@ func TestQuerySamples_WaitEvents(t *testing.T) { mock.ExpectQuery(selectUptime).WithoutArgs().RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{"uptime"}).AddRow("1")) mock.ExpectQuery(selectNowAndUptime).WithoutArgs().WillReturnRows(sqlmock.NewRows([]string{"now", "uptime"}).AddRow(5, 1)) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "AND waits.timer_wait >= 1000000000", exclusionClause, digestTextNotNullClause, "", endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "AND waits.timer_wait >= 1000000000", exclusionClause, "", endOfTimeline)).WithArgs( 1e12, 1e12, ).RowsWillBeClosed(). @@ -879,6 +997,7 @@ func TestQuerySamples_WaitEvents(t *testing.T) { "statements.EVENT_ID", "statements.END_EVENT_ID", "statements.DIGEST", + "statements.SQL_TEXT", "statements.TIMER_END", "statements.TIMER_WAIT", "statements.ROWS_EXAMINED", @@ -903,6 +1022,7 @@ func TestQuerySamples_WaitEvents(t *testing.T) { "123", "234", "some_digest", + nil, "70000000", "20000000", "5", @@ -966,7 +1086,7 @@ func TestQuerySamples_WaitEvents(t *testing.T) { mock.ExpectQuery(selectUptime).WithoutArgs().RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{"uptime"}).AddRow("1")) mock.ExpectQuery(selectNowAndUptime).WithoutArgs().WillReturnRows(sqlmock.NewRows([]string{"now", "uptime"}).AddRow(5, 1)) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "AND waits.timer_wait >= 1000000000", exclusionClause, digestTextNotNullClause, "", endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "AND waits.timer_wait >= 1000000000", exclusionClause, "", endOfTimeline)).WithArgs( 1e12, 1e12, ).RowsWillBeClosed(). @@ -977,6 +1097,7 @@ func TestQuerySamples_WaitEvents(t *testing.T) { "statements.EVENT_ID", "statements.END_EVENT_ID", "statements.DIGEST", + "statements.SQL_TEXT", "statements.TIMER_END", "statements.TIMER_WAIT", "statements.ROWS_EXAMINED", @@ -1001,6 +1122,7 @@ func TestQuerySamples_WaitEvents(t *testing.T) { "123", "234", "some_digest", + nil, "70000000", "20000000", "5", @@ -1042,7 +1164,7 @@ func TestQuerySamples_WaitEvents(t *testing.T) { lokiEntries := lokiClient.Received() assert.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, lokiEntries[0].Labels) assert.Equal(t, model.LabelSet{"op": OP_WAIT_EVENT}, lokiEntries[1].Labels) - assert.Equal(t, "level=\"info\" schema=\"some_schema\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" digest=\"some_digest\" event_id=\"123\" wait_event_id=\"124\" wait_end_event_id=\"124\" wait_event_name=\"wait/io/file/innodb/innodb_data_file\" wait_object_name=\"wait_object_name\" wait_object_type=\"wait_object_type\" wait_time=\"1.000000ms\"", lokiEntries[1].Line) + assert.Equal(t, "level=\"info\" schema=\"some_schema\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" digest=\"some_digest\" event_id=\"123\" wait_event_id=\"124\" wait_end_event_id=\"124\" wait_event_name=\"wait/io/file/innodb/innodb_data_file\" wait_object_name=\"wait_object_name\" wait_object_type=\"wait_object_type\" wait_time=\"1.000000ms\" traceparent=\"\"", lokiEntries[1].Line) }) } @@ -1067,7 +1189,7 @@ func TestQuerySamples_SampleMinDuration(t *testing.T) { mock.ExpectQuery(selectUptime).WithoutArgs().RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{"uptime"}).AddRow("1")) mock.ExpectQuery(selectNowAndUptime).WithoutArgs().WillReturnRows(sqlmock.NewRows([]string{"now", "uptime"}).AddRow(5, 1)) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", exclusionClause, digestTextNotNullClause, "AND statements.TIMER_WAIT >= 1000000000", endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", exclusionClause, "AND statements.TIMER_WAIT >= 1000000000", endOfTimeline)).WithArgs( 1e12, 1e12, ).RowsWillBeClosed(). @@ -1078,6 +1200,7 @@ func TestQuerySamples_SampleMinDuration(t *testing.T) { "statements.EVENT_ID", "statements.END_EVENT_ID", "statements.DIGEST", + "statements.SQL_TEXT", "statements.TIMER_END", "statements.TIMER_WAIT", "statements.ROWS_EXAMINED", @@ -1137,7 +1260,7 @@ func TestQuerySamples_SampleMinDuration(t *testing.T) { mock.ExpectQuery(selectUptime).WithoutArgs().RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{"uptime"}).AddRow("1")) mock.ExpectQuery(selectNowAndUptime).WithoutArgs().WillReturnRows(sqlmock.NewRows([]string{"now", "uptime"}).AddRow(5, 1)) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", exclusionClause, digestTextNotNullClause, "AND statements.TIMER_WAIT >= 1000000000", endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", exclusionClause, "AND statements.TIMER_WAIT >= 1000000000", endOfTimeline)).WithArgs( 1e12, 1e12, ).RowsWillBeClosed(). @@ -1148,6 +1271,7 @@ func TestQuerySamples_SampleMinDuration(t *testing.T) { "statements.EVENT_ID", "statements.END_EVENT_ID", "statements.DIGEST", + "statements.SQL_TEXT", "statements.TIMER_END", "statements.TIMER_WAIT", "statements.ROWS_EXAMINED", @@ -1171,6 +1295,7 @@ func TestQuerySamples_SampleMinDuration(t *testing.T) { "123", "234", "some_digest", + nil, "70000000", "1000000000", // 1ms in picoseconds "5", @@ -1253,7 +1378,7 @@ func TestQuerySamples_DisableQueryRedaction(t *testing.T) { 1, )) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField+sqlTextField, "", exclusionClause, sqlTextNotNullClause, "", endOfTimeline)).WithArgs(1e12, 1e12).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", exclusionClause, "", endOfTimeline)).WithArgs(1e12, 1e12).RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "statements.CURRENT_SCHEMA", @@ -1261,6 +1386,7 @@ func TestQuerySamples_DisableQueryRedaction(t *testing.T) { "statements.EVENT_ID", "statements.END_EVENT_ID", "statements.DIGEST", + "statements.SQL_TEXT", "statements.TIMER_END", "statements.TIMER_WAIT", "statements.ROWS_EXAMINED", @@ -1278,13 +1404,13 @@ func TestQuerySamples_DisableQueryRedaction(t *testing.T) { "statements.CPU_TIME", "statements.MAX_CONTROLLED_MEMORY", "statements.MAX_TOTAL_MEMORY", - "statements.SQL_TEXT", }).AddRow( "some_schema", "890", "123", "234", "some_digest", + "select * from some_table where id = 1", "70000000", "20000000", "5", @@ -1302,7 +1428,6 @@ func TestQuerySamples_DisableQueryRedaction(t *testing.T) { "10000000", "456", "457", - "select * from some_table where id = 1", ), ) @@ -1325,7 +1450,7 @@ func TestQuerySamples_DisableQueryRedaction(t *testing.T) { lokiEntries := lokiClient.Received() require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, lokiEntries[0].Labels) - require.Equal(t, "level=\"info\" schema=\"some_schema\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\" sql_text=\"select * from some_table where id = 1\"", lokiEntries[0].Line) + require.Equal(t, "level=\"info\" schema=\"some_schema\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\" traceparent=\"\" sql_text=\"select * from some_table where id = 1\"", lokiEntries[0].Line) }) t.Run("does not collect sql text when disabled", func(t *testing.T) { @@ -1365,7 +1490,7 @@ func TestQuerySamples_DisableQueryRedaction(t *testing.T) { 1, )) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", exclusionClause, digestTextNotNullClause, "", endOfTimeline)).WithArgs(1e12, 1e12).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", exclusionClause, "", endOfTimeline)).WithArgs(1e12, 1e12).RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "statements.CURRENT_SCHEMA", @@ -1373,6 +1498,7 @@ func TestQuerySamples_DisableQueryRedaction(t *testing.T) { "statements.EVENT_ID", "statements.END_EVENT_ID", "statements.DIGEST", + "statements.SQL_TEXT", "statements.TIMER_END", "statements.TIMER_WAIT", "statements.ROWS_EXAMINED", @@ -1396,6 +1522,7 @@ func TestQuerySamples_DisableQueryRedaction(t *testing.T) { "123", "234", "some_digest", + nil, "70000000", "20000000", "5", @@ -1435,7 +1562,7 @@ func TestQuerySamples_DisableQueryRedaction(t *testing.T) { lokiEntries := lokiClient.Received() require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, lokiEntries[0].Labels) - require.Equal(t, "level=\"info\" schema=\"some_schema\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\"", lokiEntries[0].Line) + require.Equal(t, "level=\"info\" schema=\"some_schema\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\" traceparent=\"\"", lokiEntries[0].Line) }) } @@ -1460,6 +1587,7 @@ func TestQuerySamplesMySQLVersions(t *testing.T) { "statements.EVENT_ID", "statements.END_EVENT_ID", "statements.DIGEST", + "statements.SQL_TEXT", "statements.TIMER_END", "statements.TIMER_WAIT", "statements.ROWS_EXAMINED", @@ -1475,13 +1603,14 @@ func TestQuerySamplesMySQLVersions(t *testing.T) { "threads.PROCESSLIST_USER", "threads.PROCESSLIST_HOST", }, - expectedLogOutput: `level="info" schema="test_schema" user="some_user" client_host="some_host" thread_id="890" event_id="123" end_event_id="234" digest="some_digest" rows_examined="5" rows_sent="5" rows_affected="0" errors="0" max_controlled_memory="0b" max_total_memory="0b" cpu_time="0.000000ms" elapsed_time="0.020000ms" elapsed_time_ms="0.020000ms"`, + expectedLogOutput: `level="info" schema="test_schema" user="some_user" client_host="some_host" thread_id="890" event_id="123" end_event_id="234" digest="some_digest" rows_examined="5" rows_sent="5" rows_affected="0" errors="0" max_controlled_memory="0b" max_total_memory="0b" cpu_time="0.000000ms" elapsed_time="0.020000ms" elapsed_time_ms="0.020000ms" traceparent=""`, scanValues: []driver.Value{ "test_schema", "890", "123", "234", "some_digest", + nil, "70000000", "20000000", "5", @@ -1508,6 +1637,7 @@ func TestQuerySamplesMySQLVersions(t *testing.T) { "statements.EVENT_ID", "statements.END_EVENT_ID", "statements.DIGEST", + "statements.SQL_TEXT", "statements.TIMER_END", "statements.TIMER_WAIT", "statements.ROWS_EXAMINED", @@ -1524,13 +1654,14 @@ func TestQuerySamplesMySQLVersions(t *testing.T) { "threads.PROCESSLIST_HOST", "statements.CPU_TIME", }, - expectedLogOutput: `level="info" schema="test_schema" user="some_user" client_host="some_host" thread_id="890" event_id="123" end_event_id="234" digest="some_digest" rows_examined="5" rows_sent="5" rows_affected="0" errors="0" max_controlled_memory="0b" max_total_memory="0b" cpu_time="0.010000ms" elapsed_time="0.020000ms" elapsed_time_ms="0.020000ms"`, + expectedLogOutput: `level="info" schema="test_schema" user="some_user" client_host="some_host" thread_id="890" event_id="123" end_event_id="234" digest="some_digest" rows_examined="5" rows_sent="5" rows_affected="0" errors="0" max_controlled_memory="0b" max_total_memory="0b" cpu_time="0.010000ms" elapsed_time="0.020000ms" elapsed_time_ms="0.020000ms" traceparent=""`, scanValues: []driver.Value{ "test_schema", "890", "123", "234", "some_digest", + nil, "70000000", "20000000", "5", @@ -1558,6 +1689,7 @@ func TestQuerySamplesMySQLVersions(t *testing.T) { "statements.EVENT_ID", "statements.END_EVENT_ID", "statements.DIGEST", + "statements.SQL_TEXT", "statements.TIMER_END", "statements.TIMER_WAIT", "statements.ROWS_EXAMINED", @@ -1576,13 +1708,14 @@ func TestQuerySamplesMySQLVersions(t *testing.T) { "statements.MAX_CONTROLLED_MEMORY", "statements.MAX_TOTAL_MEMORY", }, - expectedLogOutput: `level="info" schema="test_schema" user="some_user" client_host="some_host" thread_id="890" event_id="123" end_event_id="234" digest="some_digest" rows_examined="5" rows_sent="5" rows_affected="0" errors="0" max_controlled_memory="1024b" max_total_memory="2048b" cpu_time="0.010000ms" elapsed_time="0.020000ms" elapsed_time_ms="0.020000ms"`, + expectedLogOutput: `level="info" schema="test_schema" user="some_user" client_host="some_host" thread_id="890" event_id="123" end_event_id="234" digest="some_digest" rows_examined="5" rows_sent="5" rows_affected="0" errors="0" max_controlled_memory="1024b" max_total_memory="2048b" cpu_time="0.010000ms" elapsed_time="0.020000ms" elapsed_time_ms="0.020000ms" traceparent=""`, scanValues: []driver.Value{ "test_schema", "890", "123", "234", "some_digest", + nil, "70000000", "20000000", "5", @@ -1642,7 +1775,7 @@ func TestQuerySamplesMySQLVersions(t *testing.T) { 1, )) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, tc.expectedFields, "", exclusionClause, digestTextNotNullClause, "", endOfTimeline)).WithArgs(1e12, 1e12).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, tc.expectedFields, "", exclusionClause, "", endOfTimeline)).WithArgs(1e12, 1e12).RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows(tc.expectedColumns).AddRow(tc.scanValues...), ) @@ -1710,7 +1843,7 @@ func TestQuerySamples_SQLDriverErrors(t *testing.T) { 1, )) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", exclusionClause, digestTextNotNullClause, "", endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", exclusionClause, "", endOfTimeline)).WithArgs( 1e12, 1e12, ).RowsWillBeClosed(). @@ -1730,7 +1863,7 @@ func TestQuerySamples_SQLDriverErrors(t *testing.T) { 1, )) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", exclusionClause, digestTextNotNullClause, "", endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", exclusionClause, "", endOfTimeline)).WithArgs( 1e12, 1e12, ).RowsWillBeClosed(). @@ -1741,6 +1874,7 @@ func TestQuerySamples_SQLDriverErrors(t *testing.T) { "statements.EVENT_ID", "statements.END_EVENT_ID", "statements.DIGEST", + "statements.SQL_TEXT", "statements.TIMER_END", "statements.TIMER_WAIT", "statements.ROWS_EXAMINED", @@ -1764,6 +1898,7 @@ func TestQuerySamples_SQLDriverErrors(t *testing.T) { "123", "234", "some_digest", + nil, "70000000", "20000000", "5", @@ -1803,7 +1938,7 @@ func TestQuerySamples_SQLDriverErrors(t *testing.T) { lokiEntries := lokiClient.Received() require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, lokiEntries[0].Labels) - require.Equal(t, "level=\"info\" schema=\"some_schema\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\"", lokiEntries[0].Line) + require.Equal(t, "level=\"info\" schema=\"some_schema\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\" traceparent=\"\"", lokiEntries[0].Line) }) t.Run("result set iteration error", func(t *testing.T) { @@ -1842,7 +1977,7 @@ func TestQuerySamples_SQLDriverErrors(t *testing.T) { 2, )) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", exclusionClause, digestTextNotNullClause, "", endOfTimeline)).WithArgs(1e12, 2e12).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", exclusionClause, "", endOfTimeline)).WithArgs(1e12, 2e12).RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "statements.CURRENT_SCHEMA", @@ -1850,6 +1985,7 @@ func TestQuerySamples_SQLDriverErrors(t *testing.T) { "statements.EVENT_ID", "statements.END_EVENT_ID", "statements.DIGEST", + "statements.SQL_TEXT", "statements.TIMER_END", "statements.TIMER_WAIT", "statements.ROWS_EXAMINED", @@ -1873,6 +2009,7 @@ func TestQuerySamples_SQLDriverErrors(t *testing.T) { "123", "234", "some_digest", + nil, "70000000", "20000000", "5", @@ -1896,6 +2033,7 @@ func TestQuerySamples_SQLDriverErrors(t *testing.T) { "124", "235", "some_digest", + nil, "70000000", "20000000", "5", @@ -1935,7 +2073,7 @@ func TestQuerySamples_SQLDriverErrors(t *testing.T) { lokiEntries := lokiClient.Received() require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, lokiEntries[0].Labels) - require.Equal(t, "level=\"info\" schema=\"some_schema\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\"", lokiEntries[0].Line) + require.Equal(t, "level=\"info\" schema=\"some_schema\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\" traceparent=\"\"", lokiEntries[0].Line) }) t.Run("connection error recovery", func(t *testing.T) { @@ -1974,7 +2112,7 @@ func TestQuerySamples_SQLDriverErrors(t *testing.T) { 2, )) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", exclusionClause, digestTextNotNullClause, "", endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", exclusionClause, "", endOfTimeline)).WithArgs( 1e12, 2e12, ).WillReturnError(fmt.Errorf("connection error")) @@ -1988,7 +2126,7 @@ func TestQuerySamples_SQLDriverErrors(t *testing.T) { 2, )) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", exclusionClause, digestTextNotNullClause, "", endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", exclusionClause, "", endOfTimeline)).WithArgs( 1e12, 2e12, ).RowsWillBeClosed(). @@ -1999,6 +2137,7 @@ func TestQuerySamples_SQLDriverErrors(t *testing.T) { "statements.EVENT_ID", "statements.END_EVENT_ID", "statements.DIGEST", + "statements.SQL_TEXT", "statements.TIMER_END", "statements.TIMER_WAIT", "statements.ROWS_EXAMINED", @@ -2022,6 +2161,7 @@ func TestQuerySamples_SQLDriverErrors(t *testing.T) { "123", "234", "some_digest", + nil, "70000000", "20000000", "5", @@ -2061,7 +2201,7 @@ func TestQuerySamples_SQLDriverErrors(t *testing.T) { lokiEntries := lokiClient.Received() require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, lokiEntries[0].Labels) - require.Equal(t, "level=\"info\" schema=\"some_schema\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\"", lokiEntries[0].Line) + require.Equal(t, "level=\"info\" schema=\"some_schema\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some_digest\" rows_examined=\"5\" rows_sent=\"5\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"456b\" max_total_memory=\"457b\" cpu_time=\"0.010000ms\" elapsed_time=\"0.020000ms\" elapsed_time_ms=\"0.020000ms\" traceparent=\"\"", lokiEntries[0].Line) }) } @@ -2120,7 +2260,7 @@ func TestQuerySamples_handles_timer_overflows(t *testing.T) { 5, ), ) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", exclusionClause, digestTextNotNullClause, "", endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", exclusionClause, "", endOfTimeline)).WithArgs( 1e12, // initial timerBookmark 5e12, // uptime of 5 seconds in picoseconds (modulo 0 overflows) ).WillReturnRows(sqlmock.NewRows([]string{ @@ -2129,6 +2269,7 @@ func TestQuerySamples_handles_timer_overflows(t *testing.T) { "statements.EVENT_ID", "statements.END_EVENT_ID", "statements.DIGEST", + "statements.SQL_TEXT", "statements.TIMER_END", "statements.TIMER_WAIT", "statements.ROWS_EXAMINED", @@ -2153,6 +2294,7 @@ func TestQuerySamples_handles_timer_overflows(t *testing.T) { 123, // EVENT_ID 234, // END_EVENT_ID "some digest", // digest + nil, // SQL_TEXT 2e12, // timer_end 2e12, // timer_wait 1000, // rows_examined @@ -2197,7 +2339,7 @@ func TestQuerySamples_handles_timer_overflows(t *testing.T) { assert.Equal(t, model.LabelSet{ "op": OP_QUERY_SAMPLE, }, lokiClient.Received()[0].Labels) - assert.Equal(t, "level=\"info\" schema=\"test_schema\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some digest\" rows_examined=\"1000\" rows_sent=\"100\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"1048576b\" max_total_memory=\"2097152b\" cpu_time=\"0.000556ms\" elapsed_time=\"2000.000000ms\" elapsed_time_ms=\"2000.000000ms\"", lokiClient.Received()[0].Line) + assert.Equal(t, "level=\"info\" schema=\"test_schema\" user=\"some_user\" client_host=\"some_host\" thread_id=\"890\" event_id=\"123\" end_event_id=\"234\" digest=\"some digest\" rows_examined=\"1000\" rows_sent=\"100\" rows_affected=\"0\" errors=\"0\" max_controlled_memory=\"1048576b\" max_total_memory=\"2097152b\" cpu_time=\"0.000556ms\" elapsed_time=\"2000.000000ms\" elapsed_time_ms=\"2000.000000ms\" traceparent=\"\"", lokiClient.Received()[0].Line) }) t.Run("asserts that expected query text is used in the constants", func(t *testing.T) { @@ -2218,7 +2360,7 @@ func TestQuerySamples_handles_timer_overflows(t *testing.T) { 5, ), ) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", exclusionClause, digestTextNotNullClause, "", endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", exclusionClause, "", endOfTimeline)).WithArgs( 1e12, // initial timerBookmark 5e12, // uptime of 5 seconds in picoseconds (modulo 0 overflows) ).WillReturnRows(sqlmock.NewRows([]string{ @@ -2227,6 +2369,7 @@ func TestQuerySamples_handles_timer_overflows(t *testing.T) { "event_id", "end_event_id", "digest", + "sql_text", "timer_end", "timer_wait", "rows_examined", @@ -2271,7 +2414,7 @@ func TestQuerySamples_handles_timer_overflows(t *testing.T) { picosecondsToSeconds(math.MaxUint64)+10, ), ) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", exclusionClause, digestTextNotNullClause, "", beginningAndEndOfTimeline)).WithArgs( // asserts that beginningAndEndOfTimeline clause is used + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", exclusionClause, "", beginningAndEndOfTimeline)).WithArgs( // asserts that beginningAndEndOfTimeline clause is used 3e12, 10e12, // uptimeLimit is calculated as uptime "modulo" overflows: (uptime - 1 overflow) in picoseconds ).WillReturnRows(sqlmock.NewRows([]string{ @@ -2280,6 +2423,7 @@ func TestQuerySamples_handles_timer_overflows(t *testing.T) { "event_id", "end_event_id", "digest", + "sql_text", "timer_end", "timer_wait", "rows_examined", @@ -2323,7 +2467,7 @@ func TestQuerySamples_handles_timer_overflows(t *testing.T) { picosecondsToSeconds(math.MaxUint64)+10, ), ) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", exclusionClause, digestTextNotNullClause, "", beginningAndEndOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", exclusionClause, "", beginningAndEndOfTimeline)).WithArgs( 3e12, 10e12, ).WillReturnRows(sqlmock.NewRows([]string{ @@ -2333,6 +2477,7 @@ func TestQuerySamples_handles_timer_overflows(t *testing.T) { "end_event_id", "current_schema", "digest", + "sql_text", "timer_wait", "rows_examined", "rows_sent", @@ -2366,7 +2511,7 @@ func TestQuerySamples_handles_timer_overflows(t *testing.T) { picosecondsToSeconds(math.MaxUint64)+13, ), ) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", exclusionClause, digestTextNotNullClause, "", endOfTimeline)).WithArgs( // asserts revert to endOfTimeline clause + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", exclusionClause, "", endOfTimeline)).WithArgs( // asserts revert to endOfTimeline clause 10e12, // asserts timerBookmark has been updated to the previous uptimeLimit 13e12, // asserts uptimeLimit is now updated to the current uptime "modulo" overflows ).WillReturnRows(sqlmock.NewRows([]string{ @@ -2375,6 +2520,7 @@ func TestQuerySamples_handles_timer_overflows(t *testing.T) { "thread_id", "current_schema", "digest", + "sql_text", "timer_wait", "rows_examined", "rows_sent", @@ -2410,7 +2556,7 @@ func TestQuerySamples_handles_timer_overflows(t *testing.T) { 10, ), ) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", exclusionClause, digestTextNotNullClause, "", endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", exclusionClause, "", endOfTimeline)).WithArgs( float64(0), 10e12, ).WillReturnRows(sqlmock.NewRows([]string{ @@ -2419,6 +2565,7 @@ func TestQuerySamples_handles_timer_overflows(t *testing.T) { "thread_id", "current_schema", "digest", + "sql_text", "timer_wait", "rows_examined", "rows_sent", @@ -2456,7 +2603,7 @@ func TestQuerySamples_handles_timer_overflows(t *testing.T) { require.NoError(t, err) defer db.Close() mock.ExpectQuery(selectNowAndUptime).WithoutArgs().WillReturnError(fmt.Errorf("some error")) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", exclusionClause, digestTextNotNullClause, "", endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", exclusionClause, "", endOfTimeline)).WithArgs( float64(0), 10e12, ).WillReturnRows(sqlmock.NewRows([]string{ @@ -2465,6 +2612,7 @@ func TestQuerySamples_handles_timer_overflows(t *testing.T) { "thread_id", "current_schema", "digest", + "sql_text", "timer_wait", "rows_examined", "rows_sent", @@ -2516,7 +2664,7 @@ func TestQuerySamples_handles_timer_overflows(t *testing.T) { defer db.Close() mock.ExpectQuery(selectNowAndUptime).WithoutArgs().WillReturnRows(sqlmock.NewRows([]string{"now", "uptime"}).AddRow(picosecondsToSeconds(math.MaxUint64)+15, 10)) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, "", "", exclusionClause, digestTextNotNullClause, "", endOfTimeline)).WithArgs(3e12, 10e12).WillReturnError(fmt.Errorf("some error")) + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, "", "", exclusionClause, "", endOfTimeline)).WithArgs(3e12, 10e12).WillReturnError(fmt.Errorf("some error")) c := &QuerySamples{ dbConnection: db, @@ -2533,12 +2681,13 @@ func TestQuerySamples_handles_timer_overflows(t *testing.T) { require.NoError(t, err) defer db.Close() mock.ExpectQuery(selectNowAndUptime).WithoutArgs().WillReturnRows(sqlmock.NewRows([]string{"now", "uptime"}).AddRow(picosecondsToSeconds(math.MaxUint64)+15, 10)) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", exclusionClause, digestTextNotNullClause, "", endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", exclusionClause, "", endOfTimeline)).WithArgs( 2e12, 10e12, ).WillReturnRows(sqlmock.NewRows([]string{ "current_schema", "digest", + "sql_text", "timer_end", "timer_wait", "rows_examined", @@ -2560,6 +2709,7 @@ func TestQuerySamples_handles_timer_overflows(t *testing.T) { AddRow( "test_schema", // current_schema "some digest", // digest + nil, // sql_text 2e12, // timer_end 2e12, // timer_wait 1000, // rows_examined @@ -2643,6 +2793,82 @@ func TestQuerySamples_calculateTimerClauseAndLimit(t *testing.T) { } } +func Test_TryExtractTraceParent(t *testing.T) { + testCases := []struct { + name string + input string + expected string + }{ + { + name: "valid traceparent with single quotes", + input: "SELECT * FROM users /*traceparent='00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01'*/", + expected: "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01", + }, + { + name: "valid traceparent with double quotes", + input: `SELECT * FROM users /*traceparent="00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"*/`, + expected: "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01", + }, + { + name: "valid traceparent without quotes", + input: "SELECT * FROM users /*traceparent=00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01*/", + expected: "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01", + }, + { + name: "traceparent with mixed case keyword", + input: "SELECT * FROM users /*TraceParent='00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01'*/", + expected: "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01", + }, + { + name: "traceparent among other comment fields", + input: "SELECT * FROM users /*controller='index',traceparent='00-abc123-def456-01',framework='django'*/", + expected: "00-abc123-def456-01", + }, + { + name: "no traceparent in SQL", + input: "SELECT * FROM users WHERE id = 1", + expected: "", + }, + { + name: "truncated SQL ending with ...", + input: "SELECT * FROM users WHERE id = 1 /*traceparent='00-abc...", + expected: "", + }, + { + name: "traceparent without closing quote", + input: "SELECT * FROM users /*traceparent='00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01", + expected: "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01", + }, + { + name: "empty traceparent value", + input: "SELECT * FROM users /*traceparent=''*/", + expected: "", + }, + { + name: "traceparent with whitespace", + input: "SELECT * FROM users /*traceparent=' 00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01 '*/", + expected: "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01", + }, + { + name: "multiple traceparent occurrences - last one wins", + input: "SELECT * FROM users /*traceparent='00-first-first-01'*/ /*traceparent='00-second-second-02'*/", + expected: "00-second-second-02", + }, + { + name: "empty string", + input: "", + expected: "", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + result := tryExtractTraceParent(tc.input) + assert.Equal(t, tc.expected, result) + }) + } +} + func TestQuerySamples_AutoEnableSetupConsumers(t *testing.T) { defer goleak.VerifyNone(t) @@ -2687,7 +2913,7 @@ func TestQuerySamples_AutoEnableSetupConsumers(t *testing.T) { 1, )) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", exclusionClause, digestTextNotNullClause, "", endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", exclusionClause, "", endOfTimeline)).WithArgs( 1e12, 1e12, ).RowsWillBeClosed(). @@ -2698,6 +2924,7 @@ func TestQuerySamples_AutoEnableSetupConsumers(t *testing.T) { "statements.EVENT_ID", "statements.END_EVENT_ID", "statements.DIGEST", + "statements.SQL_TEXT", "statements.TIMER_END", "statements.TIMER_WAIT", "statements.ROWS_EXAMINED", @@ -2721,6 +2948,7 @@ func TestQuerySamples_AutoEnableSetupConsumers(t *testing.T) { "123", "234", "some_digest", + nil, "70000000", "20000000", "5", @@ -2838,9 +3066,9 @@ func TestQuerySamplesExcludeSchemas(t *testing.T) { // Verify the query uses the custom exclusion clause customClause := buildExcludedSchemasClause([]string{"excluded_schema"}) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", customClause, digestTextNotNullClause, "", endOfTimeline)). + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, "", customClause, "", endOfTimeline)). WithArgs(1e12, 1e12).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{ - "current_schema", "thread_id", "event_id", "end_event_id", "digest", + "current_schema", "thread_id", "event_id", "end_event_id", "digest", "sql_text", "timer_end", "timer_wait", "rows_examined", "rows_sent", "rows_affected", "errors", "object_schema", "object_name", "object_type", "index_name", "lock_time", "digest_text", "threads.PROCESSLIST_USER", "threads.PROCESSLIST_HOST", "cpu_time", "max_controlled_memory", "max_total_memory",