Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions .github/workflows/scala-ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
name: Scala CI

permissions:
contents: read

on:
pull_request:
branches: [ main ]
paths:
- '**.scala'
- 'build.sbt'
- 'project/**'
- '.github/workflows/scala-ci.yml'
- 'raft/**'
- 'raft-zmq/**'
- 'stores/**'
- 'kvstore/**'
- 'zio-lmdb/**'
- 'zio1-zmq/**'
- 'zio2-zmq/**'

concurrency:
group: scala-ci-${{ github.event.pull_request.number }}
cancel-in-progress: true

jobs:
scala-ci:
runs-on: ubuntu-latest
timeout-minutes: 10

steps:
- name: Checkout code
uses: actions/checkout@v4

- name: Setup Java
uses: actions/setup-java@v4
with:
distribution: 'temurin'
java-version: '21'

- name: Setup SBT
uses: sbt/setup-sbt@v1

- name: Cache SBT dependencies
uses: actions/cache@v4
with:
path: |
~/.sbt
~/.ivy2/cache
~/.coursier
target
project/target
key: ${{ runner.os }}-sbt-${{ hashFiles('**/*.sbt', 'project/build.properties', 'project/plugins.sbt') }}
restore-keys: |
${{ runner.os }}-sbt-

- name: Check code formatting
run: sbt scalafmtCheck

- name: Compile
run: sbt Test/compile

- name: Run tests
run: sbt test

14 changes: 14 additions & 0 deletions .scalafmt-cross.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
version = 3.8.1
runner.dialect = scala213source3
maxColumn = 120
indent.defnSite = 2
indent.caseSite = 2
newlines.source=keep

# Override for code that is used for cross compilation
fileOverride {
"glob:**/scala-3/**" {
runner.dialect = scala3
}
}

5 changes: 4 additions & 1 deletion .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
version = "3.8.1"
version = 3.8.1
runner.dialect = scala3
maxColumn = 120
indent.defnSite = 2
indent.caseSite = 2
newlines.source=keep
2 changes: 2 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ lazy val zio1zmq = project
.settings(
name := "zio1-zmq",
crossScalaVersions := supportedScalaVersions,
scalafmtConfig := file(".scalafmt-cross.conf"),
scalacOptions ++= commonScalacOptions.value,
libraryDependencies ++= Seq(
"dev.zio" %% "zio" % zio1Version,
Expand All @@ -160,6 +161,7 @@ lazy val zio2zmq = project
name := "zio2-zmq",
scalaVersion := mainScalaVersion,
crossScalaVersions := supportedScalaVersions,
scalafmtConfig := file(".scalafmt-cross.conf"),
scalacOptions ++= commonScalacOptions.value,
libraryDependencies ++= Seq(
"dev.zio" %% "zio" % zio2Version,
Expand Down
4 changes: 2 additions & 2 deletions raft-zmq/src/main/scala/zio/raft/zmq/RemoveDuplicate.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import zio.stream.ZPipeline
import zio.{Chunk, Clock, Ref, ZIO}

class RemoveDuplicate[A <: Command](
refPreviousMessage: Ref[Option[(Instant, RPCMessage[A])]]
refPreviousMessage: Ref[Option[(Instant, RPCMessage[A])]]
):

val minHeartbeatInterval = Raft.heartbeartInterval.dividedBy(2)
Expand All @@ -25,7 +25,7 @@ class RemoveDuplicate[A <: Command](
case Some(timestamp, previousMessage) =>
m match
case m: AppendEntriesRequest[A]
if previousMessage == m && now.isBefore(timestamp.plus(minHeartbeatInterval)) =>
if previousMessage == m && now.isBefore(timestamp.plus(minHeartbeatInterval)) =>
ZIO.succeed(false)
case _: AppendEntriesResult.Failure[A] if previousMessage == m =>
ZIO.succeed(false)
Expand Down
16 changes: 8 additions & 8 deletions raft-zmq/src/main/scala/zio/raft/zmq/ZmqRpc.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import scodec.bits.BitVector
class ZmqRpc[A <: Command: Codec](server: ZSocket, clients: Map[MemberId, ZSocket]) extends RPC[A]:

override def sendAppendEntries(
peer: MemberId,
request: AppendEntriesRequest[A]
peer: MemberId,
request: AppendEntriesRequest[A]
): UIO[Boolean] =
val client = clients(peer)
val message = RpcMessageCodec.codec[A].encode(request).require.toByteArray
Expand All @@ -35,24 +35,24 @@ class ZmqRpc[A <: Command: Codec](server: ZSocket, clients: Map[MemberId, ZSocke
yield sent

override def sendRequestVoteResponse(
candidateId: MemberId,
response: RequestVoteResult[A]
candidateId: MemberId,
response: RequestVoteResult[A]
): UIO[Unit] =
val client = clients(candidateId)
val message = RpcMessageCodec.codec[A].encode(response).require.toByteArray
client.sendImmediately(message).ignore

override def sendRequestVote(
peer: MemberId,
m: RequestVoteRequest[A]
peer: MemberId,
m: RequestVoteRequest[A]
): UIO[Unit] =
val client = clients(peer)
val message = RpcMessageCodec.codec[A].encode(m).require.toByteArray
client.sendImmediately(message).ignore

override def sendAppendEntriesResponse(
leaderId: MemberId,
response: AppendEntriesResult[A]
leaderId: MemberId,
response: AppendEntriesResult[A]
): UIO[Unit] =
val client = clients(leaderId)
val message = RpcMessageCodec.codec[A].encode(response).require.toByteArray
Expand Down
2 changes: 1 addition & 1 deletion raft/src/main/scala/zio/raft/LogStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ object LogStore:
yield new InMemoryLogStore(logs)

class InMemoryLogStore[A <: Command](
logs: Ref[List[LogEntry[A]]]
logs: Ref[List[LogEntry[A]]]
) extends LogStore[A]:

override def discardLogUpTo(index: Index): UIO[Unit] =
Expand Down
18 changes: 9 additions & 9 deletions raft/src/main/scala/zio/raft/PendingReads.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import zio.raft.PendingReadEntry.PendingCommand
import zio.raft.PendingReadEntry.PendingHeartbeat

case class PendingReads[S](
readsPendingCommands: InsertSortList[PendingReadEntry.PendingCommand[S]],
readsPendingHeartbeats: InsertSortList[PendingReadEntry.PendingHeartbeat[S]]
readsPendingCommands: InsertSortList[PendingReadEntry.PendingCommand[S]],
readsPendingHeartbeats: InsertSortList[PendingReadEntry.PendingHeartbeat[S]]
):
def withPendingCommand(promise: Promise[NotALeaderError, S], commandIndex: Index): PendingReads[S] =
this.copy(readsPendingCommands =
Expand All @@ -29,10 +29,10 @@ case class PendingReads[S](
ZIO.foreach(completed)(_.promise.succeed(stateAfterApply)).as(this.copy(readsPendingCommands = remaining))

def resolveReadsForHeartbeat(
memberId: MemberId,
timestamp: Instant,
state: S,
numberOfServers: Int
memberId: MemberId,
timestamp: Instant,
state: S,
numberOfServers: Int
): UIO[PendingReads[S]] =
if readsPendingHeartbeats.isEmpty then ZIO.succeed(this)
else
Expand All @@ -56,9 +56,9 @@ private enum PendingReadEntry[S](val promise: Promise[NotALeaderError, S]):
case PendingCommand(override val promise: Promise[NotALeaderError, S], enqueuedAtIndex: Index)
extends PendingReadEntry[S](promise)
case PendingHeartbeat(
override val promise: Promise[NotALeaderError, S],
timestamp: Instant,
peersHeartbeats: Peers = Set.empty
override val promise: Promise[NotALeaderError, S],
timestamp: Instant,
peersHeartbeats: Peers = Set.empty
) extends PendingReadEntry[S](promise)

object PendingReadEntry:
Expand Down
12 changes: 6 additions & 6 deletions raft/src/main/scala/zio/raft/RPC.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,18 @@ import zio.stream.Stream
trait RPC[A <: Command]:
def sendRequestVote(peer: MemberId, m: RequestVoteRequest[A]): UIO[Unit]
def sendRequestVoteResponse(
candidateId: MemberId,
response: RequestVoteResult[A]
candidateId: MemberId,
response: RequestVoteResult[A]
): UIO[Unit]
def sendHeartbeat(peer: MemberId, m: HeartbeatRequest[A]): UIO[Unit]
def sendHeartbeatResponse(leaderId: MemberId, m: HeartbeatResponse[A]): UIO[Unit]
def sendAppendEntriesResponse(
leaderId: MemberId,
response: AppendEntriesResult[A]
leaderId: MemberId,
response: AppendEntriesResult[A]
): UIO[Unit]
def sendAppendEntries(
peer: MemberId,
request: AppendEntriesRequest[A]
peer: MemberId,
request: AppendEntriesRequest[A]
): UIO[Boolean]

def sendInstallSnapshot(peer: MemberId, m: InstallSnapshotRequest[A]): UIO[Unit]
Expand Down
86 changes: 43 additions & 43 deletions raft/src/main/scala/zio/raft/Raft.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,16 @@ object StreamItem:
val promise: CommandPromise[command.Response]

class Raft[S, A <: Command](
val memberId: MemberId,
peers: Peers,
private[raft] val raftState: Ref[State[S]],
commandsQueue: Queue[StreamItem[A, S]],
stable: Stable,
private[raft] val logStore: LogStore[A],
snapshotStore: SnapshotStore,
rpc: RPC[A],
stateMachine: StateMachine[S, A],
appStateRef: Ref[S]
val memberId: MemberId,
peers: Peers,
private[raft] val raftState: Ref[State[S]],
commandsQueue: Queue[StreamItem[A, S]],
stable: Stable,
private[raft] val logStore: LogStore[A],
snapshotStore: SnapshotStore,
rpc: RPC[A],
stateMachine: StateMachine[S, A],
appStateRef: Ref[S]
):
val rpcTimeout = 50.millis
val batchSize = 100
Expand Down Expand Up @@ -202,7 +202,7 @@ class Raft[S, A <: Command](
yield ()

private def handleAppendEntriesRequest(
m: AppendEntriesRequest[A]
m: AppendEntriesRequest[A]
): URIO[Any, AppendEntriesResult[A]] =
for
currentTerm <- stable.currentTerm
Expand Down Expand Up @@ -303,11 +303,11 @@ class Raft[S, A <: Command](
.withResume(from)
)
case AppendEntriesResult.Failure(
from,
_,
_,
Some(hintTerm, hintIndex)
) =>
from,
_,
_,
Some(hintTerm, hintIndex)
) =>
for
index <-
if hintTerm.isZero then ZIO.succeed(hintIndex)
Expand Down Expand Up @@ -652,17 +652,17 @@ class Raft[S, A <: Command](
leaderLastLogIndex <- logStore.lastIndex
_ <- s match
case l: Leader[S]
if leaderLastLogIndex >= l.nextIndex.get(
peer
) && !l.replicationStatus.isPaused(peer) =>
if leaderLastLogIndex >= l.nextIndex.get(
peer
) && !l.replicationStatus.isPaused(peer) =>
sendAppendEntries(peer, l, leaderLastLogIndex)
case _ => ZIO.unit
yield ()

private def sendAppendEntries(
peer: MemberId,
l: Leader[S],
leaderLastLogIndex: Index
peer: MemberId,
l: Leader[S],
leaderLastLogIndex: Index
) =
val nextIndex = l.nextIndex.get(peer)
val previousIndex = nextIndex.minusOne
Expand Down Expand Up @@ -784,10 +784,10 @@ class Raft[S, A <: Command](
now <- zio.Clock.instant
_ <- s match
case c: Candidate[S]
if c.rpcDue.due(
now,
peer
) =>
if c.rpcDue.due(
now,
peer
) =>
for
currentTerm <- stable.currentTerm
lastTerm <- logStore.lastTerm
Expand Down Expand Up @@ -861,8 +861,8 @@ class Raft[S, A <: Command](
handleInstallSnapshotReply(r)

private def handleRequestFromClient(
command: A,
promise: CommandPromise[command.Response]
command: A,
promise: CommandPromise[command.Response]
): ZIO[Any, Nothing, Unit] =
raftState.get.flatMap:
case l: Leader[S] =>
Expand Down Expand Up @@ -924,7 +924,7 @@ class Raft[S, A <: Command](
yield if s.isInstanceOf[Leader[S]] then true else false

def sendCommand(
commandArg: A
commandArg: A
): ZIO[Any, NotALeaderError, commandArg.Response] =
// todo: leader only
for
Expand Down Expand Up @@ -1010,13 +1010,13 @@ object Raft:
val heartbeartInterval = (electionTimeout / 2).millis

def make[S, A <: Command](
memberId: MemberId,
peers: Peers,
stable: Stable,
logStore: LogStore[A],
snapshotStore: SnapshotStore,
rpc: RPC[A],
stateMachine: StateMachine[S, A]
memberId: MemberId,
peers: Peers,
stable: Stable,
logStore: LogStore[A],
snapshotStore: SnapshotStore,
rpc: RPC[A],
stateMachine: StateMachine[S, A]
) =
for
now <- zio.Clock.instant
Expand Down Expand Up @@ -1061,13 +1061,13 @@ object Raft:
yield raft

def makeScoped[S, A <: Command](
memberId: MemberId,
peers: Peers,
stable: Stable,
logStore: LogStore[A],
snapshotStore: SnapshotStore,
rpc: RPC[A],
stateMachine: StateMachine[S, A]
memberId: MemberId,
peers: Peers,
stable: Stable,
logStore: LogStore[A],
snapshotStore: SnapshotStore,
rpc: RPC[A],
stateMachine: StateMachine[S, A]
) =
for
raft <- make(
Expand Down
Loading