Skip to content

Commit 74f7146

Browse files
committed
Stabilize postgres multiplayer integration convergence
1 parent 8290328 commit 74f7146

File tree

1 file changed

+40
-16
lines changed

1 file changed

+40
-16
lines changed

internal/storage/postgres_integration_test.go

Lines changed: 40 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,9 @@ func newIntegrationEnv(t *testing.T, timeout time.Duration) *integrationEnv {
6767
return env
6868
}
6969

70-
func (e *integrationEnv) waitForLocalJobs(db *DB, expected int, timeout time.Duration) {
71-
e.T.Helper()
72-
waitCondition(e.T, timeout, fmt.Sprintf("local job count %d", expected), func() (bool, error) {
70+
func (e *integrationEnv) waitForLocalJobs(t *testing.T, db *DB, expected int, timeout time.Duration) {
71+
t.Helper()
72+
waitCondition(t, timeout, fmt.Sprintf("local job count %d", expected), func() (bool, error) {
7373
jobs, err := db.ListJobs("", "", 1000, 0)
7474
if err != nil {
7575
return false, err
@@ -546,8 +546,8 @@ func TestIntegration_Multiplayer(t *testing.T) {
546546
t.Fatalf("Machine B: Second SyncNow failed: %v", err)
547547
}
548548

549-
env.waitForLocalJobs(dbA, 2, 10*time.Second)
550-
env.waitForLocalJobs(dbB, 2, 10*time.Second)
549+
env.waitForLocalJobs(t, dbA, 2, 10*time.Second)
550+
env.waitForLocalJobs(t, dbB, 2, 10*time.Second)
551551

552552
env.assertPgCount("review_jobs", 2)
553553
env.assertPgCount("reviews", 2)
@@ -644,8 +644,8 @@ func TestIntegration_MultiplayerSameCommit(t *testing.T) {
644644
t.Fatalf("Machine B: Second SyncNow failed: %v", err)
645645
}
646646

647-
env.waitForLocalJobs(dbA, 2, 10*time.Second)
648-
env.waitForLocalJobs(dbB, 2, 10*time.Second)
647+
env.waitForLocalJobs(t, dbA, 2, 10*time.Second)
648+
env.waitForLocalJobs(t, dbB, 2, 10*time.Second)
649649

650650
env.assertPgCount("review_jobs", 2)
651651
env.assertPgCount("reviews", 2)
@@ -796,7 +796,7 @@ func TestIntegration_MultiplayerRealistic(t *testing.T) {
796796
syncAll(t)
797797
syncAll(t)
798798

799-
env.waitForLocalJobs(dbA, 30, 10*time.Second)
799+
env.waitForLocalJobs(t, dbA, 30, 10*time.Second)
800800
env.assertPgCount("review_jobs", 30)
801801
})
802802

@@ -820,7 +820,7 @@ func TestIntegration_MultiplayerRealistic(t *testing.T) {
820820
// All machines sync again
821821
syncAll(t)
822822

823-
env.waitForLocalJobs(dbA, 45, 10*time.Second)
823+
env.waitForLocalJobs(t, dbA, 45, 10*time.Second)
824824
env.assertPgCount("review_jobs", 45)
825825
})
826826

@@ -870,14 +870,38 @@ func TestIntegration_MultiplayerRealistic(t *testing.T) {
870870
t.Errorf("%v", err)
871871
}
872872

873-
// Final sync
874-
syncAll(t)
875-
876873
expectedTotal := 75
877874

878-
env.waitForLocalJobs(dbA, expectedTotal, 15*time.Second)
879-
env.waitForLocalJobs(dbB, expectedTotal, 15*time.Second)
880-
env.waitForLocalJobs(dbC, expectedTotal, 15*time.Second)
875+
// Workers run with a long interval in this test, so convergence
876+
// requires explicit repeated SyncNow calls until all nodes have
877+
// pulled the full set.
878+
waitCondition(t, 20*time.Second, fmt.Sprintf("all machines converge to %d jobs", expectedTotal), func() (bool, error) {
879+
if _, err := workerA.SyncNow(); err != nil {
880+
return false, fmt.Errorf("Machine A sync failed: %w", err)
881+
}
882+
if _, err := workerB.SyncNow(); err != nil {
883+
return false, fmt.Errorf("Machine B sync failed: %w", err)
884+
}
885+
if _, err := workerC.SyncNow(); err != nil {
886+
return false, fmt.Errorf("Machine C sync failed: %w", err)
887+
}
888+
889+
jobsA, err := dbA.ListJobs("", "", 1000, 0)
890+
if err != nil {
891+
return false, fmt.Errorf("Machine A list jobs failed: %w", err)
892+
}
893+
jobsB, err := dbB.ListJobs("", "", 1000, 0)
894+
if err != nil {
895+
return false, fmt.Errorf("Machine B list jobs failed: %w", err)
896+
}
897+
jobsC, err := dbC.ListJobs("", "", 1000, 0)
898+
if err != nil {
899+
return false, fmt.Errorf("Machine C list jobs failed: %w", err)
900+
}
901+
return len(jobsA) >= expectedTotal &&
902+
len(jobsB) >= expectedTotal &&
903+
len(jobsC) >= expectedTotal, nil
904+
})
881905

882906
env.assertPgCount("review_jobs", expectedTotal)
883907

@@ -1010,7 +1034,7 @@ func TestIntegration_MultiplayerOfflineReconnect(t *testing.T) {
10101034
t.Fatalf("Machine B: SyncNow failed: %v", err)
10111035
}
10121036

1013-
env.waitForLocalJobs(dbB, 3, 10*time.Second)
1037+
env.waitForLocalJobs(t, dbB, 3, 10*time.Second)
10141038

10151039
jobsB, err := dbB.ListJobs("", "", 100, 0)
10161040
if err != nil {

0 commit comments

Comments
 (0)