Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat : Query Performance monitoring tech debt #199

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions postgresql-config.yml.sample
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ integrations:
# True if SSL is to be used. Defaults to false.
ENABLE_SSL: "false"

# Collect only detailed query performance metrics, excluding all other metrics. Defaults to false.
QUERY_MONITORING_ONLY: "false"

# Enable query performance monitoring - Defaults to false
# ENABLE_QUERY_MONITORING : "false"

Expand Down
1 change: 1 addition & 0 deletions src/args/argument_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type ArgumentList struct {
CollectBloatMetrics bool `default:"true" help:"Enable collecting bloat metrics which can be performance intensive"`
ShowVersion bool `default:"false" help:"Print build information and exit"`
EnableQueryMonitoring bool `default:"false" help:"Enable collection of detailed query performance metrics."`
QueryMonitoringOnly bool `default:"false" help:"Enable collection of only detailed query performance metrics, excluding other metrics."`
QueryMonitoringResponseTimeThreshold int `default:"500" help:"Threshold in milliseconds for query response time. If response time for the individual query exceeds this threshold, the individual query is reported in metrics"`
QueryMonitoringCountThreshold int `default:"20" help:"The number of records for each query performance metrics"`
}
Expand Down
5 changes: 5 additions & 0 deletions src/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ func main() {
os.Exit(1)
}

if args.QueryMonitoringOnly {
queryperformancemonitoring.QueryPerformanceMain(args, pgIntegration, collectionList)
return
}

if args.HasMetrics() {
metrics.PopulateMetrics(connectionInfo, collectionList, instance, pgIntegration, args.Pgbouncer, args.CollectDbLockMetrics, args.CollectBloatMetrics, args.CustomMetricsQuery)
if args.CustomMetricsConfig != "" {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,61 +2,31 @@ package commonutils

import (
"fmt"
"reflect"

commonparameters "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/common-parameters"

"github.com/newrelic/infra-integrations-sdk/v3/data/metric"
"github.com/newrelic/infra-integrations-sdk/v3/integration"
"github.com/newrelic/infra-integrations-sdk/v3/log"
)

// SetMetric records a single metric on metricSet, translating the string
// sourceType into the SDK source type: "attribute" maps to metric.ATTRIBUTE,
// while "gauge" and any unrecognized value fall back to metric.GAUGE.
// SDK errors are logged and swallowed; the caller is not notified.
func SetMetric(metricSet *metric.Set, name string, value interface{}, sourceType string) {
	// "gauge" and the default branch were identical; a single mapping
	// removes the duplicated error handling.
	srcType := metric.GAUGE
	if sourceType == `attribute` {
		srcType = metric.ATTRIBUTE
	}
	if err := metricSet.SetMetric(name, value, srcType); err != nil {
		log.Error("Error setting metric: %v", err)
	}
}

// IngestMetric publishes data in batches. Batching avoids publishing large payloads in one go, which is a limitation on the New Relic side.
func IngestMetric(metricList []interface{}, eventName string, pgIntegration *integration.Integration, cp *commonparameters.CommonParameters) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unit tests for testing when this func errors out are missing.

instanceEntity, err := CreateEntity(pgIntegration, cp)
if err != nil {
log.Error("Error creating entity: %v", err)
return err
}

metricCount := 0

for _, model := range metricList {
if model == nil {
continue
}
metricCount += 1
metricSet := instanceEntity.NewMetricSet(eventName)

processErr := ProcessModel(model, metricSet)
if processErr != nil {
log.Error("Error processing model: %v", processErr)
marshalErr := metricSet.MarshalMetrics(model)
if marshalErr != nil {
log.Error("Error processing model: %v", marshalErr)
continue
}

if metricCount == PublishThreshold {
metricCount = 0
if err := PublishMetrics(pgIntegration, &instanceEntity, cp); err != nil {
Expand All @@ -78,40 +48,9 @@ func CreateEntity(pgIntegration *integration.Integration, cp *commonparameters.C
return pgIntegration.Entity(fmt.Sprintf("%s:%s", cp.Host, cp.Port), "pg-instance")
}

// ProcessModel walks the fields of model (a struct or pointer to struct) via
// reflection and copies each one into metricSet, using the `metric_name` and
// `source_type` struct tags. Fields tagged `ingest_data:"false"` and nil
// pointer fields are skipped. Returns ErrInvalidModelType when model is not
// a struct (or pointer to one).
func ProcessModel(model interface{}, metricSet *metric.Set) error {
	modelValue := reflect.ValueOf(model)
	if modelValue.Kind() == reflect.Ptr {
		modelValue = modelValue.Elem()
	}
	if !modelValue.IsValid() || modelValue.Kind() != reflect.Struct {
		log.Error("Invalid model type: %v", modelValue.Kind())
		return ErrInvalidModelType
	}

	// BUG FIX: derive the type from the (possibly dereferenced) value.
	// reflect.TypeOf(model) on a pointer describes the pointer type, and
	// calling Field(i) on a pointer type panics even though modelValue
	// itself was already dereferenced above.
	modelType := modelValue.Type()

	for i := 0; i < modelValue.NumField(); i++ {
		field := modelValue.Field(i)
		fieldType := modelType.Field(i)
		metricName := fieldType.Tag.Get("metric_name")
		sourceType := fieldType.Tag.Get("source_type")
		ingestData := fieldType.Tag.Get("ingest_data")

		// Explicitly excluded from ingestion via struct tag.
		if ingestData == "false" {
			continue
		}

		if field.Kind() == reflect.Ptr && !field.IsNil() {
			SetMetric(metricSet, metricName, field.Elem().Interface(), sourceType)
		} else if field.Kind() != reflect.Ptr {
			SetMetric(metricSet, metricName, field.Interface(), sourceType)
		}
	}
	return nil
}

func PublishMetrics(pgIntegration *integration.Integration, instanceEntity **integration.Entity, cp *commonparameters.CommonParameters) error {
if err := pgIntegration.Publish(); err != nil {
log.Error("Error publishing query performance metrics")
return err
}
var err error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,21 @@ package commonutils_test
import (
"testing"

common_parameters "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/common-parameters"
commonparameters "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/common-parameters"

"github.com/newrelic/infra-integrations-sdk/v3/integration"
"github.com/newrelic/nri-postgresql/src/args"
commonutils "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/common-utils"
"github.com/stretchr/testify/assert"
)

// TestSetMetric verifies that SetMetric stores gauge values, stores
// attribute values, and falls back to a gauge for unknown source types.
func TestSetMetric(t *testing.T) {
	pgIntegration, _ := integration.New("test", "1.0.0")
	entity, _ := pgIntegration.Entity("test-entity", "test-type")
	ms := entity.NewMetricSet("test-event")

	cases := []struct {
		name       string
		value      interface{}
		sourceType string
	}{
		{"testGauge", 123.0, "gauge"},
		{"testAttribute", "value", "attribute"},
		{"testDefault", 456.0, "unknown"},
	}
	for _, tc := range cases {
		commonutils.SetMetric(ms, tc.name, tc.value, tc.sourceType)
		assert.Equal(t, tc.value, ms.Metrics[tc.name])
	}
}

func TestIngestMetric(t *testing.T) {
pgIntegration, _ := integration.New("test", "1.0.0")
args := args.ArgumentList{
Hostname: "localhost",
Port: "5432",
}
cp := common_parameters.SetCommonParameters(args, uint64(14), "testdb")
cp := commonparameters.SetCommonParameters(args, uint64(14), "testdb")
metricList := []interface{}{
struct {
TestField int `metric_name:"testField" source_type:"gauge"`
Expand All @@ -49,36 +37,21 @@ func TestCreateEntity(t *testing.T) {
Hostname: "localhost",
Port: "5432",
}
cp := common_parameters.SetCommonParameters(args, uint64(14), "testdb")
cp := commonparameters.SetCommonParameters(args, uint64(14), "testdb")

entity, err := commonutils.CreateEntity(pgIntegration, cp)
assert.NoError(t, err)
assert.NotNil(t, entity)
assert.Equal(t, "localhost:5432", entity.Metadata.Name)
}

func TestProcessModel(t *testing.T) {
pgIntegration, _ := integration.New("test", "1.0.0")
entity, _ := pgIntegration.Entity("test-entity", "test-type")

metricSet := entity.NewMetricSet("test-event")

model := struct {
TestField int `metric_name:"testField" source_type:"gauge"`
}{TestField: 123}

err := commonutils.ProcessModel(model, metricSet)
assert.NoError(t, err)
assert.Equal(t, 123.0, metricSet.Metrics["testField"])
}

func TestPublishMetrics(t *testing.T) {
pgIntegration, _ := integration.New("test", "1.0.0")
args := args.ArgumentList{
Hostname: "localhost",
Port: "5432",
}
cp := common_parameters.SetCommonParameters(args, uint64(14), "testdb")
cp := commonparameters.SetCommonParameters(args, uint64(14), "testdb")
entity, _ := commonutils.CreateEntity(pgIntegration, cp)

err := commonutils.PublishMetrics(pgIntegration, &entity, cp)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package datamodels

type SlowRunningQueryMetrics struct {
Newrelic *string `db:"newrelic" metric_name:"newrelic" source_type:"attribute" ingest_data:"false"`
QueryID *string `db:"query_id" metric_name:"query_id" source_type:"attribute"`
QueryText *string `db:"query_text" metric_name:"query_text" source_type:"attribute"`
DatabaseName *string `db:"database_name" metric_name:"database_name" source_type:"attribute"`
Expand All @@ -23,7 +22,6 @@ type WaitEventMetrics struct {
DatabaseName *string `db:"database_name" metric_name:"database_name" source_type:"attribute"`
}
type BlockingSessionMetrics struct {
Newrelic *string `db:"newrelic" metric_name:"newrelic" source_type:"attribute" ingest_data:"false"`
BlockedPid *int64 `db:"blocked_pid" metric_name:"blocked_pid" source_type:"gauge"`
BlockedQuery *string `db:"blocked_query" metric_name:"blocked_query" source_type:"attribute"`
BlockedQueryID *string `db:"blocked_query_id" metric_name:"blocked_query_id" source_type:"attribute"`
Expand All @@ -36,14 +34,19 @@ type BlockingSessionMetrics struct {
}

type IndividualQueryMetrics struct {
QueryText *string `json:"query" db:"query" metric_name:"query_text" source_type:"attribute"`
QueryID *string `json:"queryid" db:"queryid" metric_name:"query_id" source_type:"attribute"`
DatabaseName *string `json:"datname" db:"datname" metric_name:"database_name" source_type:"attribute"`
AvgCPUTimeInMS *float64 `json:"cpu_time_ms" db:"cpu_time_ms" metric_name:"cpu_time_ms" source_type:"gauge"`
PlanID *string `json:"planid" db:"planid" metric_name:"plan_id" source_type:"attribute"`
RealQueryText *string `ingest_data:"false"`
AvgExecTimeInMs *float64 `json:"exec_time_ms" db:"exec_time_ms" metric_name:"exec_time_ms" source_type:"gauge"`
Newrelic *string `db:"newrelic" metric_name:"newrelic" source_type:"attribute" ingest_data:"false"`
QueryText *string `json:"query" db:"query" metric_name:"query_text" source_type:"attribute"`
QueryID *string `json:"queryid" db:"queryid" metric_name:"query_id" source_type:"attribute"`
DatabaseName *string `json:"datname" db:"datname" metric_name:"database_name" source_type:"attribute"`
CPUTimeInMS *float64 `json:"cpu_time_ms" db:"cpu_time_ms" metric_name:"cpu_time_ms" source_type:"gauge"`
PlanID *string `json:"planid" db:"planid" metric_name:"plan_id" source_type:"attribute"`
ExecTimeInMs *float64 `json:"exec_time_ms" db:"exec_time_ms" metric_name:"exec_time_ms" source_type:"gauge"`
}

// IndividualQueryInfo carries the identifying details of one query, split
// out from IndividualQueryMetrics (which is the shape that gets ingested).
type IndividualQueryInfo struct {
QueryID *string // query identifier — presumably the pg_stat_statements queryid; confirm against caller
DatabaseName *string // database the query ran against
PlanID *string // execution-plan identifier
RealQueryText *string // raw (non-anonymized) query text — NOTE(review): appears intended for internal use only, not ingestion; confirm
}

type QueryExecutionPlanMetrics struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@ import (
func PopulateBlockingMetrics(conn *performancedbconnection.PGSQLConnection, pgIntegration *integration.Integration, cp *commonparameters.CommonParameters, enabledExtensions map[string]bool) {
isEligible, enableCheckError := validations.CheckBlockingSessionMetricsFetchEligibility(enabledExtensions, cp.Version)
if enableCheckError != nil {
log.Error("Error executing query: %v in PopulateBlockingMetrics", enableCheckError)
log.Error("Error executing query for eligibility check in PopulateBlockingMetrics: %v", enableCheckError)
return
}
if !isEligible {
log.Debug("Extension 'pg_stat_statements' is not enabled or unsupported version.")
return
}
blockingQueriesMetricsList, blockQueryFetchErr := getBlockingMetrics(conn, cp)
if blockQueryFetchErr != nil {
log.Error("Error fetching Blocking queries: %v", blockQueryFetchErr)
blockingQueriesMetricsList, blockQueryMetricsFetchErr := getBlockingMetrics(conn, cp)
if blockQueryMetricsFetchErr != nil {
log.Error("Error fetching blocking queries: %v", blockQueryMetricsFetchErr)
return
}
if len(blockingQueriesMetricsList) == 0 {
Expand All @@ -35,37 +35,33 @@ func PopulateBlockingMetrics(conn *performancedbconnection.PGSQLConnection, pgIn
}
err := commonutils.IngestMetric(blockingQueriesMetricsList, "PostgresBlockingSessions", pgIntegration, cp)
if err != nil {
log.Error("Error ingesting Blocking queries: %v", err)
log.Error("Error ingesting blocking queries: %v", err)
return
}
log.Debug("Successfully ingested blocking metrics ")
}

func getBlockingMetrics(conn *performancedbconnection.PGSQLConnection, cp *commonparameters.CommonParameters) ([]interface{}, error) {
var blockingQueriesMetricsList []interface{}
versionSpecificBlockingQuery, err := commonutils.FetchVersionSpecificBlockingQueries(cp.Version)
if err != nil {
log.Error("Unsupported postgres version: %v", err)
return nil, err
}
var query = fmt.Sprintf(versionSpecificBlockingQuery, cp.Databases, cp.QueryMonitoringCountThreshold)
rows, err := conn.Queryx(query)
query := fmt.Sprintf(versionSpecificBlockingQuery, cp.Databases, cp.QueryMonitoringCountThreshold)
blockingQueriesMetricsList, _, err := fetchMetrics[datamodels.BlockingSessionMetrics](conn, query, "Blocking Query")
if err != nil {
log.Error("Failed to execute query: %v", err)
log.Error("Error fetching blocking queries: %v", err)
return nil, commonutils.ErrUnExpectedError
}
defer rows.Close()
for rows.Next() {
var blockingQueryMetric datamodels.BlockingSessionMetrics
if scanError := rows.StructScan(&blockingQueryMetric); scanError != nil {
return nil, scanError
if cp.Version == commonutils.PostgresVersion13 || cp.Version == commonutils.PostgresVersion12 {
for i := range blockingQueriesMetricsList {
*blockingQueriesMetricsList[i].BlockedQuery = commonutils.AnonymizeQueryText(*blockingQueriesMetricsList[i].BlockedQuery)
*blockingQueriesMetricsList[i].BlockingQuery = commonutils.AnonymizeQueryText(*blockingQueriesMetricsList[i].BlockingQuery)
}
// For PostgreSQL versions 13 and 12, anonymization of queries does not occur for blocking sessions, so it's necessary to explicitly anonymize them.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could add this comment back, as it would be helpful for future reference.

if cp.Version == commonutils.PostgresVersion13 || cp.Version == commonutils.PostgresVersion12 {
*blockingQueryMetric.BlockedQuery = commonutils.AnonymizeQueryText(*blockingQueryMetric.BlockedQuery)
*blockingQueryMetric.BlockingQuery = commonutils.AnonymizeQueryText(*blockingQueryMetric.BlockingQuery)
}
blockingQueriesMetricsList = append(blockingQueriesMetricsList, blockingQueryMetric)
}

return blockingQueriesMetricsList, nil
var blockingQueriesMetricsListInterface = make([]interface{}, 0)
for _, metric := range blockingQueriesMetricsList {
blockingQueriesMetricsListInterface = append(blockingQueriesMetricsListInterface, metric)
}
return blockingQueriesMetricsListInterface, nil
}
Loading
Loading