@@ -2059,6 +2059,17 @@ func runClusterReplicationDisconnect(
20592059
20602060 // TODO(msbutler): disconnect nodes during a random phase
20612061 require .NoError (t , waitForTargetPhase (ctx , rd , dstJobID , phaseSteadyState ))
2062+ waitForReplicatedTimeToReachTimestamp (t , int (dstJobID ), rd .setup .dst .db , getStreamIngestionJobInfo , 3 * time .Minute , timeutil .Now ())
2063+
2064+ // 50% of the time: pause the job, start partition, then resume
2065+ shouldPauseBeforePartition := rd .rng .Intn (2 ) == 0
2066+
2067+ if shouldPauseBeforePartition {
2068+ rd .t .L ().Printf ("Pausing stream ingestion job %d before network partition" , dstJobID )
2069+ rd .setup .dst .sysSQL .Exec (t , fmt .Sprintf ("PAUSE JOB %d" , dstJobID ))
2070+ require .NoError (t , WaitForPaused (ctx , rd .setup .dst .db , dstJobID , 2 * time .Minute ))
2071+ rd .t .L ().Printf ("Stream ingestion job %d is now paused" , dstJobID )
2072+ }
20622073
20632074 rd .t .L ().Printf ("Disconnecting all src nodes %v from all dst nodes %v" ,
20642075 rd .setup .src .nodes , rd .setup .dst .nodes )
@@ -2068,6 +2079,13 @@ func runClusterReplicationDisconnect(
20682079 blackholeFailer .FailPartial (ctx , srcNode , rd .setup .dst .nodes )
20692080 }
20702081
2082+ if shouldPauseBeforePartition {
2083+ rd .t .L ().Printf ("Resuming stream ingestion job %d after partition established" , dstJobID )
2084+ rd .setup .dst .sysSQL .Exec (t , fmt .Sprintf ("RESUME JOB %d" , dstJobID ))
2085+ require .NoError (t , WaitForResume (ctx , rd .setup .dst .db , dstJobID , 2 * time .Minute ))
2086+ rd .t .L ().Printf ("Stream ingestion job %d is now running" , dstJobID )
2087+ }
2088+
20712089 if ! persistPartition {
20722090 // Persist the partition for the duration of the workload. Once the network
20732091 // is restored, PCR will need to catch up to perform cutover AOST -2m.
0 commit comments