@@ -5,9 +5,27 @@ import zio.test.TestAspect.withLiveClock
55import zio .{ZIO , durationInt }
66import zio .raft .LogEntry .NoopLogEntry
77import zio .raft .LogEntry .CommandLogEntry
8+ import zio .LogLevel
9+ import zio .ZLogger
10+ import zio .Cause
11+ import zio .FiberId
12+ import zio .FiberRefs
13+ import zio .LogSpan
14+ import zio .Trace
15+ import java .util .concurrent .ConcurrentLinkedQueue
16+ import scala .jdk .CollectionConverters ._
817
918object RaftIntegrationSpec extends ZIOSpecDefault :
1019
20+ // We use TestLogger instead of ZTestLogger because ZTestLogger can cause duplicated log lines whcih causes flakiness in our tests.
21+ class TestLogger extends ZLogger [String , Unit ] {
22+ var messages : ConcurrentLinkedQueue [String ] = new ConcurrentLinkedQueue ()
23+ override def apply (trace : Trace , fiberId : FiberId , logLevel : LogLevel , message : () => String , cause : Cause [Any ], context : FiberRefs , spans : List [LogSpan ], annotations : Map [String , String ]): Unit =
24+ messages.add(message())
25+
26+ def getMessages : List [String ] = messages.asScala.toList
27+ }
28+
1129 private def findTheNewLeader (
1230 currentLeader : Raft [Int , TestCommands ],
1331 raft1 : Raft [Int , TestCommands ],
@@ -182,48 +200,49 @@ object RaftIntegrationSpec extends ZIOSpecDefault:
182200 },
183201 test(" read returns the correct state with multiple writes" ) {
184202 for
203+ testLogger <- ZIO .succeed(new TestLogger ())
185204 (
186205 r1,
187206 killSwitch1,
188207 r2,
189208 killSwitch2,
190209 r3,
191210 killSwitch3
192- ) <- makeRaft().provideSomeLayer(zio.Runtime .removeDefaultLoggers >>> zio.test. ZTestLogger .default )
211+ ) <- makeRaft().provideSomeLayer(zio.Runtime .removeDefaultLoggers >>> zio.Runtime .addLogger(testLogger) )
193212
194213 // Making sure we call readState while there are queued write commands is difficult,
195214 // we use this approach to make sure there are some unhandled commands before we call readState, hopefully it won't be too flaky
196215 _ <- r1.sendCommand(Increase ).fork.repeatN(99 )
197-
216+
198217 readResult1 <- r1.readState
199218
200- output <- ZTestLogger .logOutput
201- _ = output.foreach(s => println(s.message()))
202- pendingHeartbeatLogCount = output.count(_.message().contains(" memberId=MemberId(peer1) read pending heartbeat" ))
203- pendingCommandLogCount = output.count(_.message().contains(" memberId=MemberId(peer1) read pending command" ))
219+ messages = testLogger.getMessages
220+ pendingHeartbeatLogCount = messages.count(_.contains(" memberId=MemberId(peer1) read pending heartbeat" ))
221+ pendingCommandLogCount = messages.count(_.contains(" memberId=MemberId(peer1) read pending command" ))
204222 yield assertTrue(readResult1 > 0 ) && assertTrue(pendingHeartbeatLogCount == 0 ) && assertTrue(pendingCommandLogCount == 1 )
205- },
223+ } @@ TestAspect .flaky, // TODO (eran): because of the way this test is structured it is currently flaky, we'll need to find another way to send commands so the readState will have pending commands
206224
207225 test(" read returns the correct state when there are no pending writes." ) {
208226 for
227+ testLogger <- ZIO .succeed(new TestLogger ())
209228 (
210229 r1,
211230 killSwitch1,
212231 r2,
213232 killSwitch2,
214233 r3,
215234 killSwitch3
216- ) <- makeRaft().provideSomeLayer(zio.Runtime .removeDefaultLoggers >>> zio.test. ZTestLogger .default )
235+ ) <- makeRaft().provideSomeLayer(zio.Runtime .removeDefaultLoggers >>> zio.Runtime .addLogger(testLogger) )
217236
218237 _ <- r1.sendCommand(Increase )
219238
220239 // When this runs we should have no writes in the queue since the sendCommand call is blocking
221240 readResult <- r1.readState
222241
223242 // verify read waits for heartbeat and not a write/noop command
224- output <- ZTestLogger .logOutput
225- pendingHeartbeatLogCount = output .count(_.message() .contains(" memberId=MemberId(peer1) read pending heartbeat" ))
226- pendingCommandLogCount = output .count(_.message() .contains(" memberId=MemberId(peer1) read pending command" ))
243+ messages = testLogger.getMessages
244+ pendingHeartbeatLogCount = messages .count(_.contains(" memberId=MemberId(peer1) read pending heartbeat" ))
245+ pendingCommandLogCount = messages .count(_.contains(" memberId=MemberId(peer1) read pending command" ))
227246 yield assertTrue(readResult == 1 ) && assertTrue(pendingHeartbeatLogCount == 1 ) && assertTrue(pendingCommandLogCount == 0 )
228247 },
229248
0 commit comments