Skip to content

Commit 5bfa92e

Browse files
authored
Add pg_query fingerprint cache (#768)
1 parent cad8581 commit 5bfa92e

File tree

10 files changed

+117
-11
lines changed

10 files changed

+117
-11
lines changed

input/postgres/backends.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,14 @@ END
1919
const activitySQL string = `
2020
SELECT (extract(epoch from COALESCE(backend_start, pg_catalog.pg_postmaster_start_time()))::int::text || pg_catalog.to_char(pid, 'FM0000000'))::bigint,
2121
datid, datname, usesysid, usename, pid, application_name, client_addr::text, client_port,
22-
backend_start, xact_start, query_start, state_change, COALESCE(wait_event_type, '') = 'Lock' as waiting, backend_xid, backend_xmin, wait_event_type, wait_event, backend_type, %s, state, query
22+
backend_start, xact_start, query_start, state_change, COALESCE(wait_event_type, '') = 'Lock' as waiting,
23+
backend_xid, backend_xmin, wait_event_type, wait_event, backend_type, %s, state, query, %s
2324
FROM %s
2425
WHERE pid IS NOT NULL`
2526

2627
func GetBackends(ctx context.Context, c *Collection, db *sql.DB) ([]state.PostgresBackend, error) {
2728
var blockingPidsField string
29+
var queryIdField string
2830
var sourceTable string
2931

3032
if c.GlobalOpts.CollectPostgresLocks {
@@ -33,13 +35,19 @@ func GetBackends(ctx context.Context, c *Collection, db *sql.DB) ([]state.Postgr
3335
blockingPidsField = "NULL"
3436
}
3537

38+
if c.PostgresVersion.Numeric >= state.PostgresVersion14 {
39+
queryIdField = "coalesce(query_id, 0)"
40+
} else {
41+
queryIdField = "0"
42+
}
43+
3644
if c.HelperExists("get_stat_activity", nil) {
3745
sourceTable = "pganalyze.get_stat_activity()"
3846
} else {
3947
sourceTable = "pg_catalog.pg_stat_activity"
4048
}
4149

42-
stmt, err := db.PrepareContext(ctx, QueryMarkerSQL+fmt.Sprintf(activitySQL, blockingPidsField, sourceTable))
50+
stmt, err := db.PrepareContext(ctx, QueryMarkerSQL+fmt.Sprintf(activitySQL, blockingPidsField, queryIdField, sourceTable))
4351
if err != nil {
4452
return nil, err
4553
}
@@ -63,7 +71,7 @@ func GetBackends(ctx context.Context, c *Collection, db *sql.DB) ([]state.Postgr
6371
&row.ClientPort, &row.BackendStart, &row.XactStart, &row.QueryStart,
6472
&row.StateChange, &row.Waiting, &row.BackendXid, &row.BackendXmin,
6573
&row.WaitEventType, &row.WaitEvent, &row.BackendType, pq.Array(&row.BlockedByPids),
66-
&row.State, &row.Query)
74+
&row.State, &row.Query, &row.QueryId)
6775
if err != nil {
6876
return nil, err
6977
}

input/postgres/collection.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ type Collection struct {
2323

2424
// Information that is specific to the current database we're connected to
2525
HelperFunctions map[string][]state.PostgresFunction
26+
27+
Fingerprints *state.Fingerprints
2628
}
2729

2830
func helpersFromFunctions(functions []state.PostgresFunction) map[string][]state.PostgresFunction {
@@ -81,6 +83,7 @@ func NewCollection(ctx context.Context, logger *util.Logger, server *state.Serve
8183
ConnectedAsSuperUser: connectedAsSuperUser,
8284
ConnectedAsMonitoringRole: connectedAsMonitoringRole,
8385
HelperFunctions: helpersFromFunctions(helperFunctions),
86+
Fingerprints: server.Fingerprints,
8487
}, nil
8588
}
8689

@@ -95,6 +98,7 @@ func (c *Collection) ForCurrentDatabase(functions []state.PostgresFunction) *Col
9598
ConnectedAsSuperUser: c.ConnectedAsSuperUser,
9699
ConnectedAsMonitoringRole: c.ConnectedAsMonitoringRole,
97100
HelperFunctions: helpersFromFunctions(functions),
101+
Fingerprints: c.Fingerprints,
98102
}
99103
}
100104

input/postgres/statements.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ func GetStatementTexts(ctx context.Context, c *Collection, db *sql.DB) (state.Po
232232
case <-ctx.Done():
233233
return nil, nil, ctx.Err()
234234
default:
235-
fingerprintAndNormalize(c, key, query, statements, statementTextsByFp, ignoreIoTiming)
235+
fingerprintAndNormalize(c, key, key.QueryID, query, statements, statementTextsByFp, ignoreIoTiming)
236236
}
237237
}
238238

@@ -351,7 +351,7 @@ func ignoreIOTiming(postgresVersion state.PostgresVersion, receivedQuery string)
351351
var collectorQueryFingerprint = util.FingerprintText(util.QueryTextCollector)
352352
var insufficientPrivsQueryFingerprint = util.FingerprintText(util.QueryTextInsufficientPrivs)
353353

354-
func fingerprintAndNormalize(c *Collection, key state.PostgresStatementKey, text string, statements state.PostgresStatementMap, statementTextsByFp state.PostgresStatementTextMap, ignoreIoTiming bool) {
354+
func fingerprintAndNormalize(c *Collection, key state.PostgresStatementKey, queryID int64, text string, statements state.PostgresStatementMap, statementTextsByFp state.PostgresStatementTextMap, ignoreIoTiming bool) {
355355
if insufficientPrivilege(text) {
356356
statements[key] = state.PostgresStatement{
357357
InsufficientPrivilege: true,
@@ -365,7 +365,7 @@ func fingerprintAndNormalize(c *Collection, key state.PostgresStatementKey, text
365365
IgnoreIoTiming: ignoreIoTiming,
366366
}
367367
} else {
368-
fp := util.FingerprintQuery(text, c.Config.FilterQueryText, -1)
368+
fp := c.Fingerprints.LoadOrStore(queryID, text, c.Config.FilterQueryText, -1)
369369
statements[key] = state.PostgresStatement{Fingerprint: fp, IgnoreIoTiming: ignoreIoTiming}
370370
_, ok := statementTextsByFp[fp]
371371
if !ok {

output/transform/activity.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ func ActivityStateToCompactActivitySnapshot(server *state.Server, activityState
3535
b.RoleIdx,
3636
b.DatabaseIdx,
3737
backend.Query.String,
38+
backend.QueryId,
3839
activityState.TrackActivityQuerySize,
3940
)
4041
b.HasQueryIdx = true

output/transform/logs.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ func transformPostgresQuerySamples(server *state.Server, s snapshot.CompactLogSn
4646
roleIdx,
4747
databaseIdx,
4848
sampleIn.Query,
49+
0,
4950
-1,
5051
)
5152

@@ -184,6 +185,7 @@ func transformSystemLogLine(server *state.Server, r *snapshot.CompactSnapshot_Ba
184185
logLine.RoleIdx,
185186
logLine.DatabaseIdx,
186187
logLineIn.Query,
188+
0,
187189
-1,
188190
)
189191
logLine.HasQueryIdx = true

output/transform/util.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,15 @@ func upsertQueryReferenceAndInformation(s *snapshot.FullSnapshot, statementTexts
6161
return idx
6262
}
6363

64-
func upsertQueryReferenceAndInformationSimple(server *state.Server, refs []*snapshot.QueryReference, infos []*snapshot.QueryInformation, roleIdx int32, databaseIdx int32, originalQuery string, trackActivityQuerySize int) (int32, []*snapshot.QueryReference, []*snapshot.QueryInformation) {
65-
fingerprint := util.FingerprintQuery(originalQuery, server.Config.FilterQueryText, trackActivityQuerySize)
64+
func upsertQueryReferenceAndInformationSimple(server *state.Server, refs []*snapshot.QueryReference, infos []*snapshot.QueryInformation, roleIdx int32, databaseIdx int32, originalQuery string, queryID int64, trackActivityQuerySize int) (int32, []*snapshot.QueryReference, []*snapshot.QueryInformation) {
65+
var fingerprint uint64
66+
// When the query ID is missing, always fingerprint the query instead of trying to use the fingerprint cache.
67+
// It's always zero for query samples from logs, and is also zero from pg_stat_activity on Postgres versions before 14 or when query_id is NULL.
68+
if queryID == 0 {
69+
fingerprint = util.FingerprintQuery(originalQuery, server.Config.FilterQueryText, trackActivityQuerySize)
70+
} else {
71+
fingerprint = server.Fingerprints.LoadOrStore(queryID, originalQuery, server.Config.FilterQueryText, trackActivityQuerySize)
72+
}
6673

6774
fpBuf := make([]byte, 8)
6875
binary.BigEndian.PutUint64(fpBuf, fingerprint)

state/fingerprints.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package state
2+
3+
import (
4+
"github.com/pganalyze/collector/util"
5+
"sync"
6+
"sync/atomic"
7+
)
8+
9+
// MAX_SIZE is the number of cached entries at which the fingerprint cache
// starts evicting. A cache of this size uses around 9 MB per server, as
// measured by MemStats.
const (
	MAX_SIZE = 450_000
)
11+
12+
// Fingerprints caches Postgres query_id -> pg_query fingerprint mappings so
// that a query that was already seen does not have to be fingerprinted again.
type Fingerprints struct {
	// cache holds the fingerprint for each known query ID.
	cache map[int64]uint64
	// lock guards every read and write of cache.
	lock sync.RWMutex
	// size counts the stored entries; it is kept separately from the map so
	// it can be read without taking lock.
	size atomic.Int32
}
17+
18+
func NewFingerprints() *Fingerprints {
19+
return &Fingerprints{
20+
cache: make(map[int64]uint64, MAX_SIZE),
21+
lock: sync.RWMutex{},
22+
}
23+
}
24+
25+
func (c *Fingerprints) Load(queryID int64) (uint64, bool) {
26+
c.lock.RLock()
27+
defer c.lock.RUnlock()
28+
fingerprint, exists := c.cache[queryID]
29+
return fingerprint, exists
30+
}
31+
32+
func (c *Fingerprints) LoadOrStore(queryID int64, text string, filterQueryText string, trackActivityQuerySize int) uint64 {
33+
fingerprint, exists := c.Load(queryID)
34+
if exists {
35+
return fingerprint
36+
}
37+
fingerprint, virtual := util.TryFingerprintQuery(text, filterQueryText, trackActivityQuerySize)
38+
if virtual {
39+
// Don't store virtual fingerprints so we can cache real fingerprints later
40+
return fingerprint
41+
}
42+
c.cleanup()
43+
c.lock.Lock()
44+
defer c.lock.Unlock()
45+
c.cache[queryID] = fingerprint
46+
c.size.Add(1)
47+
return fingerprint
48+
}
49+
50+
// Retains a random 50% sample of entries if the cache grows too large
51+
func (c *Fingerprints) cleanup() {
52+
if c.size.Load() < MAX_SIZE {
53+
return
54+
}
55+
c.lock.Lock()
56+
defer c.lock.Unlock()
57+
cache := make(map[int64]uint64, MAX_SIZE)
58+
index := 0
59+
for key, value := range c.cache {
60+
if index%2 == 0 {
61+
cache[key] = value
62+
}
63+
index += 1
64+
}
65+
c.size.Store(int32(len(cache)))
66+
c.cache = cache
67+
}

state/postgres_backend.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ type PostgresBackend struct {
2929

3030
BackendType null.String // 10+ The process type of this backend
3131

32-
Query null.String // Text of this backend's most recent query
32+
Query null.String // Text of this backend's most recent query
33+
QueryId int64
3334

3435
// Current overall state of this backend. Possible values are:
3536
// - active: The backend is executing a query.

state/state.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,9 @@ type Server struct {
331331
// differences (see https://groups.google.com/g/golang-nuts/c/eIqkhXh9PLg),
332332
// as we access this in high frequency log-related code paths.
333333
LogIgnoreFlags uint32
334+
335+
// Cache of Postgres query_id -> pg_query fingerprint mappings
336+
Fingerprints *Fingerprints
334337
}
335338

336339
func MakeServer(config config.ServerConfig, testRun bool) *Server {
@@ -347,6 +350,7 @@ func MakeServer(config config.ServerConfig, testRun bool) *Server {
347350
QueryRuns: make(map[int64]*QueryRun),
348351
QueryRunsMutex: &sync.Mutex{},
349352
LogParseMutex: &sync.RWMutex{},
353+
Fingerprints: NewFingerprints(),
350354
}
351355
server.Grant.Store(&Grant{Config: pganalyze_collector.ServerMessage_Config{Features: &pganalyze_collector.ServerMessage_Features{}}})
352356
server.Pause.Store(false)

util/fingerprint.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,16 @@ import (
44
pg_query "github.com/pganalyze/pg_query_go/v6"
55
)
66

7-
// FingerprintQuery - Generates a unique fingerprint for the given query
8-
func FingerprintQuery(query string, filterQueryText string, trackActivityQuerySize int) (fp uint64) {
7+
// TryFingerprintQuery - Generates a unique fingerprint for the given query,
8+
// and whether the query text had to be modified to generate a fingerprint.
9+
//
10+
// This matters for some callers to decide what to do with the fingerprint, for
11+
// example if we see a truncated version of a query in pg_stat_activity, and then
12+
// we see the full query text later via pg_stat_statements.
13+
func TryFingerprintQuery(query string, filterQueryText string, trackActivityQuerySize int) (fp uint64, virtual bool) {
914
fp, err := pg_query.FingerprintToUInt64(query)
1015
if err != nil {
16+
virtual = true
1117
fixedQuery := fixTruncatedQuery(query)
1218

1319
fp, err = pg_query.FingerprintToUInt64(fixedQuery)
@@ -20,6 +26,12 @@ func FingerprintQuery(query string, filterQueryText string, trackActivityQuerySi
2026
return
2127
}
2228

29+
// FingerprintQuery - Generates a unique fingerprint for the given query
30+
func FingerprintQuery(query string, filterQueryText string, trackActivityQuerySize int) (fp uint64) {
31+
fp, _ = TryFingerprintQuery(query, filterQueryText, trackActivityQuerySize)
32+
return
33+
}
34+
2335
// FingerprintText - Generates a fingerprint for static texts (used for error scenarios)
2436
func FingerprintText(query string) (fp uint64) {
2537
return pg_query.HashXXH3_64([]byte(query), 0xee)

0 commit comments

Comments
 (0)