Skip to content

Commit 44a0e9f

Browse files
authored
feat: stats for notifier repo module (#6184)
# Description - This PR adds comprehensive stats emission to all notifier repository operations, providing better observability and monitoring capabilities for database performance across the notifier module ## Linear Ticket - Resolves WAR-922 ## Security - [x] The code changed/added as part of this pull request won't create any security issues with how the software is being used.
1 parent 07c0391 commit 44a0e9f

File tree

3 files changed

+72
-6
lines changed

3 files changed

+72
-6
lines changed

services/notifier/notifier.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ func (n *Notifier) Setup(
227227
if err := n.setupDatabase(ctx, dsn); err != nil {
228228
return fmt.Errorf("could not setup db: %w", err)
229229
}
230-
n.repo = newRepo(n.db)
230+
n.repo = newRepo(n.db, WithStats(n.statsFactory))
231231

232232
groupCtx, groupCancel := context.WithCancel(ctx)
233233
n.background.group, n.background.groupCtx = errgroup.WithContext(groupCtx)

services/notifier/repo.go

Lines changed: 59 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import (
1010

1111
"github.com/lib/pq"
1212

13+
"github.com/rudderlabs/rudder-go-kit/stats"
14+
1315
"github.com/rudderlabs/rudder-go-kit/bytesize"
1416
"github.com/rudderlabs/rudder-go-kit/config"
1517

@@ -46,15 +48,23 @@ func WithNow(now func() time.Time) Opt {
4648
}
4749
}
4850

51+
func WithStats(s stats.Stats) Opt {
52+
return func(r *repo) {
53+
r.statsFactory = s
54+
}
55+
}
56+
4957
type repo struct {
50-
db *sqlmw.DB
51-
now func() time.Time
58+
db *sqlmw.DB
59+
now func() time.Time
60+
statsFactory stats.Stats
5261
}
5362

5463
func newRepo(db *sqlmw.DB, opts ...Opt) *repo {
5564
r := &repo{
56-
db: db,
57-
now: timeutil.Now,
65+
db: db,
66+
now: timeutil.Now,
67+
statsFactory: stats.NOP,
5868
}
5969
for _, opt := range opts {
6070
opt(r)
@@ -67,6 +77,10 @@ func (n *repo) resetForWorkspace(
6777
ctx context.Context,
6878
workspaceIdentifier string,
6979
) error {
80+
defer n.timerStat("reset_for_workspace", stats.Tags{
81+
"workspaceIdentifier": workspaceIdentifier,
82+
})()
83+
7084
_, err := n.db.ExecContext(ctx, `
7185
DELETE FROM `+notifierTableName+`
7286
WHERE workspace = $1;
@@ -86,6 +100,11 @@ func (n *repo) insert(
86100
workspaceIdentifier string,
87101
batchID string,
88102
) error {
103+
defer n.timerStat("insert", stats.Tags{
104+
"workspaceIdentifier": workspaceIdentifier,
105+
"jobType": string(publishRequest.JobType),
106+
})()
107+
89108
txn, err := n.db.BeginTx(ctx, &sql.TxOptions{})
90109
if err != nil {
91110
return fmt.Errorf("inserting: begin transaction: %w", err)
@@ -166,6 +185,8 @@ func (n *repo) pendingByBatchID(
166185
ctx context.Context,
167186
batchID string,
168187
) (int64, error) {
188+
defer n.timerStat("pending_by_batch_id", nil)()
189+
169190
var count int64
170191

171192
err := n.db.QueryRowContext(ctx, `
@@ -195,6 +216,8 @@ func (n *repo) getByBatchID(
195216
ctx context.Context,
196217
batchID string,
197218
) ([]Job, error) {
219+
defer n.timerStat("get_by_batch_id", nil)()
220+
198221
query := `
199222
SELECT
200223
id,
@@ -300,6 +323,8 @@ func (n *repo) deleteByBatchID(
300323
ctx context.Context,
301324
batchID string,
302325
) error {
326+
defer n.timerStat("delete_by_batch_id", nil)()
327+
303328
_, err := n.db.ExecContext(ctx, `
304329
DELETE FROM `+notifierTableName+` WHERE batch_id = $1;
305330
`,
@@ -328,6 +353,10 @@ func (n *repo) claim(
328353
ctx context.Context,
329354
workerID string,
330355
) (*Job, error) {
356+
defer n.timerStat("claim", stats.Tags{
357+
"workerID": workerID,
358+
})()
359+
331360
row := n.db.QueryRowContext(ctx, `
332361
UPDATE
333362
`+notifierTableName+`
@@ -376,6 +405,12 @@ func (n *repo) onClaimFailed(
376405
claimError error,
377406
maxAttempt int,
378407
) error {
408+
defer n.timerStat("on_claim_failed", stats.Tags{
409+
"workspaceIdentifier": job.WorkspaceIdentifier,
410+
"jobStatus": string(job.Status),
411+
"jobType": string(job.Type),
412+
})()
413+
379414
query := fmt.Sprint(`
380415
UPDATE
381416
` + notifierTableName + `
@@ -415,6 +450,12 @@ func (n *repo) onClaimSuccess(
415450
job *Job,
416451
payload json.RawMessage,
417452
) error {
453+
defer n.timerStat("on_claim_success", stats.Tags{
454+
"workspaceIdentifier": job.WorkspaceIdentifier,
455+
"jobStatus": string(job.Status),
456+
"jobType": string(job.Type),
457+
})()
458+
418459
_, err := n.db.ExecContext(ctx, `
419460
UPDATE
420461
`+notifierTableName+`
@@ -442,6 +483,8 @@ func (n *repo) orphanJobIDs(
442483
ctx context.Context,
443484
intervalInSeconds int,
444485
) ([]int64, error) {
486+
defer n.timerStat("orphan_job_ids", nil)()
487+
445488
rows, err := n.db.QueryContext(ctx, `
446489
UPDATE
447490
`+notifierTableName+`
@@ -489,6 +532,8 @@ func (n *repo) orphanJobIDs(
489532
}
490533

491534
func (n *repo) refreshClaim(ctx context.Context, jobId int64) error {
535+
defer n.timerStat("refresh_claim", nil)()
536+
492537
_, err := n.db.ExecContext(ctx, `
493538
UPDATE
494539
`+notifierTableName+`
@@ -507,3 +552,13 @@ func (n *repo) refreshClaim(ctx context.Context, jobId int64) error {
507552
}
508553
return nil
509554
}
555+
556+
// timerStat returns a function that records the duration of a database action.
557+
func (n *repo) timerStat(action string, extraTags stats.Tags) func() {
558+
statName := "notifier_repo_query_duration_seconds"
559+
tags := stats.Tags{"action": action, "repoType": notifierTableName}
560+
for k, v := range extraTags {
561+
tags[k] = v
562+
}
563+
return n.statsFactory.NewTaggedStat(statName, stats.TimerType, tags).RecordDuration()
564+
}

services/notifier/repo_test.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ import (
1616
"github.com/stretchr/testify/require"
1717

1818
"github.com/rudderlabs/rudder-go-kit/config"
19+
"github.com/rudderlabs/rudder-go-kit/stats"
20+
"github.com/rudderlabs/rudder-go-kit/stats/memstats"
1921
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/postgres"
2022

2123
migrator "github.com/rudderlabs/rudder-server/services/sql-migrator"
@@ -57,6 +59,9 @@ func TestRepo(t *testing.T) {
5759
pool, err := dockertest.NewPool("")
5860
require.NoError(t, err)
5961

62+
statsStore, err := memstats.New()
63+
require.NoError(t, err)
64+
6065
pgResource, err := postgres.Setup(pool, t)
6166
require.NoError(t, err)
6267

@@ -80,7 +85,7 @@ func TestRepo(t *testing.T) {
8085
ctx := context.Background()
8186
now := time.Now().Truncate(time.Second).UTC()
8287
db := sqlmw.New(pgResource.DB)
83-
r := newRepo(db, WithNow(func() time.Time {
88+
r := newRepo(db, WithStats(statsStore), WithNow(func() time.Time {
8489
return now
8590
}))
8691

@@ -106,6 +111,12 @@ func TestRepo(t *testing.T) {
106111

107112
err := r.insert(ctx, &publishRequest, workspaceIdentifier, batchID)
108113
require.NoError(t, err)
114+
require.Greater(t, statsStore.Get("notifier_repo_query_duration_seconds", stats.Tags{
115+
"action": "insert",
116+
"repoType": "pg_notifier_queue",
117+
"workspaceIdentifier": workspaceIdentifier,
118+
"jobType": string(publishRequest.JobType),
119+
}).LastDuration(), time.Duration(0))
109120

110121
jobs, err := r.getByBatchID(ctx, batchID)
111122
require.NoError(t, err)

0 commit comments

Comments
 (0)