Skip to content

Commit 6537139

Browse files
twmbclaude
authored andcommitted
Fix flaky snapshot+stream race in integration test
The test compared the stream output against an atomic counter that races with the actual database commits. The writer goroutine does INSERT then count.Add(1), but Stop() can land between those two operations, causing the count to be off by one from what the database (and thus the stream) actually sees. Fix: read the committed row count from the database after stopping the writer, instead of trusting the in-process atomic counter. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 96729e5 commit 6537139

3 files changed

Lines changed: 21 additions & 6 deletions

File tree

internal/impl/postgresql/integration_test.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -916,11 +916,9 @@ read_until:
916916
}))
917917

918918
// Continuously write so there is a chance we skip data between snapshot and stream hand off.
919-
var count atomic.Int64
920919
writer := asyncroutine.NewPeriodic(time.Microsecond, func() {
921920
_, err := db.Exec("INSERT INTO seq DEFAULT VALUES")
922921
require.NoError(t, err)
923-
count.Add(1)
924922
})
925923
writer.Start()
926924
t.Cleanup(writer.Stop)
@@ -948,8 +946,15 @@ read_until:
948946
require.Fail(t, "stream did not complete in time")
949947
}
950948
require.NoError(t, streamOut.StopWithin(10*time.Second))
949+
950+
// Read the actual committed count from the database rather than
951+
// relying on the atomic counter, which can race with the last
952+
// INSERT commit.
953+
var dbCount int64
954+
require.NoError(t, db.QueryRow("SELECT COUNT(*) FROM seq").Scan(&dbCount))
955+
951956
expected := []int64{}
952-
for i := range count.Load() {
957+
for i := range dbCount {
953958
expected = append(expected, i+1)
954959
}
955960
batchMu.Lock()

internal/impl/postgresql/pglogicalstream/pgtype_compat.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@ package pglogicalstream
1010

1111
import "strings"
1212

13-
// sanitizeTsrange strips escaped quotes from Postgres tsrange text
14-
// representations. Postgres may return ranges like:
13+
// sanitizeTsrange strips quoting from Postgres tsrange text representations.
14+
//
15+
// Postgres quotes range bounds containing spaces, producing:
1516
//
1617
// ["2024-01-01 00:00:00","2024-12-31 00:00:00")
1718
//
@@ -20,7 +21,15 @@ import "strings"
2021
//
2122
// [2024-01-01 00:00:00,2024-12-31 00:00:00)
2223
//
23-
// This function replicates that behavior without the old pgtype dependency.
24+
// This function replicates that behavior by stripping all double quotes.
25+
// This is safe for tsrange because timestamp bound values never contain
26+
// literal double quotes — they consist only of digits, dashes, colons,
27+
// spaces, and decimal points.
28+
//
29+
// NOTE: This function is NOT suitable for arbitrary range types whose
30+
// bound values may contain literal double quotes (e.g. text ranges).
31+
// For such types, a proper range parser that handles quoting and escaping
32+
// (like the old pgtype.ParseUntypedTextRange) would be needed.
2433
func sanitizeTsrange(s string) string {
2534
return strings.ReplaceAll(s, `"`, "")
2635
}

internal/impl/postgresql/pglogicalstream/snapshotter.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,7 @@ type tuple struct {
253253
elements []any
254254
}
255255

256+
//nolint:stylecheck // This is implementing the squirrel.Sqlizer interface
256257
func (t *tuple) ToSql() (sql string, args []any, err error) {
257258
sql = "(" + strings.Join(slices.Repeat([]string{"?"}, len(t.elements)), ", ") + ")"
258259
args = t.elements

0 commit comments

Comments
 (0)