Skip to content

Commit

Permalink
use two separate queries and add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
cristiangreco committed Feb 14, 2025
1 parent 22c7a26 commit f2955ea
Show file tree
Hide file tree
Showing 3 changed files with 231 additions and 23 deletions.
3 changes: 3 additions & 0 deletions collector/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ func readMetric(m prometheus.Metric) MetricResult {
if pb.Counter != nil {
return MetricResult{labels: labels, value: pb.GetCounter().GetValue(), metricType: dto.MetricType_COUNTER}
}
if pb.Summary != nil {
return MetricResult{labels: labels, value: pb.GetSummary().GetSampleSum(), metricType: dto.MetricType_SUMMARY}
}
if pb.Untyped != nil {
return MetricResult{labels: labels, value: pb.GetUntyped().GetValue(), metricType: dto.MetricType_UNTYPED}
}
Expand Down
85 changes: 62 additions & 23 deletions collector/perf_schema_events_statements.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package collector

import (
"context"
"fmt"
"log/slog"

"github.com/alecthomas/kingpin/v2"
Expand Down Expand Up @@ -45,7 +44,6 @@ const perfEventsStatementsQuery = `
QUANTILE_95,
QUANTILE_99,
QUANTILE_999
%s
FROM (
SELECT *
FROM performance_schema.events_statements_summary_by_digest
Expand All @@ -72,7 +70,60 @@ const perfEventsStatementsQuery = `
Q.QUANTILE_95,
Q.QUANTILE_99,
Q.QUANTILE_999
%s
ORDER BY SUM_TIMER_WAIT DESC
LIMIT %d
`

const perfEventsStatementsQueryMySQL = `
SELECT
ifnull(SCHEMA_NAME, 'NONE') as SCHEMA_NAME,
DIGEST,
LEFT(DIGEST_TEXT, %d) as DIGEST_TEXT,
COUNT_STAR,
SUM_TIMER_WAIT,
SUM_LOCK_TIME,
SUM_CPU_TIME,
SUM_ERRORS,
SUM_WARNINGS,
SUM_ROWS_AFFECTED,
SUM_ROWS_SENT,
SUM_ROWS_EXAMINED,
SUM_CREATED_TMP_DISK_TABLES,
SUM_CREATED_TMP_TABLES,
SUM_SORT_MERGE_PASSES,
SUM_SORT_ROWS,
SUM_NO_INDEX_USED,
QUANTILE_95,
QUANTILE_99,
QUANTILE_999
FROM (
SELECT *
FROM performance_schema.events_statements_summary_by_digest
WHERE SCHEMA_NAME NOT IN ('mysql', 'performance_schema', 'information_schema')
AND LAST_SEEN > DATE_SUB(NOW(), INTERVAL %d SECOND)
ORDER BY LAST_SEEN DESC
)Q
GROUP BY
Q.SCHEMA_NAME,
Q.DIGEST,
Q.DIGEST_TEXT,
Q.COUNT_STAR,
Q.SUM_TIMER_WAIT,
Q.SUM_LOCK_TIME,
Q.SUM_CPU_TIME,
Q.SUM_ERRORS,
Q.SUM_WARNINGS,
Q.SUM_ROWS_AFFECTED,
Q.SUM_ROWS_SENT,
Q.SUM_ROWS_EXAMINED,
Q.SUM_CREATED_TMP_DISK_TABLES,
Q.SUM_CREATED_TMP_TABLES,
Q.SUM_SORT_MERGE_PASSES,
Q.SUM_SORT_ROWS,
Q.SUM_NO_INDEX_USED,
Q.QUANTILE_95,
Q.QUANTILE_99,
Q.QUANTILE_999
ORDER BY SUM_TIMER_WAIT DESC
LIMIT %d
`
Expand Down Expand Up @@ -192,23 +243,12 @@ func (ScrapePerfEventsStatements) Version() float64 {

// Scrape collects data from database connection and sends it over channel as prometheus metric.
func (ScrapePerfEventsStatements) Scrape(ctx context.Context, instance *instance, ch chan<- prometheus.Metric, logger *slog.Logger) error {
additionalColumns := ""
additionalGroupBy := ""
useAdditionalColumns := false
if instance.flavor == FlavorMySQL && instance.version.GTE(semver.MustParse("8.0.28")) {
additionalColumns = ", SUM_LOCK_TIME, SUM_CPU_TIME"
additionalGroupBy = ", Q.SUM_LOCK_TIME, Q.SUM_CPU_TIME"
useAdditionalColumns = true
}
mysqlVersion8028 := instance.flavor == FlavorMySQL && instance.version.GTE(semver.MustParse("8.0.28"))

perfQuery := fmt.Sprintf(
perfEventsStatementsQuery,
*perfEventsStatementsDigestTextLimit,
additionalColumns,
*perfEventsStatementsTimeLimit,
additionalGroupBy,
*perfEventsStatementsLimit,
)
perfQuery := perfEventsStatementsQuery
if mysqlVersion8028 {
perfQuery = perfEventsStatementsQueryMySQL
}

db := instance.getDB()
// Timers here are returned in picoseconds.
Expand All @@ -220,20 +260,19 @@ func (ScrapePerfEventsStatements) Scrape(ctx context.Context, instance *instance

var (
schemaName, digest, digestText string
count, queryTime uint64
count, queryTime, lockTime, cpuTime uint64
errors, warnings uint64
rowsAffected, rowsSent, rowsExamined uint64
tmpTables, tmpDiskTables uint64
sortMergePasses, sortRows uint64
noIndexUsed uint64
quantile95, quantile99, quantile999 uint64
lockTime, cpuTime uint64
)
for perfSchemaEventsStatementsRows.Next() {
var err error
if useAdditionalColumns {
if mysqlVersion8028 {
err = perfSchemaEventsStatementsRows.Scan(
&schemaName, &digest, &digestText, &count, &queryTime, &errors, &warnings, &rowsAffected, &rowsSent, &rowsExamined, &tmpDiskTables, &tmpTables, &sortMergePasses, &sortRows, &noIndexUsed, &quantile95, &quantile99, &quantile999, &lockTime, &cpuTime,
&schemaName, &digest, &digestText, &count, &queryTime, &lockTime, &cpuTime, &errors, &warnings, &rowsAffected, &rowsSent, &rowsExamined, &tmpDiskTables, &tmpTables, &sortMergePasses, &sortRows, &noIndexUsed, &quantile95, &quantile99, &quantile999,
)
} else {
err = perfSchemaEventsStatementsRows.Scan(
Expand Down
166 changes: 166 additions & 0 deletions collector/perf_schema_events_statements_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
// Copyright 2025 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package collector

import (
"context"
"testing"

"github.com/DATA-DOG/go-sqlmock"
"github.com/blang/semver/v4"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/promslog"
"github.com/smartystreets/goconvey/convey"
)

func TestScrapePerfEventsStatements(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("error opening a stub database connection: %s", err)
}
defer db.Close()

columns := []string{
"SCHEMA_NAME", "DIGEST", "DIGEST_TEXT",
"COUNT_STAR", "SUM_TIMER_WAIT", "SUM_ERRORS", "SUM_WARNINGS",
"SUM_ROWS_AFFECTED", "SUM_ROWS_SENT", "SUM_ROWS_EXAMINED",
"SUM_CREATED_TMP_DISK_TABLES", "SUM_CREATED_TMP_TABLES", "SUM_SORT_MERGE_PASSES",
"SUM_SORT_ROWS", "SUM_NO_INDEX_USED",
"QUANTILE_95", "QUANTILE_99", "QUANTILE_999",
}

rows := sqlmock.NewRows(columns).
AddRow(
"db1", "digest1", "SELECT * FROM test",
100, 1000, 1, 2,
50, 100, 150,
1, 2, 3,
100, 1,
100, 150, 200)

mock.ExpectQuery(sanitizeQuery(perfEventsStatementsQuery)).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
go func() {
if err = (ScrapePerfEventsStatements{}).Scrape(context.Background(), &instance{db: db}, ch, promslog.NewNopLogger()); err != nil {
t.Errorf("error calling function on test: %s", err)
}
close(ch)
}()

expected := []MetricResult{
{labels: labelMap{"schema": "db1", "digest": "digest1", "digest_text": "SELECT * FROM test"}, value: 100, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"schema": "db1", "digest": "digest1", "digest_text": "SELECT * FROM test"}, value: 1000 / picoSeconds, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"schema": "db1", "digest": "digest1", "digest_text": "SELECT * FROM test"}, value: 0, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"schema": "db1", "digest": "digest1", "digest_text": "SELECT * FROM test"}, value: 0, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"schema": "db1", "digest": "digest1", "digest_text": "SELECT * FROM test"}, value: 1, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"schema": "db1", "digest": "digest1", "digest_text": "SELECT * FROM test"}, value: 2, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"schema": "db1", "digest": "digest1", "digest_text": "SELECT * FROM test"}, value: 50, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"schema": "db1", "digest": "digest1", "digest_text": "SELECT * FROM test"}, value: 100, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"schema": "db1", "digest": "digest1", "digest_text": "SELECT * FROM test"}, value: 150, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"schema": "db1", "digest": "digest1", "digest_text": "SELECT * FROM test"}, value: 2, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"schema": "db1", "digest": "digest1", "digest_text": "SELECT * FROM test"}, value: 1, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"schema": "db1", "digest": "digest1", "digest_text": "SELECT * FROM test"}, value: 3, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"schema": "db1", "digest": "digest1", "digest_text": "SELECT * FROM test"}, value: 100, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"schema": "db1", "digest": "digest1", "digest_text": "SELECT * FROM test"}, value: 1, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"schema": "db1", "digest": "digest1", "digest_text": "SELECT * FROM test"}, value: 1000 / picoSeconds, metricType: dto.MetricType_SUMMARY},
}

convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
got := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, got)
}
})

if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled expectations: %s", err)
}
}

func TestScrapePerfEventsStatementsMySQL8028(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("error opening a stub database connection: %s", err)
}
defer db.Close()

inst := &instance{
db: db,
flavor: FlavorMySQL,
version: semver.MustParse("8.0.28"),
}

columns := []string{
"SCHEMA_NAME", "DIGEST", "DIGEST_TEXT",
"COUNT_STAR", "SUM_TIMER_WAIT",
"SUM_LOCK_TIME", "SUM_CPU_TIME",
"SUM_ERRORS", "SUM_WARNINGS",
"SUM_ROWS_AFFECTED", "SUM_ROWS_SENT", "SUM_ROWS_EXAMINED",
"SUM_CREATED_TMP_DISK_TABLES", "SUM_CREATED_TMP_TABLES", "SUM_SORT_MERGE_PASSES",
"SUM_SORT_ROWS", "SUM_NO_INDEX_USED",
"QUANTILE_95", "QUANTILE_99", "QUANTILE_999",
}

rows := sqlmock.NewRows(columns).
AddRow(
"db1", "digest1", "SELECT * FROM test",
100, 1000,
30, 50,
1, 2,
50, 100, 150,
1, 2, 3,
100, 1,
100, 150, 200)

mock.ExpectQuery(sanitizeQuery(perfEventsStatementsQueryMySQL)).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
go func() {
if err = (ScrapePerfEventsStatements{}).Scrape(context.Background(), inst, ch, promslog.NewNopLogger()); err != nil {
t.Errorf("error calling function on test: %s", err)
}
close(ch)
}()

expected := []MetricResult{
{labels: labelMap{"schema": "db1", "digest": "digest1", "digest_text": "SELECT * FROM test"}, value: 100, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"schema": "db1", "digest": "digest1", "digest_text": "SELECT * FROM test"}, value: 1000 / picoSeconds, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"schema": "db1", "digest": "digest1", "digest_text": "SELECT * FROM test"}, value: 30 / picoSeconds, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"schema": "db1", "digest": "digest1", "digest_text": "SELECT * FROM test"}, value: 50 / picoSeconds, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"schema": "db1", "digest": "digest1", "digest_text": "SELECT * FROM test"}, value: 1, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"schema": "db1", "digest": "digest1", "digest_text": "SELECT * FROM test"}, value: 2, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"schema": "db1", "digest": "digest1", "digest_text": "SELECT * FROM test"}, value: 50, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"schema": "db1", "digest": "digest1", "digest_text": "SELECT * FROM test"}, value: 100, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"schema": "db1", "digest": "digest1", "digest_text": "SELECT * FROM test"}, value: 150, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"schema": "db1", "digest": "digest1", "digest_text": "SELECT * FROM test"}, value: 2, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"schema": "db1", "digest": "digest1", "digest_text": "SELECT * FROM test"}, value: 1, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"schema": "db1", "digest": "digest1", "digest_text": "SELECT * FROM test"}, value: 3, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"schema": "db1", "digest": "digest1", "digest_text": "SELECT * FROM test"}, value: 100, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"schema": "db1", "digest": "digest1", "digest_text": "SELECT * FROM test"}, value: 1, metricType: dto.MetricType_COUNTER},
{labels: labelMap{"schema": "db1", "digest": "digest1", "digest_text": "SELECT * FROM test"}, value: 1000 / picoSeconds, metricType: dto.MetricType_SUMMARY},
}

convey.Convey("MySQL 8.0.28+ metrics comparison", t, func() {
for _, expect := range expected {
got := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, got)
}
})

if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled expectations: %s", err)
}
}

0 comments on commit f2955ea

Please sign in to comment.