Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 0 additions & 14 deletions .scalafmt-cross.conf

This file was deleted.

22 changes: 22 additions & 0 deletions .scalafmt.conf
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,25 @@ maxColumn = 120
indent.defnSite = 2
indent.caseSite = 2
newlines.source=keep

rewrite.scala3.convertToNewSyntax = true
rewrite.scala3.removeOptionalBraces = {
enabled = true
}
rewrite.scala3.insertEndMarkerMinLines = 50
rewrite.scala3.removeEndMarkerMaxLines = 49

fileOverride {
"glob:**/zio1-zmq/src/main/scala/**" {
runner.dialect = scala213source3
}
"glob:**/zio1-zmq/src/main/scala-2.13/**" {
runner.dialect = scala213source3
}
"glob:**/zio2-zmq/src/main/scala/**" {
runner.dialect = scala213source3
}
"glob:**/zio2-zmq/src/main/scala-2.13/**" {
runner.dialect = scala213source3
}
}
2 changes: 0 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ lazy val zio1zmq = project
.settings(
name := "zio1-zmq",
crossScalaVersions := supportedScalaVersions,
scalafmtConfig := file(".scalafmt-cross.conf"),
scalacOptions ++= commonScalacOptions.value,
libraryDependencies ++= Seq(
"dev.zio" %% "zio" % zio1Version,
Expand All @@ -162,7 +161,6 @@ lazy val zio2zmq = project
name := "zio2-zmq",
scalaVersion := mainScalaVersion,
crossScalaVersions := supportedScalaVersions,
scalafmtConfig := file(".scalafmt-cross.conf"),
scalacOptions ++= commonScalacOptions.value,
libraryDependencies ++= Seq(
"dev.zio" %% "zio" % zio2Version,
Expand Down
2 changes: 0 additions & 2 deletions raft-zmq/src/main/scala/zio/raft/zmq/ProtocolMessage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,3 @@ object ProtocolMessage:
.typecase(2, RpcMessageCodec.codec[A].as[ProtocolMessage.Rpc[A]])

private def memberIdCodec = utf8_32.xmap(MemberId(_), _.value)

end ProtocolMessage
1 change: 1 addition & 0 deletions raft-zmq/src/main/scala/zio/raft/zmq/RpcMessageCodec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,4 @@ object RpcMessageCodec:
.by(uint8)
.typecase(0, (memberIdCodec :: termCodec :: indexCodec :: bool(8)).as[InstallSnapshotResult.Success[A]])
.typecase(1, (memberIdCodec :: termCodec :: indexCodec).as[InstallSnapshotResult.Failure[A]])
end RpcMessageCodec
1 change: 1 addition & 0 deletions raft-zmq/src/main/scala/zio/raft/zmq/ZmqRpc.scala
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ class ZmqRpc[A <: Command: Codec](server: ZSocket, clients: Map[MemberId, ZSocke
.collectRight
.via(RemoveDuplicate[A]()) // Because the raft messaging might be very chatty, we want to remove duplicates
.catchAll(err => ZStream.die(err))
end ZmqRpc

object ZmqRpc:
def make[A <: Command: Codec](bindAddress: String, peers: Map[MemberId, String]) =
Expand Down
2 changes: 1 addition & 1 deletion raft/src/main/scala/zio/raft/InsertSortList.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ case class InsertSortList[A](list: List[A])(using ordering: Ordering[A]) extends
override def iterator: Iterator[A] = list.iterator

def withSortedInsert(a: A): InsertSortList[A] =
if (list.isEmpty || ordering.gteq(a, list.last)) then InsertSortList(list :+ a)
if list.isEmpty || ordering.gteq(a, list.last) then InsertSortList(list :+ a)
else
val (before, after) = list.span(ordering.lteq(_, a))
InsertSortList(before ++ (a :: after))
Expand Down
2 changes: 0 additions & 2 deletions raft/src/main/scala/zio/raft/LogStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ trait LogStore[A <: Command]:
case Some(ourTerm) if ourTerm <= term => ZIO.succeed((ourTerm, index))
case Some(ourTerm) => findConflictByTerm(term, index.minusOne)

end LogStore

object LogStore:
def makeInMemory[A <: Command]: ZIO[Any, Nothing, InMemoryLogStore[A]] =
for logs <- Ref.make(List.empty[LogEntry[A]])
Expand Down
14 changes: 8 additions & 6 deletions raft/src/main/scala/zio/raft/Raft.scala
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,13 @@ class Raft[S, A <: Command](
lastTerm <- logStore.lastTerm
lastIndex <- logStore.lastIndex
result <-
if (
if
currentTerm == m.term && (votedFor.contains(
m.candidateId
) || votedFor.isEmpty) &&
(m.lastLogTerm > lastTerm ||
(m.lastLogTerm == lastTerm &&
m.lastLogIndex >= lastIndex))
)
then
for
_ <- stable.voteFor(m.candidateId)
Expand Down Expand Up @@ -241,7 +240,7 @@ class Raft[S, A <: Command](
else if success then
for
_ <- ZIO.when(m.previousIndex < lastIndex)(
ZIO.foreachDiscard(m.entries)(entry => {
ZIO.foreachDiscard(m.entries)(entry =>
for
logTerm <- logStore.logTerm(entry.index)
_ <- ZIO.when(
Expand All @@ -250,7 +249,7 @@ class Raft[S, A <: Command](
logStore.deleteFrom(entry.index)
)
yield ()
})
)
)
_ <- logStore.storeLogs(m.entries)
messageLastIndex = m.entries.last.index
Expand Down Expand Up @@ -777,6 +776,8 @@ class Raft[S, A <: Command](
)
yield ()
yield ()
end for
end sendAppendEntries

private def sendRequestVoteRule(peer: MemberId) =
for
Expand Down Expand Up @@ -929,10 +930,10 @@ class Raft[S, A <: Command](
// todo: leader only
for
promiseArg <- Promise.make[NotALeaderError, commandArg.Response]
_ <- commandsQueue.offer(new CommandMessage {
_ <- commandsQueue.offer(new CommandMessage:
val command = commandArg
val promise = promiseArg.asInstanceOf
})
)
res <- promiseArg.await
yield (res)

Expand Down Expand Up @@ -1081,3 +1082,4 @@ object Raft:
)
_ <- raft.run.forkScoped
yield raft
end Raft
1 change: 1 addition & 0 deletions raft/src/main/scala/zio/raft/ReplicationStatus.scala
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class ReplicationStatus(val peerStatus: Map[MemberId, PeerReplicationStatus]):
peerStatus.get(peer) match
case Some(PeerReplicationStatus.Snapshot(_, _)) => true
case _ => false
end ReplicationStatus

object ReplicationStatus:
def apply(peers: Peers) = new ReplicationStatus(
Expand Down
2 changes: 2 additions & 0 deletions raft/src/main/scala/zio/raft/SnapshotStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,5 @@ object SnapshotStore:
yield result

override def latestSnapshot = latest.get
end InMemorySnapshotStore
end SnapshotStore
2 changes: 2 additions & 0 deletions raft/src/main/scala/zio/raft/State.scala
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,5 @@ object State:
def completeReads(index: Index, readState: S): UIO[Leader[S]] =
for pendingReads <- pendingReads.resolveReadsForCommand(index, readState)
yield this.copy(pendingReads = pendingReads)
end Leader
end State
3 changes: 1 addition & 2 deletions raft/src/main/scala/zio/raft/Types.scala
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,9 @@ case class NotALeaderError(leaderId: Option[MemberId])

type CommandPromise[A] = Promise[NotALeaderError, A]

sealed trait LogEntry[+A <: Command] {
sealed trait LogEntry[+A <: Command]:
val term: Term
val index: Index
}
object LogEntry:
case class CommandLogEntry[A <: Command](command: A, val term: Term, val index: Index)
extends LogEntry[A]
Expand Down
2 changes: 1 addition & 1 deletion raft/src/test/scala/zio/raft/RaftIntegrationSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import zio.FiberRefs
import zio.LogSpan
import zio.Trace
import java.util.concurrent.ConcurrentLinkedQueue
import scala.jdk.CollectionConverters._
import scala.jdk.CollectionConverters.*

object RaftIntegrationSpec extends ZIOSpecDefault:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ class FileSnapshotStore(
_ <- updateLatest(snapshot.previousTerm, snapshot.previousIndex, size)
_ <- deleteOldSnapshots
yield ()
end FileSnapshotStore

object FileSnapshotStore:
def make(directory: Path): ZIO[Environment, Nothing, FileSnapshotStore] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class BaseTransducer(ref: Ref[BaseTransducer.State], validateChecksum: Boolean):
case ReadRecordType(offset, index, bits, chunkBuilder, crcBuilder) =>
isEntryCodec.decode(bits) match
case Successful(decodeResult) =>
if (decodeResult.value)
if decodeResult.value then
process(results, ReadSize(offset, index, decodeResult.remainder, chunkBuilder, crcBuilder))
else process(results, ReadChecksum(offset, index, decodeResult.remainder, chunkBuilder, crcBuilder))
case Failure(e: Err.InsufficientBits) =>
Expand Down Expand Up @@ -131,6 +131,7 @@ class BaseTransducer(ref: Ref[BaseTransducer.State], validateChecksum: Boolean):
ref.set(ReadChecksum(offset, index, bitsVector, chunkBuilder, crcBuilder)).as(results)
case f: Failure =>
ZIO.fail(new Throwable(s"Error decoding checksum: ${f.cause.messageWithContext}"))
end BaseTransducer
object BaseTransducer:
val headerSize = fileHeaderCodec.sizeBound.exact.get / 8
val sizeSize = entrySizeCodec.sizeBound.exact.get / 8
Expand Down Expand Up @@ -198,3 +199,4 @@ object BaseTransducer:
.make[State](ReadFileHeader(firstIndex, BitVector.empty))
.map(ref => new BaseTransducer(ref, validateChecksum).apply)
)
end BaseTransducer
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,4 @@ class ChecksummedList[A](codec: Codec[A]) extends Codec[List[A]]:
Attempt.fromErrOption(error, DecodeResult(bldr.result, remaining))

override def toString = s"ChecksummedList($codec)"
end ChecksummedList
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ package zio.raft.stores.segmentedlog

import zio.{UIO, ZIO}

sealed trait LocalLongRef[A] { self =>
sealed trait LocalLongRef[A]:
self =>
def get: UIO[A]

def set(a: A): UIO[Unit]
Expand All @@ -14,11 +15,11 @@ sealed trait LocalLongRef[A] { self =>
def modify[B](f: A => (B, A)): UIO[B]

def dimap[B](ab: A => B, ba: B => A): LocalLongRef[B]
}
object LocalLongRef:
def make(value: Long): UIO[LocalLongRef[Long]] = ZIO.succeed(new VolatileLongRef(value))

private class VolatileLongRef(@volatile private var value: Long) extends LocalLongRef[Long] { self =>
private class VolatileLongRef(@volatile private var value: Long) extends LocalLongRef[Long]:
self =>
def get = ZIO.succeed(value)

def set(a: Long) = ZIO.succeed(this.value = a)
Expand Down Expand Up @@ -46,9 +47,9 @@ object LocalLongRef:
def longToA(s: S) = ab(s)

def aToLong(a: B) = ba(a)
}

private abstract class Derived[A] extends LocalLongRef[A] { self =>
private abstract class Derived[A] extends LocalLongRef[A]:
self =>
val value: VolatileLongRef

def longToA(s: Long): A
Expand All @@ -74,4 +75,4 @@ object LocalLongRef:
def aToLong(b: B): Long =
self.aToLong(ba(b))
val value = self.value
}
end LocalLongRef
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ class OpenSegment[A <: Command: Codec](
else ZIO.none

def getLastTermIndex[A <: Command: Codec]: ZIO[Any, Nothing, (Term, Index)] =
for {
for
last <- makeStream(channel)
.via(recordsOnly)
.run(lastAndDecode)
.orDie
} yield last match
yield last match
case None => (previousTerm, firstIndex.minusOne)
case Some(e) => (e.term, e.index)

Expand All @@ -62,19 +62,19 @@ class OpenSegment[A <: Command: Codec](
// segment size is just a recommendation at this moment.
// TODO: we should only write entries that will fit in the segment and return the rest to be written in the next segment
def writeEntries(entries: List[LogEntry[A]]): ZIO[Any, Nothing, Int] =
for {
for
bytes <- ZIO.attempt(logEntriesCodec[A].encode(entries).require.toByteVector).orDie
position <- positionRef.get
written <- channel.write(bytes, position).orDie
_ <- channel.force(true).orDie
_ <- positionRef.update(_ + written)
} yield written
yield written

def deleteFrom(minInclusive: Index): ZIO[Any, Nothing, Unit] =
for
// we need to find the offset of the entry to truncate the file
offset <-
if (minInclusive == firstIndex) ZIO.succeed(BaseTransducer.headerSize)
if minInclusive == firstIndex then ZIO.succeed(BaseTransducer.headerSize)
else
makeStream(channel)
.via(recordsOnly)
Expand Down Expand Up @@ -174,6 +174,7 @@ class OpenSegment[A <: Command: Codec](
yield ()

def close() = channel.close.orDie
end OpenSegment

object OpenSegment:
def createNewSegment[A <: Command: Codec](
Expand All @@ -184,9 +185,9 @@ object OpenSegment:
): ZIO[Any, Nothing, ZIO[Scope, Nothing, OpenSegment[A]]] =
val fullPath = Path(logDirectory, fileName)

for {
for
_ <- createFileWithHeader(logDirectory, fullPath)
} yield openSegment[A](fullPath, firstIndex, previousTerm)
yield openSegment[A](fullPath, firstIndex, previousTerm)

// atomically create a new segment with an empty log entry with the previous index and term

Expand Down Expand Up @@ -248,3 +249,4 @@ object OpenSegment:

segment = new OpenSegment[A](fullPath, channel, positionRef, firstIndex, previousTerm)
yield segment
end OpenSegment
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class ReadOnlySegment[A <: Command: Codec](
.flatMap(channel => makeStream(channel))
.via(recordsOnly)

if (startInclusive.value > firstIndex.value)
if startInclusive.value > firstIndex.value then
stream.drop((startInclusive.value - firstIndex.value).toInt).takeWhile(_.index <= toInclusive).via(decode)
else stream.takeWhile(_.index <= toInclusive).via(decode)

Expand All @@ -33,7 +33,7 @@ class ReadOnlySegment[A <: Command: Codec](
fromInclusive.value < lastIndexExclusive.value && toInclusive >= firstIndex

def getEntry(index: Index): ZIO[Any, Nothing, Option[LogEntry[A]]] =
if (isInSegment(index))
if isInSegment(index) then
ZStream
.scoped(AsynchronousFileChannel.open(path, StandardOpenOption.READ))
.flatMap(channel => makeStream(channel))
Expand All @@ -48,7 +48,7 @@ class ReadOnlySegment[A <: Command: Codec](

// Checks if the index is in the segment, i.e is the index between firstIndex and lastIndex
def isInSegment(index: Index) =
if (index >= firstIndex)
if index >= firstIndex then
lastIndexExclusive match
case None => true
case Some(lastIndexExclusive) =>
Expand All @@ -64,6 +64,7 @@ class ReadOnlySegment[A <: Command: Codec](
def delete = Files.delete(path).orDie

override def toString(): String = s"Segment($firstIndex)"
end ReadOnlySegment
object ReadOnlySegment:
def open[A <: Command: Codec](logDirectory: String, fileName: String, firstIndex: Index, lastIndex: Option[Index]) =
val path = Path(logDirectory, fileName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ class SegmentMetadataDatabase(environment: Environment, database: Database):

def discardAll = environment.transact(database.truncate).orDie

end SegmentMetadataDatabase

object SegmentMetadataDatabase:
case class SegmentMetadata(
id: Long,
Expand Down Expand Up @@ -101,5 +99,3 @@ object SegmentMetadataDatabase:
environment <- ZIO.service[Environment]
database <- environment.openDatabase("segment-metadata", DbiFlags.MDB_CREATE).orDie
yield SegmentMetadataDatabase(environment, database)

end SegmentMetadataDatabase
Loading