@@ -3,6 +3,7 @@ package collector
33import (
44 "context"
55 "fmt"
6+ "strings"
67 "testing"
78 "time"
89
@@ -15,42 +16,6 @@ import (
1516 "github.com/grafana/alloy/internal/component/common/loki"
1617)
1718
18- // Helper to generate test log lines with current timestamp to avoid historical filtering
19- func testLogLine (line string ) string {
20- now := time .Now ()
21- // PostgreSQL log_line_prefix format: %m:%r:%u@%d:[%p]:%l:%e:%s:%v:%x:%c:%q%a
22- // %m is the first timestamp (appears at start of line)
23- // %s is the session start time (appears later in the line after several colons)
24- // We need to replace BOTH timestamps with current time
25-
26- // Replace the first timestamp (first 23 chars: "2025-12-12 15:29:16.068")
27- if len (line ) > 23 {
28- nowStr := now .Format ("2006-01-02 15:04:05.000" )
29- result := nowStr + line [23 :]
30-
31- // Find and replace the second timestamp (%s field)
32- // It appears after the SQLSTATE (5 chars) and a colon
33- // Pattern: ...:[SQLSTATE]:YYYY-MM-DD HH:MM:SS[.mmm] TZ:...
34- // Look for the pattern after position 50 to skip the first timestamp
35- if len (result ) > 50 {
36- // Find ":YYYY-MM-DD " pattern for the second timestamp
37- for i := 50 ; i < len (result )- 23 ; i ++ {
38- if result [i ] == ':' && i + 23 < len (result ) {
39- // Check if this looks like a timestamp (YYYY-MM-DD HH:MM:SS)
40- if result [i + 1 :i + 5 ] == "2025" || result [i + 1 :i + 5 ] == "2026" {
41- // Replace just the date/time part, keep the timezone
42- result = result [:i + 1 ] + nowStr + result [i + 1 + 23 :]
43- break
44- }
45- }
46- }
47- }
48- return result
49- }
50- return line
51- }
52-
53-
5419func TestLogsCollector_ParseRDSFormat (t * testing.T ) {
5520 entryHandler := loki .NewEntryHandler (make (chan loki.Entry , 10 ), func () {})
5621 registry := prometheus .NewRegistry ()
@@ -63,10 +28,16 @@ func TestLogsCollector_ParseRDSFormat(t *testing.T) {
6328 })
6429 require .NoError (t , err )
6530
31+ startTime := collector .startTime
6632 err = collector .Start (context .Background ())
6733 require .NoError (t , err )
6834 defer collector .Stop ()
6935
36+ // Build log lines with timestamps after collector start (like SkipsHistoricalLogs)
37+ ts := startTime .Add (10 * time .Second ).UTC ()
38+ ts1 := ts .Format ("2006-01-02 15:04:05.000 MST" )
39+ ts2 := ts .Add (- 1 * time .Second ).Format ("2006-01-02 15:04:05 MST" )
40+
7041 tests := []struct {
7142 name string
7243 log string
@@ -77,39 +48,39 @@ func TestLogsCollector_ParseRDSFormat(t *testing.T) {
7748 }{
7849 {
7950 name : "ERROR severity" ,
80- log : testLogLine ( `2025-12-12 15:29:16.068 GMT :[local]:app-user@books_store:[9112]:4:57014:2025-12-12 15:29:15 GMT :25/112:0:693c34cb.2398::psqlERROR: canceling statement` ) ,
51+ log : ts1 + " :[local]:app-user@books_store:[9112]:4:57014:" + ts2 + " :25/112:0:693c34cb.2398::psqlERROR: canceling statement" ,
8152 wantUser : "app-user" ,
8253 wantDB : "books_store" ,
8354 wantSev : "ERROR" ,
8455 wantSQLState : "57014" ,
8556 },
8657 {
8758 name : "FATAL severity" ,
88- log : testLogLine ( `2025-12-12 15:29:31.529 GMT :[local]:conn_user@testdb:[9449]:4:53300:2025-12-12 15:29:31 GMT :91/57:0:693c34db.24e9::psqlFATAL: too many connections` ) ,
59+ log : ts1 + " :[local]:conn_user@testdb:[9449]:4:53300:" + ts2 + " :91/57:0:693c34db.24e9::psqlFATAL: too many connections" ,
8960 wantUser : "conn_user" ,
9061 wantDB : "testdb" ,
9162 wantSev : "FATAL" ,
9263 wantSQLState : "53300" ,
9364 },
9465 {
9566 name : "PANIC severity" ,
96- log : testLogLine ( `2025-12-12 15:30:00.000 GMT :10.0.1.10(5432):admin@postgres:[9500]:1:XX000:2025-12-12 15:30:00 GMT :1/1:0:693c34db.9999::psqlPANIC: system failure` ) ,
67+ log : ts1 + " :10.0.1.10(5432):admin@postgres:[9500]:1:XX000:" + ts2 + " :1/1:0:693c34db.9999::psqlPANIC: system failure" ,
9768 wantUser : "admin" ,
9869 wantDB : "postgres" ,
9970 wantSev : "PANIC" ,
10071 wantSQLState : "XX000" ,
10172 },
10273 {
10374 name : "UTC timezone" ,
104- log : testLogLine ( `2025-12-12 15:29:16.068 UTC :10.0.1.5(12345):app-user@books_store:[9112]:4:40001:2025-12-12 15:29:15 UTC :25/112:0:693c34cb.2398::psqlERROR: could not serialize access` ) ,
75+ log : ts1 + " :10.0.1.5(12345):app-user@books_store:[9112]:4:40001:" + ts2 + " :25/112:0:693c34cb.2398::psqlERROR: could not serialize access" ,
10576 wantUser : "app-user" ,
10677 wantDB : "books_store" ,
10778 wantSev : "ERROR" ,
10879 wantSQLState : "40001" ,
10980 },
11081 {
11182 name : "EST timezone" ,
112- log : testLogLine ( `2025-12-12 15:29:16.068 EST:10.0.1.5(12345):app-user@books_store:[9112]:4:40001:2025-12-12 15:29:15 EST:25/112:0:693c34cb.2398::psqlERROR: could not serialize access` ) ,
83+ log : strings . ReplaceAll ( ts1 , " UTC" , " EST" ) + " :10.0.1.5(12345):app-user@books_store:[9112]:4:40001:" + strings . ReplaceAll ( ts2 , " UTC" , " EST" ) + " :25/112:0:693c34cb.2398::psqlERROR: could not serialize access" ,
11384 wantUser : "app-user" ,
11485 wantDB : "books_store" ,
11586 wantSev : "ERROR" ,
@@ -137,7 +108,7 @@ func TestLogsCollector_ParseRDSFormat(t *testing.T) {
137108 for _ , label := range metric .GetLabel () {
138109 labels [label .GetName ()] = label .GetValue ()
139110 }
140- if labels ["user" ] == tt .wantUser && labels ["datname" ] == tt .wantDB {
111+ if labels ["user" ] == tt .wantUser && labels ["datname" ] == tt .wantDB && labels [ "severity" ] == tt . wantSev && labels [ "sqlstate" ] == tt . wantSQLState {
141112 require .Equal (t , tt .wantSev , labels ["severity" ])
142113 require .Equal (t , tt .wantSQLState , labels ["sqlstate" ])
143114 require .Equal (t , tt .wantSQLState [:2 ], labels ["sqlstate_class" ])
@@ -164,14 +135,20 @@ func TestLogsCollector_SkipsNonErrors(t *testing.T) {
164135 })
165136 require .NoError (t , err )
166137
138+ startTime := collector .startTime
167139 err = collector .Start (context .Background ())
168140 require .NoError (t , err )
169141 defer collector .Stop ()
170142
171- // Send INFO and LOG messages (should be skipped)
143+ // Build INFO and LOG lines with timestamps AFTER collector start, so they would pass the
144+ // historical filter if they reached it. They are skipped for severity (not ERROR/FATAL/PANIC).
145+ ts := startTime .Add (10 * time .Second ).UTC ()
146+ ts1 := ts .Format ("2006-01-02 15:04:05.000 MST" )
147+ ts2 := ts .Add (- 1 * time .Second ).Format ("2006-01-02 15:04:05 MST" )
148+
172149 skipLogs := []string {
173- testLogLine ( `2025-12-12 15:29:42.201 GMT :::1:app-user@books_store:[9589]:2:00000:2025-12-12 15:29:42 GMT :159/363:0:693c34e6.2575::psqlINFO: some info` ) ,
174- testLogLine ( `2025-12-12 15:29:42.201 GMT :::1:app-user@books_store:[9589]:2::2025-12-12 15:29:42 GMT :159/363:0:693c34e6.2575::psqlLOG: connection received` ) ,
150+ ts1 + " :::1:app-user@books_store:[9589]:2:00000:" + ts2 + " :159/363:0:693c34e6.2575::psqlINFO: some info" ,
151+ ts1 + " :::1:app-user@books_store:[9589]:2:00000:" + ts2 + " :159/363:0:693c34e6.2575::psqlLOG: connection received" ,
175152 "DETAIL: Some detail line" ,
176153 "HINT: Some hint line" ,
177154 "\t Indented continuation line" ,
@@ -209,10 +186,15 @@ func TestLogsCollector_MetricSumming(t *testing.T) {
209186 })
210187 require .NoError (t , err )
211188
189+ startTime := collector .startTime
212190 err = collector .Start (context .Background ())
213191 require .NoError (t , err )
214192 defer collector .Stop ()
215193
194+ ts := startTime .Add (10 * time .Second ).UTC ()
195+ ts1 := ts .Format ("2006-01-02 15:04:05.000 MST" )
196+ ts2 := ts .Add (- 1 * time .Second ).Format ("2006-01-02 15:04:05 MST" )
197+
216198 // Send multiple errors with same labels (should sum)
217199 logs := []struct {
218200 log string
@@ -221,25 +203,25 @@ func TestLogsCollector_MetricSumming(t *testing.T) {
221203 sev string
222204 }{
223205 {
224- log : testLogLine ( `2025-01-12 10:30:45 UTC :10.0.1.5:54321:user1@db1:[9112]:4:57014:2025-01-12 10:29:15 UTC :25/112:0:693c34cb.2398::psqlERROR: error 1` ) ,
206+ log : ts1 + " :10.0.1.5:54321:user1@db1:[9112]:4:57014:" + ts2 + " :25/112:0:693c34cb.2398::psqlERROR: error 1" ,
225207 user : "user1" ,
226208 db : "db1" ,
227209 sev : "ERROR" ,
228210 },
229211 {
230- log : testLogLine ( `2025-01-12 10:31:00 UTC :10.0.1.5:54321:user1@db1:[9113]:5:57014:2025-01-12 10:29:15 UTC :25/113:0:693c34cb.2399::psqlERROR: error 2` ) ,
212+ log : ts1 + " :10.0.1.5:54321:user1@db1:[9113]:5:57014:" + ts2 + " :25/113:0:693c34cb.2399::psqlERROR: error 2" ,
231213 user : "user1" ,
232214 db : "db1" ,
233215 sev : "ERROR" ,
234216 },
235217 {
236- log : testLogLine ( `2025-01-12 10:32:00 UTC :10.0.1.5:54321:user1@db1:[9114]:6:57014:2025-01-12 10:29:15 UTC :25/114:0:693c34cb.2400::psqlERROR: error 3` ) ,
218+ log : ts1 + " :10.0.1.5:54321:user1@db1:[9114]:6:57014:" + ts2 + " :25/114:0:693c34cb.2400::psqlERROR: error 3" ,
237219 user : "user1" ,
238220 db : "db1" ,
239221 sev : "ERROR" ,
240222 },
241223 {
242- log : testLogLine ( `2025-01-12 10:33:00 UTC :10.0.1.5:54322:user2@db2:[9115]:7:28P01:2025-01-12 10:33:00 UTC :159/363:0:693c34e6.2575::psqlFATAL: auth failed` ) ,
224+ log : ts1 + " :10.0.1.5:54322:user2@db2:[9115]:7:28P01:" + ts2 + " :159/363:0:693c34e6.2575::psqlFATAL: auth failed" ,
243225 user : "user2" ,
244226 db : "db2" ,
245227 sev : "FATAL" ,
@@ -344,19 +326,25 @@ func TestLogsCollector_EmptyUserAndDatabase(t *testing.T) {
344326 })
345327 require .NoError (t , err )
346328
329+ startTime := collector .startTime
347330 err = collector .Start (context .Background ())
348331 require .NoError (t , err )
349332 defer collector .Stop ()
350333
351- // Send log with empty user and database (background worker termination)
334+ // Build log with timestamps after collector start (empty user/database = background worker)
335+ ts := startTime .Add (10 * time .Second ).UTC ()
336+ ts1 := ts .Format ("2006-01-02 15:04:05.000 MST" )
337+ ts2 := ts .Add (- 1 * time .Second ).Format ("2006-01-02 15:04:05 MST" )
338+ logLine := fmt .Sprintf ("%s::@:[26350]:1:57P01:%s:828/162213:0:6982f7c4.66ee:FATAL: terminating background worker \" parallel worker\" due to administrator command" , ts1 , ts2 )
339+
352340 collector .Receiver ().Chan () <- loki.Entry {
353341 Entry : push.Entry {
354- Line : testLogLine ( `2026-02-04 07:39:49.124 UTC::@:[26350]:1:57P01:2026-02-04 07:39:48 UTC:828/162213:0:6982f7c4.66ee:FATAL: terminating background worker "parallel worker" due to administrator command` ) ,
342+ Line : logLine ,
355343 Timestamp : time .Now (),
356344 },
357345 }
358346
359- time .Sleep (100 * time .Millisecond )
347+ time .Sleep (200 * time .Millisecond )
360348
361349 // Verify metric was created with empty user and database labels
362350 mfs , _ := registry .Gather ()
@@ -466,10 +454,15 @@ func TestLogsCollector_SQLStateExtraction(t *testing.T) {
466454 })
467455 require .NoError (t , err )
468456
457+ startTime := collector .startTime
469458 err = collector .Start (context .Background ())
470459 require .NoError (t , err )
471460 defer collector .Stop ()
472461
462+ ts := startTime .Add (10 * time .Second ).UTC ()
463+ ts1 := ts .Format ("2006-01-02 15:04:05.000 MST" )
464+ ts2 := ts .Add (- 1 * time .Second ).Format ("2006-01-02 15:04:05 MST" )
465+
473466 tests := []struct {
474467 name string
475468 log string
@@ -479,49 +472,49 @@ func TestLogsCollector_SQLStateExtraction(t *testing.T) {
479472 }{
480473 {
481474 name : "Serialization failure (40001)" ,
482- log : `2026-01-25 20:00:00.702 UTC :10.24.193.106(33090):mybooks-app@books_store:[25599]:1:40001:2026-01-25 19:58:36 UTC :172/48089:85097235:697675ec.63ff:[unknown]:ERROR: could not serialize access due to concurrent update` ,
475+ log : ts1 + " :10.24.193.106(33090):mybooks-app@books_store:[25599]:1:40001:" + ts2 + " :172/48089:85097235:697675ec.63ff:[unknown]:ERROR: could not serialize access due to concurrent update" ,
483476 wantSQLState : "40001" ,
484477 wantSQLStateClass : "40" ,
485478 wantSeverity : "ERROR" ,
486479 },
487480 {
488481 name : "Deadlock detected (40P01)" ,
489- log : `2026-01-25 20:01:30 UTC :10.32.115.73(34710):mybooks-app-2@books_store_2:[2170]:1:40P01:2026-01-25 20:00:00 UTC :100/200:85097240:69767600.1000:[unknown]:ERROR: deadlock detected` ,
482+ log : ts1 + " :10.32.115.73(34710):mybooks-app-2@books_store_2:[2170]:1:40P01:" + ts2 + " :100/200:85097240:69767600.1000:[unknown]:ERROR: deadlock detected" ,
490483 wantSQLState : "40P01" ,
491484 wantSQLStateClass : "40" ,
492485 wantSeverity : "ERROR" ,
493486 },
494487 {
495488 name : "Unique violation (23505)" ,
496- log : `2026-01-25 20:02:00 UTC :10.24.193.106(44148):app-user@testdb:[25296]:2:23505:2026-01-25 20:00:00 UTC :121/51119:85097236:6976755e.62d0:[unknown]:ERROR: duplicate key value violates unique constraint` ,
489+ log : ts1 + " :10.24.193.106(44148):app-user@testdb:[25296]:2:23505:" + ts2 + " :121/51119:85097236:6976755e.62d0:[unknown]:ERROR: duplicate key value violates unique constraint" ,
497490 wantSQLState : "23505" ,
498491 wantSQLStateClass : "23" ,
499492 wantSeverity : "ERROR" ,
500493 },
501494 {
502495 name : "Query canceled (57014)" ,
503- log : testLogLine ( `2025-12-12 15:29:16.068 GMT :[local]:app-user@books_store:[9112]:4:57014:2025-12-12 15:29:15 GMT :25/112:0:693c34cb.2398::psqlERROR: canceling statement` ) ,
496+ log : ts1 + " :[local]:app-user@books_store:[9112]:4:57014:" + ts2 + " :25/112:0:693c34cb.2398::psqlERROR: canceling statement" ,
504497 wantSQLState : "57014" ,
505498 wantSQLStateClass : "57" ,
506499 wantSeverity : "ERROR" ,
507500 },
508501 {
509502 name : "Too many connections (53300)" ,
510- log : testLogLine ( `2025-12-12 15:29:31.529 GMT :[local]:conn_user@testdb:[9449]:4:53300:2025-12-12 15:29:31 GMT :91/57:0:693c34db.24e9::psqlFATAL: too many connections` ) ,
503+ log : ts1 + " :[local]:conn_user@testdb:[9449]:4:53300:" + ts2 + " :91/57:0:693c34db.24e9::psqlFATAL: too many connections" ,
511504 wantSQLState : "53300" ,
512505 wantSQLStateClass : "53" ,
513506 wantSeverity : "FATAL" ,
514507 },
515508 {
516509 name : "Auth failed (28P01)" ,
517- log : testLogLine ( `2025-12-12 10:33:00 UTC :10.0.1.5:54322:user2@db2:[9115]:7:28P01:2025-12-12 10:33:00 UTC :159/363:0:693c34e6.2575::psqlFATAL: password authentication failed` ) ,
510+ log : ts1 + " :10.0.1.5:54322:user2@db2:[9115]:7:28P01:" + ts2 + " :159/363:0:693c34e6.2575::psqlFATAL: password authentication failed" ,
518511 wantSQLState : "28P01" ,
519512 wantSQLStateClass : "28" ,
520513 wantSeverity : "FATAL" ,
521514 },
522515 {
523516 name : "Internal error (XX000)" ,
524- log : testLogLine ( `2025-12-12 15:30:00.000 GMT :10.0.1.10(5432):admin@postgres:[9500]:1:XX000:2025-12-12 15:30:00 GMT :1/1:0:693c34db.9999::psqlPANIC: unexpected internal error` ) ,
517+ log : ts1 + " :10.0.1.10(5432):admin@postgres:[9500]:1:XX000:" + ts2 + " :1/1:0:693c34db.9999::psqlPANIC: unexpected internal error" ,
525518 wantSQLState : "XX000" ,
526519 wantSQLStateClass : "XX" ,
527520 wantSeverity : "PANIC" ,
@@ -582,14 +575,14 @@ func TestLogsCollector_SkipsHistoricalLogs(t *testing.T) {
582575
583576 // Create timestamps relative to start time
584577 historicalTime := startTime .Add (- 1 * time .Hour )
585- recentTime := startTime .Add (1 * time .Second )
578+ recentTime := startTime .Add (10 * time .Second )
586579
587580 // Send historical log (1 hour before start) with timestamp in log line
588- historicalLine := fmt .Sprintf ("%s:[local]:user@database:[1234]:1:28000:%s:1/1:0:000000.0::psqlERROR: test historical error" ,
589- historicalTime .Format ("2006-01-02 15:04:05.000 MST" ),
581+ historicalLine := fmt .Sprintf ("%s:[local]:user@database:[1234]:1:28000:%s:1/1:0:000000.0::psqlERROR: test historical error" ,
582+ historicalTime .Format ("2006-01-02 15:04:05.000 MST" ),
590583 historicalTime .Format ("2006-01-02 15:04:05 MST" ))
591584 t .Logf ("Historical line: %s" , historicalLine )
592-
585+
593586 historicalEntry := loki.Entry {
594587 Entry : push.Entry {
595588 Timestamp : time .Now (),
@@ -598,11 +591,11 @@ func TestLogsCollector_SkipsHistoricalLogs(t *testing.T) {
598591 }
599592
600593 // Send recent log (after start) with timestamp in log line
601- recentLine := fmt .Sprintf ("%s:[local]:user@database:[1234]:1:28000:%s:1/1:0:000000.0::psqlERROR: test recent error" ,
602- recentTime .Format ("2006-01-02 15:04:05.000 MST" ),
594+ recentLine := fmt .Sprintf ("%s:[local]:user@database:[1234]:1:28000:%s:1/1:0:000000.0::psqlERROR: test recent error" ,
595+ recentTime .Format ("2006-01-02 15:04:05.000 MST" ),
603596 recentTime .Format ("2006-01-02 15:04:05 MST" ))
604597 t .Logf ("Recent line: %s" , recentLine )
605-
598+
606599 recentEntry := loki.Entry {
607600 Entry : push.Entry {
608601 Timestamp : time .Now (),
@@ -631,3 +624,50 @@ func TestLogsCollector_SkipsHistoricalLogs(t *testing.T) {
631624 t .Logf ("Total count: %f" , totalCount )
632625 require .Equal (t , float64 (1 ), totalCount , "only recent log should be counted" )
633626}
627+
628+ func TestLogsCollector_SkipsOnlyHistoricalLogs (t * testing.T ) {
629+ // Explicitly validates that logs with timestamps before collector start produce 0 metrics
630+ entryHandler := loki .NewEntryHandler (make (chan loki.Entry , 10 ), func () {})
631+ registry := prometheus .NewRegistry ()
632+
633+ collector , err := NewLogs (LogsArguments {
634+ Receiver : loki .NewLogsReceiver (),
635+ EntryHandler : entryHandler ,
636+ Logger : log .NewNopLogger (),
637+ Registry : registry ,
638+ })
639+ require .NoError (t , err )
640+
641+ startTime := collector .startTime
642+ err = collector .Start (context .Background ())
643+ require .NoError (t , err )
644+ defer collector .Stop ()
645+
646+ // Send ONLY historical logs (valid ERROR format, but timestamp before collector start)
647+ historicalTime := startTime .Add (- 1 * time .Hour )
648+ historicalLine := fmt .Sprintf ("%s:[local]:user@database:[1234]:1:28000:%s:1/1:0:000000.0::psqlERROR: test historical error" ,
649+ historicalTime .Format ("2006-01-02 15:04:05.000 MST" ),
650+ historicalTime .Format ("2006-01-02 15:04:05 MST" ))
651+
652+ collector .Receiver ().Chan () <- loki.Entry {
653+ Entry : push.Entry {
654+ Line : historicalLine ,
655+ Timestamp : time .Now (),
656+ },
657+ }
658+ time .Sleep (200 * time .Millisecond )
659+
660+ // Verify 0 metrics - historical logs must be skipped
661+ mfs , err := registry .Gather ()
662+ require .NoError (t , err )
663+
664+ var totalCount float64
665+ for _ , mf := range mfs {
666+ if mf .GetName () == "database_observability_postgres_errors_total" {
667+ for _ , metric := range mf .GetMetric () {
668+ totalCount += metric .GetCounter ().GetValue ()
669+ }
670+ }
671+ }
672+ require .Equal (t , float64 (0 ), totalCount , "historical logs must not produce metrics" )
673+ }
0 commit comments