2626import java .util .concurrent .LinkedBlockingQueue ;
2727import java .util .concurrent .TimeUnit ;
2828import java .util .concurrent .TimeoutException ;
29-
29+ import java . util . concurrent . atomic . AtomicReference ;
3030import org .forgerock .i18n .slf4j .LocalizedLogger ;
3131import org .opends .server .TestCaseUtils ;
3232import org .opends .server .backends .task .Task ;
@@ -77,8 +77,8 @@ public void publishAndReceive(
7777 DN testService = DN .valueOf ("o=test" );
7878 ReplicationServer replServer1 = null ;
7979 ReplicationServer replServer2 = null ;
80- FakeReplicationDomain domain1 = null ;
81- FakeReplicationDomain domain2 = null ;
80+ AtomicReference < FakeReplicationDomain > domain1 = new AtomicReference <>() ;
81+ AtomicReference < FakeReplicationDomain > domain2 = new AtomicReference <>() ;
8282
8383 try
8484 {
@@ -93,85 +93,66 @@ public void publishAndReceive(
9393
9494 SortedSet <String > servers = newTreeSet ("localhost:" + replServerPort1 );
9595 BlockingQueue <UpdateMsg > rcvQueue1 = new LinkedBlockingQueue <>();
96- domain1 = new FakeReplicationDomain (
97- testService , domain1ServerId , servers , 100 , 1000 , rcvQueue1 );
96+ domain1 . set ( new FakeReplicationDomain (
97+ testService , domain1ServerId , servers , 100 , 1000 , rcvQueue1 )) ;
9898
9999 SortedSet <String > servers2 = newTreeSet ("localhost:" + replServerPort2 );
100100 BlockingQueue <UpdateMsg > rcvQueue2 = new LinkedBlockingQueue <>();
101- domain2 = new FakeReplicationDomain (
102- testService , domain2ServerId , servers2 , 100 , 1000 , rcvQueue2 );
103-
104- Thread .sleep (500 );
101+ domain2 .set (new FakeReplicationDomain (
102+ testService , domain2ServerId , servers2 , 100 , 1000 , rcvQueue2 ));
105103
106104 /*
107105 * Publish a message from domain1,
108106 * Check that domain2 receives it shortly after.
109107 */
110108 byte [] test = {1 , 2 , 3 ,4 , 0 , 1 , 2 , 3 , 4 , 5 };
111- publish (domain1 , test );
109+ publish (domain1 . get () , test );
112110
113111 UpdateMsg rcvdMsg = rcvQueue2 .poll (20 , TimeUnit .SECONDS );
114112 assertNotNull (rcvdMsg );
115113 assertEquals (test , rcvdMsg .getPayload ());
116114
117- for (RSInfo replServerInfo : domain1 .getRsInfos ())
118- {
119- // The generation Id of the remote should be 1
120- assertEquals (replServerInfo .getGenerationId (), 1 ,
121- "Unexpected value of generationId in RSInfo for RS=" + replServerInfo );
122- }
123-
124- for (DSInfo serverInfo : domain1 .getReplicaInfos ().values ())
125- {
126- assertEquals (serverInfo .getStatus (), ServerStatus .NORMAL_STATUS );
127- }
128-
129- domain1 .setGenerationID (2 );
130- domain1 .resetReplicationLog ();
131- Thread .sleep (500 );
115+ TestCaseUtils .repeatUntilSuccess (() -> {
116+ for (RSInfo replServerInfo : domain1 .get ().getRsInfos ())
117+ {
118+ // The generation Id of the remote should be 1
119+ assertEquals (replServerInfo .getGenerationId (), 1 ,
120+ "Unexpected value of generationId in RSInfo for RS=" + replServerInfo );
121+ }
122+ for (DSInfo serverInfo : domain1 .get ().getReplicaInfos ().values ())
123+ {
124+ assertEquals (serverInfo .getStatus (), ServerStatus .NORMAL_STATUS );
125+ }
126+ });
132127
133- for (RSInfo replServerInfo : domain1 .getRsInfos ())
134- {
135- // The generation Id of the remote should now be 2
136- assertEquals (replServerInfo .getGenerationId (), 2 ,
137- "Unexpected value of generationId in RSInfo for RS=" + replServerInfo );
138- }
128+ domain1 .get ().setGenerationID (2 );
129+ domain1 .get ().resetReplicationLog ();
139130
140- int sleepTime = 50 ;
141- while (true )
142- {
143- try
131+ TestCaseUtils .repeatUntilSuccess (() -> {
132+ for (RSInfo replServerInfo : domain1 .get ().getRsInfos ())
144133 {
145- assertExpectedServerStatuses ( domain1 . getReplicaInfos (),
146- domain1ServerId , domain2ServerId );
147- assertExpectedServerStatuses ( domain2 . getReplicaInfos (),
148- domain1ServerId , domain2ServerId );
134+ // The generation Id of the remote should now be 2
135+ assertEquals ( replServerInfo . getGenerationId (), 2 ,
136+ "Unexpected value of generationId in RSInfo for RS=" + replServerInfo );
137+ }
149138
150- Map <Integer , ServerState > states1 = domain1 .getReplicaStates ();
151- ServerState state2 = states1 .get (domain2ServerId );
152- assertNotNull (state2 , "getReplicaStates is not showing DS2" );
139+ assertExpectedServerStatuses (domain1 .get ().getReplicaInfos (),
140+ domain1ServerId , domain2ServerId );
141+ assertExpectedServerStatuses (domain2 .get ().getReplicaInfos (),
142+ domain1ServerId , domain2ServerId );
153143
154- Map <Integer , ServerState > states2 = domain2 .getReplicaStates ();
155- ServerState state1 = states2 .get (domain1ServerId );
156- assertNotNull (state1 , "getReplicaStates is not showing DS1 " );
144+ Map <Integer , ServerState > states1 = domain1 . get () .getReplicaStates ();
145+ ServerState state2 = states1 .get (domain2ServerId );
146+ assertNotNull (state2 , "getReplicaStates is not showing DS2 " );
157147
158- // if we reach this point all tests are OK
159- break ;
160- }
161- catch (AssertionError e )
162- {
163- if (sleepTime >= 30000 )
164- {
165- throw e ;
166- }
167- Thread .sleep (sleepTime );
168- sleepTime *= 2 ;
169- }
170- }
148+ Map <Integer , ServerState > states2 = domain2 .get ().getReplicaStates ();
149+ ServerState state1 = states2 .get (domain1ServerId );
150+ assertNotNull (state1 , "getReplicaStates is not showing DS1" );
151+ });
171152 }
172153 finally
173154 {
174- disable (domain1 , domain2 );
155+ disable (domain1 . get () , domain2 . get () );
175156 remove (replServer1 , replServer2 );
176157 }
177158 }
0 commit comments