41 changes: 30 additions & 11 deletions raft/src/test/scala/zio/raft/RaftIntegrationSpec.scala
@@ -5,9 +5,27 @@
import zio.test.TestAspect.withLiveClock
import zio.{ZIO, durationInt}
import zio.raft.LogEntry.NoopLogEntry
import zio.raft.LogEntry.CommandLogEntry
+import zio.LogLevel
+import zio.ZLogger
+import zio.Cause
+import zio.FiberId
+import zio.FiberRefs
+import zio.LogSpan
+import zio.Trace
+import java.util.concurrent.ConcurrentLinkedQueue
+import scala.jdk.CollectionConverters._

object RaftIntegrationSpec extends ZIOSpecDefault:

+  // We use TestLogger instead of ZTestLogger because ZTestLogger can cause duplicated log lines, which causes flakiness in our tests.
+  class TestLogger extends ZLogger[String, Unit] {
+    var messages: ConcurrentLinkedQueue[String] = new ConcurrentLinkedQueue()
+    override def apply(trace: Trace, fiberId: FiberId, logLevel: LogLevel, message: () => String, cause: Cause[Any], context: FiberRefs, spans: List[LogSpan], annotations: Map[String, String]): Unit =
+      messages.add(message())
Copilot AI commented on Oct 16, 2025:

apply is declared to return Unit, but the body evaluates to Boolean (ConcurrentLinkedQueue.add returns Boolean), causing a type mismatch in Scala 3. Discard the Boolean result explicitly, e.g.: override def apply(...): Unit = { messages.add(message()); () }.

Suggested change:
-      messages.add(message())
+      { messages.add(message()); () }
Contributor Author replied:

incorrect, check again
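
For what it's worth, the reply is right about the language semantics: Scala applies value discarding when the expected type is Unit, so a method declared to return Unit whose body evaluates to Boolean compiles in both Scala 2 and Scala 3 (at most a warning under -Wvalue-discard), not a type mismatch. A minimal self-contained sketch:

```scala
// Minimal sketch of value discarding: `add` returns Boolean, but the declared
// Unit result type makes the compiler discard the value instead of reporting
// a type mismatch.
import java.util.concurrent.ConcurrentLinkedQueue

class Recorder {
  val messages = new ConcurrentLinkedQueue[String]()

  // The Boolean result of `add` is discarded because the return type is Unit.
  def record(message: String): Unit = messages.add(message)
}

@main def valueDiscardDemo(): Unit = {
  val r = new Recorder
  r.record("hello")
  println(r.messages.size) // prints 1
}
```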


+    def getMessages: List[String] = messages.asScala.toList
+  }

private def findTheNewLeader(
currentLeader: Raft[Int, TestCommands],
raft1: Raft[Int, TestCommands],
@@ -182,48 +200,49 @@ object RaftIntegrationSpec extends ZIOSpecDefault:
},
test("read returns the correct state with multiple writes") {
for
+testLogger <- ZIO.succeed(new TestLogger())
(
r1,
killSwitch1,
r2,
killSwitch2,
r3,
killSwitch3
-) <- makeRaft().provideSomeLayer(zio.Runtime.removeDefaultLoggers >>> zio.test.ZTestLogger.default)
+) <- makeRaft().provideSomeLayer(zio.Runtime.removeDefaultLoggers >>> zio.Runtime.addLogger(testLogger))

// Making sure we call readState while there are queued write commands is difficult,
// we use this approach to make sure there are some unhandled commands before we call readState, hopefully it won't be too flaky
_ <- r1.sendCommand(Increase).fork.repeatN(99)

readResult1 <- r1.readState

Copilot AI commented on Oct 16, 2025:

Counting log messages directly from a ConcurrentLinkedQueue immediately after readState can race with late asynchronous log emissions, potentially reintroducing flakiness (only the first test is marked flaky). Consider synchronizing by awaiting completion of pending fibers or introducing a short drain/settle step (e.g., ZIO.sleep(...)) before snapshotting messages, or capturing log entries with a timestamp and filtering only those emitted before readState completes.

Suggested change:
+// Allow time for any late log emissions to settle before snapshotting messages
+_ <- ZIO.sleep(100.millis)
Contributor Author replied:

this is a very nice point; since I tested with nonFlaky I think we're good for now
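
If the race ever does surface, one way to implement the timestamp idea from the comment above is to record an Instant alongside each message and filter against a cutoff captured right after readState completes. A hedged sketch (TimestampedTestLogger and messagesBefore are illustrative names, not part of this PR; the apply signature mirrors ZIO 2's ZLogger):

```scala
// Hypothetical variant of TestLogger that timestamps each log message so a
// test can ignore messages emitted after a chosen cutoff.
import java.time.Instant
import java.util.concurrent.ConcurrentLinkedQueue
import scala.jdk.CollectionConverters._
import zio.{Cause, FiberId, FiberRefs, LogLevel, LogSpan, Trace, ZLogger}

class TimestampedTestLogger extends ZLogger[String, Unit] {
  private val entries = new ConcurrentLinkedQueue[(Instant, String)]()

  override def apply(trace: Trace, fiberId: FiberId, logLevel: LogLevel, message: () => String, cause: Cause[Any], context: FiberRefs, spans: List[LogSpan], annotations: Map[String, String]): Unit =
    entries.add((Instant.now(), message()))

  // Keep only messages emitted at or before the cutoff.
  def messagesBefore(cutoff: Instant): List[String] =
    entries.asScala.toList.collect { case (at, msg) if !at.isAfter(cutoff) => msg }
}
```

In the test, the cutoff could come from zio.Clock.instant immediately after readState, with the counts computed over messagesBefore(cutoff) instead of the whole queue.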

-output <- ZTestLogger.logOutput
-_ = output.foreach(s => println(s.message()))
-pendingHeartbeatLogCount = output.count(_.message().contains("memberId=MemberId(peer1) read pending heartbeat"))
-pendingCommandLogCount = output.count(_.message().contains("memberId=MemberId(peer1) read pending command"))
+messages = testLogger.getMessages
+pendingHeartbeatLogCount = messages.count(_.contains("memberId=MemberId(peer1) read pending heartbeat"))
+pendingCommandLogCount = messages.count(_.contains("memberId=MemberId(peer1) read pending command"))
yield assertTrue(readResult1 > 0) && assertTrue(pendingHeartbeatLogCount == 0) && assertTrue(pendingCommandLogCount == 1)
-},
+} @@ TestAspect.flaky, // TODO (eran): because of the way this test is structured it is currently flaky, we'll need to find another way to send commands so the readState will have pending commands

test("read returns the correct state when there are no pending writes.") {
for
+testLogger <- ZIO.succeed(new TestLogger())
(
r1,
killSwitch1,
r2,
killSwitch2,
r3,
killSwitch3
-) <- makeRaft().provideSomeLayer(zio.Runtime.removeDefaultLoggers >>> zio.test.ZTestLogger.default)
+) <- makeRaft().provideSomeLayer(zio.Runtime.removeDefaultLoggers >>> zio.Runtime.addLogger(testLogger))

_ <- r1.sendCommand(Increase)

// When this runs we should have no writes in the queue since the sendCommand call is blocking
readResult <- r1.readState

// verify read waits for heartbeat and not a write/noop command
-output <- ZTestLogger.logOutput
-pendingHeartbeatLogCount = output.count(_.message().contains("memberId=MemberId(peer1) read pending heartbeat"))
-pendingCommandLogCount = output.count(_.message().contains("memberId=MemberId(peer1) read pending command"))
+messages = testLogger.getMessages
+pendingHeartbeatLogCount = messages.count(_.contains("memberId=MemberId(peer1) read pending heartbeat"))
+pendingCommandLogCount = messages.count(_.contains("memberId=MemberId(peer1) read pending command"))
Comment on lines +243 to +245

Copilot AI commented on Oct 16, 2025, repeating the same concern as above: counting log messages from the ConcurrentLinkedQueue immediately after readState can race with late asynchronous log emissions.
yield assertTrue(readResult == 1) && assertTrue(pendingHeartbeatLogCount == 1) && assertTrue(pendingCommandLogCount == 0)
},
