diff --git a/.scalafmt-cross.conf b/.scalafmt-cross.conf deleted file mode 100644 index ae87c810..00000000 --- a/.scalafmt-cross.conf +++ /dev/null @@ -1,14 +0,0 @@ -version = 3.8.1 -runner.dialect = scala213source3 -maxColumn = 120 -indent.defnSite = 2 -indent.caseSite = 2 -newlines.source=keep - -# Override for code that is used for cross compilation -fileOverride { - "glob:**/scala-3/**" { - runner.dialect = scala3 - } -} - diff --git a/.scalafmt.conf b/.scalafmt.conf index 1e190738..45ab9f53 100644 --- a/.scalafmt.conf +++ b/.scalafmt.conf @@ -4,3 +4,25 @@ maxColumn = 120 indent.defnSite = 2 indent.caseSite = 2 newlines.source=keep + +rewrite.scala3.convertToNewSyntax = true +rewrite.scala3.removeOptionalBraces = { + enabled = true +} +rewrite.scala3.insertEndMarkerMinLines = 50 +rewrite.scala3.removeEndMarkerMaxLines = 49 + +fileOverride { + "glob:**/zio1-zmq/src/main/scala/**" { + runner.dialect = scala213source3 + } + "glob:**/zio1-zmq/src/main/scala-2.13/**" { + runner.dialect = scala213source3 + } + "glob:**/zio2-zmq/src/main/scala/**" { + runner.dialect = scala213source3 + } + "glob:**/zio2-zmq/src/main/scala-2.13/**" { + runner.dialect = scala213source3 + } +} diff --git a/build.sbt b/build.sbt index c4df9cfb..3fc6f020 100644 --- a/build.sbt +++ b/build.sbt @@ -140,7 +140,6 @@ lazy val zio1zmq = project .settings( name := "zio1-zmq", crossScalaVersions := supportedScalaVersions, - scalafmtConfig := file(".scalafmt-cross.conf"), scalacOptions ++= commonScalacOptions.value, libraryDependencies ++= Seq( "dev.zio" %% "zio" % zio1Version, @@ -162,7 +161,6 @@ lazy val zio2zmq = project name := "zio2-zmq", scalaVersion := mainScalaVersion, crossScalaVersions := supportedScalaVersions, - scalafmtConfig := file(".scalafmt-cross.conf"), scalacOptions ++= commonScalacOptions.value, libraryDependencies ++= Seq( "dev.zio" %% "zio" % zio2Version, diff --git a/raft-zmq/src/main/scala/zio/raft/zmq/ProtocolMessage.scala b/raft-zmq/src/main/scala/zio/raft/zmq/ProtocolMessage.scala index 6214ecfc..30c862fa 100644 --- a/raft-zmq/src/main/scala/zio/raft/zmq/ProtocolMessage.scala +++ b/raft-zmq/src/main/scala/zio/raft/zmq/ProtocolMessage.scala @@ -19,5 +19,3 @@ object ProtocolMessage: .typecase(2, RpcMessageCodec.codec[A].as[ProtocolMessage.Rpc[A]]) private def memberIdCodec = utf8_32.xmap(MemberId(_), _.value) - -end ProtocolMessage diff --git a/raft-zmq/src/main/scala/zio/raft/zmq/RpcMessageCodec.scala b/raft-zmq/src/main/scala/zio/raft/zmq/RpcMessageCodec.scala index 4c47da72..64b6d85a 100644 --- a/raft-zmq/src/main/scala/zio/raft/zmq/RpcMessageCodec.scala +++ b/raft-zmq/src/main/scala/zio/raft/zmq/RpcMessageCodec.scala @@ -107,3 +107,4 @@ object RpcMessageCodec: .by(uint8) .typecase(0, (memberIdCodec :: termCodec :: indexCodec :: bool(8)).as[InstallSnapshotResult.Success[A]]) .typecase(1, (memberIdCodec :: termCodec :: indexCodec).as[InstallSnapshotResult.Failure[A]]) +end RpcMessageCodec diff --git a/raft-zmq/src/main/scala/zio/raft/zmq/ZmqRpc.scala b/raft-zmq/src/main/scala/zio/raft/zmq/ZmqRpc.scala index 7b0f214b..5670f2ab 100644 --- a/raft-zmq/src/main/scala/zio/raft/zmq/ZmqRpc.scala +++ b/raft-zmq/src/main/scala/zio/raft/zmq/ZmqRpc.scala @@ -94,6 +94,7 @@ class ZmqRpc[A <: Command: Codec](server: ZSocket, clients: Map[MemberId, ZSocke .collectRight .via(RemoveDuplicate[A]()) // Because the raft messaging might be very chatty, we want to remove duplicates .catchAll(err => ZStream.die(err)) +end ZmqRpc object ZmqRpc: def make[A <: Command: Codec](bindAddress: String, peers: Map[MemberId, String]) = diff --git a/raft/src/main/scala/zio/raft/InsertSortList.scala b/raft/src/main/scala/zio/raft/InsertSortList.scala index 2eaaff5d..06834f96 100644 --- a/raft/src/main/scala/zio/raft/InsertSortList.scala +++ b/raft/src/main/scala/zio/raft/InsertSortList.scala @@ -8,7 +8,7 @@ case class InsertSortList[A](list: List[A])(using ordering: Ordering[A]) extends override def iterator: Iterator[A] = list.iterator def withSortedInsert(a: A): InsertSortList[A] = - if (list.isEmpty || ordering.gteq(a, list.last)) then InsertSortList(list :+ a) + if list.isEmpty || ordering.gteq(a, list.last) then InsertSortList(list :+ a) else val (before, after) = list.span(ordering.lteq(_, a)) InsertSortList(before ++ (a :: after)) diff --git a/raft/src/main/scala/zio/raft/LogStore.scala b/raft/src/main/scala/zio/raft/LogStore.scala index 927c3f27..70a21647 100644 --- a/raft/src/main/scala/zio/raft/LogStore.scala +++ b/raft/src/main/scala/zio/raft/LogStore.scala @@ -33,8 +33,6 @@ trait LogStore[A <: Command]: case Some(ourTerm) if ourTerm <= term => ZIO.succeed((ourTerm, index)) case Some(ourTerm) => findConflictByTerm(term, index.minusOne) -end LogStore - object LogStore: def makeInMemory[A <: Command]: ZIO[Any, Nothing, InMemoryLogStore[A]] = for logs <- Ref.make(List.empty[LogEntry[A]]) diff --git a/raft/src/main/scala/zio/raft/Raft.scala b/raft/src/main/scala/zio/raft/Raft.scala index ac768235..9082e25c 100644 --- a/raft/src/main/scala/zio/raft/Raft.scala +++ b/raft/src/main/scala/zio/raft/Raft.scala @@ -115,14 +115,13 @@ class Raft[S, A <: Command]( lastTerm <- logStore.lastTerm lastIndex <- logStore.lastIndex result <- - if ( + if currentTerm == m.term && (votedFor.contains( m.candidateId ) || votedFor.isEmpty) && (m.lastLogTerm > lastTerm || (m.lastLogTerm == lastTerm && m.lastLogIndex >= lastIndex)) - ) then for _ <- stable.voteFor(m.candidateId) @@ -241,7 +240,7 @@ class Raft[S, A <: Command]( else if success then for _ <- ZIO.when(m.previousIndex < lastIndex)( - ZIO.foreachDiscard(m.entries)(entry => { + ZIO.foreachDiscard(m.entries)(entry => for logTerm <- logStore.logTerm(entry.index) _ <- ZIO.when( @@ -250,7 +249,7 @@ class Raft[S, A <: Command]( logStore.deleteFrom(entry.index) ) yield () - }) + ) ) _ <- logStore.storeLogs(m.entries) messageLastIndex = m.entries.last.index @@ -777,6 +776,8 @@ class Raft[S, A <: Command]( ) yield () yield () + end for + end sendAppendEntries private def sendRequestVoteRule(peer: MemberId) = for @@ -929,10 +930,10 @@ class Raft[S, A <: Command]( // todo: leader only for promiseArg <- Promise.make[NotALeaderError, commandArg.Response] - _ <- commandsQueue.offer(new CommandMessage { + _ <- commandsQueue.offer(new CommandMessage: val command = commandArg val promise = promiseArg.asInstanceOf - }) + ) res <- promiseArg.await yield (res) @@ -1081,3 +1082,4 @@ object Raft: ) _ <- raft.run.forkScoped yield raft +end Raft diff --git a/raft/src/main/scala/zio/raft/ReplicationStatus.scala b/raft/src/main/scala/zio/raft/ReplicationStatus.scala index 18d5a238..83bc1b84 100644 --- a/raft/src/main/scala/zio/raft/ReplicationStatus.scala +++ b/raft/src/main/scala/zio/raft/ReplicationStatus.scala @@ -67,6 +67,7 @@ class ReplicationStatus(val peerStatus: Map[MemberId, PeerReplicationStatus]): peerStatus.get(peer) match case Some(PeerReplicationStatus.Snapshot(_, _)) => true case _ => false +end ReplicationStatus object ReplicationStatus: def apply(peers: Peers) = new ReplicationStatus( diff --git a/raft/src/main/scala/zio/raft/SnapshotStore.scala b/raft/src/main/scala/zio/raft/SnapshotStore.scala index 53b688a9..8d1258e3 100644 --- a/raft/src/main/scala/zio/raft/SnapshotStore.scala +++ b/raft/src/main/scala/zio/raft/SnapshotStore.scala @@ -111,3 +111,5 @@ object SnapshotStore: yield result override def latestSnapshot = latest.get + end InMemorySnapshotStore +end SnapshotStore diff --git a/raft/src/main/scala/zio/raft/State.scala b/raft/src/main/scala/zio/raft/State.scala index 489a2624..a94fa586 100644 --- a/raft/src/main/scala/zio/raft/State.scala +++ b/raft/src/main/scala/zio/raft/State.scala @@ -111,3 +111,5 @@ object State: def completeReads(index: Index, readState: S): UIO[Leader[S]] = for pendingReads <- pendingReads.resolveReadsForCommand(index, readState) yield this.copy(pendingReads = pendingReads) + end Leader +end State diff --git a/raft/src/main/scala/zio/raft/Types.scala b/raft/src/main/scala/zio/raft/Types.scala index fbd22851..64d7451e 100644 --- a/raft/src/main/scala/zio/raft/Types.scala +++ b/raft/src/main/scala/zio/raft/Types.scala @@ -57,10 +57,9 @@ case class NotALeaderError(leaderId: Option[MemberId]) type CommandPromise[A] = Promise[NotALeaderError, A] -sealed trait LogEntry[+A <: Command] { +sealed trait LogEntry[+A <: Command]: val term: Term val index: Index -} object LogEntry: case class CommandLogEntry[A <: Command](command: A, val term: Term, val index: Index) extends LogEntry[A] diff --git a/raft/src/test/scala/zio/raft/RaftIntegrationSpec.scala b/raft/src/test/scala/zio/raft/RaftIntegrationSpec.scala index a60a838c..34be53e9 100644 --- a/raft/src/test/scala/zio/raft/RaftIntegrationSpec.scala +++ b/raft/src/test/scala/zio/raft/RaftIntegrationSpec.scala @@ -13,7 +13,7 @@ import zio.FiberRefs import zio.LogSpan import zio.Trace import java.util.concurrent.ConcurrentLinkedQueue -import scala.jdk.CollectionConverters._ +import scala.jdk.CollectionConverters.* object RaftIntegrationSpec extends ZIOSpecDefault: diff --git a/stores/src/main/scala/zio/raft/stores/FileSnapshotStore.scala b/stores/src/main/scala/zio/raft/stores/FileSnapshotStore.scala index 38e5be78..8c9ee337 100644 --- a/stores/src/main/scala/zio/raft/stores/FileSnapshotStore.scala +++ b/stores/src/main/scala/zio/raft/stores/FileSnapshotStore.scala @@ -216,6 +216,7 @@ class FileSnapshotStore( _ <- updateLatest(snapshot.previousTerm, snapshot.previousIndex, size) _ <- deleteOldSnapshots yield () +end FileSnapshotStore object FileSnapshotStore: def make(directory: Path): ZIO[Environment, Nothing, FileSnapshotStore] = diff --git a/stores/src/main/scala/zio/raft/stores/segmentedlog/BaseTransducer.scala b/stores/src/main/scala/zio/raft/stores/segmentedlog/BaseTransducer.scala index fbd24f87..287a168b 100644 --- a/stores/src/main/scala/zio/raft/stores/segmentedlog/BaseTransducer.scala +++ b/stores/src/main/scala/zio/raft/stores/segmentedlog/BaseTransducer.scala @@ -46,7 +46,7 @@ class BaseTransducer(ref: Ref[BaseTransducer.State], validateChecksum: Boolean): case ReadRecordType(offset, index, bits, chunkBuilder, crcBuilder) => isEntryCodec.decode(bits) match case Successful(decodeResult) => - if (decodeResult.value) + if decodeResult.value then process(results, ReadSize(offset, index, decodeResult.remainder, chunkBuilder, crcBuilder)) else process(results, ReadChecksum(offset, index, decodeResult.remainder, chunkBuilder, crcBuilder)) case Failure(e: Err.InsufficientBits) => @@ -131,6 +131,7 @@ class BaseTransducer(ref: Ref[BaseTransducer.State], validateChecksum: Boolean): ref.set(ReadChecksum(offset, index, bitsVector, chunkBuilder, crcBuilder)).as(results) case f: Failure => ZIO.fail(new Throwable(s"Error decoding checksum: ${f.cause.messageWithContext}")) +end BaseTransducer object BaseTransducer: val headerSize = fileHeaderCodec.sizeBound.exact.get / 8 val sizeSize = entrySizeCodec.sizeBound.exact.get / 8 @@ -198,3 +199,4 @@ object BaseTransducer: .make[State](ReadFileHeader(firstIndex, BitVector.empty)) .map(ref => new BaseTransducer(ref, validateChecksum).apply) ) +end BaseTransducer diff --git a/stores/src/main/scala/zio/raft/stores/segmentedlog/ChecksummedList.scala b/stores/src/main/scala/zio/raft/stores/segmentedlog/ChecksummedList.scala index 90881680..ffc4ad7d 100644 --- a/stores/src/main/scala/zio/raft/stores/segmentedlog/ChecksummedList.scala +++ b/stores/src/main/scala/zio/raft/stores/segmentedlog/ChecksummedList.scala @@ -105,3 +105,4 @@ class ChecksummedList[A](codec: Codec[A]) extends Codec[List[A]]: Attempt.fromErrOption(error, DecodeResult(bldr.result, remaining)) override def toString = s"ChecksummedList($codec)" +end ChecksummedList diff --git a/stores/src/main/scala/zio/raft/stores/segmentedlog/LocalLongRef.scala b/stores/src/main/scala/zio/raft/stores/segmentedlog/LocalLongRef.scala index cdd956d9..aec43069 100644 --- a/stores/src/main/scala/zio/raft/stores/segmentedlog/LocalLongRef.scala +++ b/stores/src/main/scala/zio/raft/stores/segmentedlog/LocalLongRef.scala @@ -2,7 +2,8 @@ package zio.raft.stores.segmentedlog import zio.{UIO, ZIO} -sealed trait LocalLongRef[A] { self => +sealed trait LocalLongRef[A]: + self => def get: UIO[A] def set(a: A): UIO[Unit] @@ -14,11 +15,11 @@ sealed trait LocalLongRef[A] { self => def modify[B](f: A => (B, A)): UIO[B] def dimap[B](ab: A => B, ba: B => A): LocalLongRef[B] -} object LocalLongRef: def make(value: Long): UIO[LocalLongRef[Long]] = ZIO.succeed(new VolatileLongRef(value)) - private class VolatileLongRef(@volatile private var value: Long) extends LocalLongRef[Long] { self => + private class VolatileLongRef(@volatile private var value: Long) extends LocalLongRef[Long]: + self => def get = ZIO.succeed(value) def set(a: Long) = ZIO.succeed(this.value = a) @@ -46,9 +47,9 @@ object LocalLongRef: def longToA(s: S) = ab(s) def aToLong(a: B) = ba(a) - } - private abstract class Derived[A] extends LocalLongRef[A] { self => + private abstract class Derived[A] extends LocalLongRef[A]: + self => val value: VolatileLongRef def longToA(s: Long): A @@ -74,4 +75,4 @@ object LocalLongRef: def aToLong(b: B): Long = self.aToLong(ba(b)) val value = self.value - } +end LocalLongRef diff --git a/stores/src/main/scala/zio/raft/stores/segmentedlog/OpenSegment.scala b/stores/src/main/scala/zio/raft/stores/segmentedlog/OpenSegment.scala index 77853f7e..20bf3963 100644 --- a/stores/src/main/scala/zio/raft/stores/segmentedlog/OpenSegment.scala +++ b/stores/src/main/scala/zio/raft/stores/segmentedlog/OpenSegment.scala @@ -47,12 +47,12 @@ class OpenSegment[A <: Command: Codec]( else ZIO.none def getLastTermIndex[A <: Command: Codec]: ZIO[Any, Nothing, (Term, Index)] = - for { + for last <- makeStream(channel) .via(recordsOnly) .run(lastAndDecode) .orDie - } yield last match + yield last match case None => (previousTerm, firstIndex.minusOne) case Some(e) => (e.term, e.index) @@ -62,19 +62,19 @@ class OpenSegment[A <: Command: Codec]( // segment size is just a recommendation at this moment. // TODO: we should only write entries that will fit in the segment and return the rest to be written in the next segment def writeEntries(entries: List[LogEntry[A]]): ZIO[Any, Nothing, Int] = - for { + for bytes <- ZIO.attempt(logEntriesCodec[A].encode(entries).require.toByteVector).orDie position <- positionRef.get written <- channel.write(bytes, position).orDie _ <- channel.force(true).orDie _ <- positionRef.update(_ + written) - } yield written + yield written def deleteFrom(minInclusive: Index): ZIO[Any, Nothing, Unit] = for // we need to find the offset of the entry to truncate the file offset <- - if (minInclusive == firstIndex) ZIO.succeed(BaseTransducer.headerSize) + if minInclusive == firstIndex then ZIO.succeed(BaseTransducer.headerSize) else makeStream(channel) .via(recordsOnly) @@ -174,6 +174,7 @@ class OpenSegment[A <: Command: Codec]( yield () def close() = channel.close.orDie +end OpenSegment object OpenSegment: def createNewSegment[A <: Command: Codec]( @@ -184,9 +185,9 @@ object OpenSegment: ): ZIO[Any, Nothing, ZIO[Scope, Nothing, OpenSegment[A]]] = val fullPath = Path(logDirectory, fileName) - for { + for _ <- createFileWithHeader(logDirectory, fullPath) - } yield openSegment[A](fullPath, firstIndex, previousTerm) + yield openSegment[A](fullPath, firstIndex, previousTerm) // atomically create a new segment with an empty log entry with the previous index and term @@ -248,3 +249,4 @@ object OpenSegment: segment = new OpenSegment[A](fullPath, channel, positionRef, firstIndex, previousTerm) yield segment +end OpenSegment diff --git a/stores/src/main/scala/zio/raft/stores/segmentedlog/ReadOnlySegment.scala b/stores/src/main/scala/zio/raft/stores/segmentedlog/ReadOnlySegment.scala index 567571a2..5f23e36b 100644 --- a/stores/src/main/scala/zio/raft/stores/segmentedlog/ReadOnlySegment.scala +++ b/stores/src/main/scala/zio/raft/stores/segmentedlog/ReadOnlySegment.scala @@ -22,7 +22,7 @@ class ReadOnlySegment[A <: Command: Codec]( .flatMap(channel => makeStream(channel)) .via(recordsOnly) - if (startInclusive.value > firstIndex.value) + if startInclusive.value > firstIndex.value then stream.drop((startInclusive.value - firstIndex.value).toInt).takeWhile(_.index <= toInclusive).via(decode) else stream.takeWhile(_.index <= toInclusive).via(decode) @@ -33,7 +33,7 @@ class ReadOnlySegment[A <: Command: Codec]( fromInclusive.value < lastIndexExclusive.value && toInclusive >= firstIndex def getEntry(index: Index): ZIO[Any, Nothing, Option[LogEntry[A]]] = - if (isInSegment(index)) + if isInSegment(index) then ZStream .scoped(AsynchronousFileChannel.open(path, StandardOpenOption.READ)) .flatMap(channel => makeStream(channel)) @@ -48,7 +48,7 @@ class ReadOnlySegment[A <: Command: Codec]( // Checks if the index is in the segment, i.e is the index between firstIndex and lastIndex def isInSegment(index: Index) = - if (index >= firstIndex) + if index >= firstIndex then lastIndexExclusive match case None => true case Some(lastIndexExclusive) => @@ -64,6 +64,7 @@ class ReadOnlySegment[A <: Command: Codec]( def delete = Files.delete(path).orDie override def toString(): String = s"Segment($firstIndex)" +end ReadOnlySegment object ReadOnlySegment: def open[A <: Command: Codec](logDirectory: String, fileName: String, firstIndex: Index, lastIndex: Option[Index]) = val path = Path(logDirectory, fileName) diff --git a/stores/src/main/scala/zio/raft/stores/segmentedlog/SegmentMetadataDatabase.scala b/stores/src/main/scala/zio/raft/stores/segmentedlog/SegmentMetadataDatabase.scala index 928654c0..e3045075 100644 --- a/stores/src/main/scala/zio/raft/stores/segmentedlog/SegmentMetadataDatabase.scala +++ b/stores/src/main/scala/zio/raft/stores/segmentedlog/SegmentMetadataDatabase.scala @@ -64,8 +64,6 @@ class SegmentMetadataDatabase(environment: Environment, database: Database): def discardAll = environment.transact(database.truncate).orDie -end SegmentMetadataDatabase - object SegmentMetadataDatabase: case class SegmentMetadata( id: Long, @@ -101,5 +99,3 @@ object SegmentMetadataDatabase: environment <- ZIO.service[Environment] database <- environment.openDatabase("segment-metadata", DbiFlags.MDB_CREATE).orDie yield SegmentMetadataDatabase(environment, database) - -end SegmentMetadataDatabase diff --git a/stores/src/main/scala/zio/raft/stores/segmentedlog/SegmentedLog.scala b/stores/src/main/scala/zio/raft/stores/segmentedlog/SegmentedLog.scala index a9405d36..8e0916c4 100644 --- a/stores/src/main/scala/zio/raft/stores/segmentedlog/SegmentedLog.scala +++ b/stores/src/main/scala/zio/raft/stores/segmentedlog/SegmentedLog.scala @@ -78,7 +78,7 @@ class SegmentedLog[A <: Command: Codec]( ) override def storeLog(entry: LogEntry[A]) = - for { + for logFile <- currentSegment.get _ <- logFile.writeEntry(entry) @@ -90,10 +90,10 @@ class SegmentedLog[A <: Command: Codec]( // We allow a file to exceed the maximum size, but immediately after that we create a new file logFileSize <- logFile.size _ <- ZIO.when(logFileSize > maxLogFileSize)(createNextSegment()) - } yield () + yield () override def storeLogs(entries: List[LogEntry[A]]): UIO[Unit] = - for { + for logFile <- currentSegment.get _ <- logFile.writeEntries(entries) @@ -106,7 +106,7 @@ class SegmentedLog[A <: Command: Codec]( // We allow a file to exceed the maximum size, but immediately after that we create a new file logFileSize <- logFile.size _ <- ZIO.when(logFileSize > maxLogFileSize)(createNextSegment()) - } yield () + yield () override def discardEntireLog(previousIndex: Index, previousTerm: Term): UIO[Unit] = for @@ -192,7 +192,7 @@ class SegmentedLog[A <: Command: Codec]( yield () private def createNextSegment(): ZIO[Any, Nothing, Unit] = - for { + for firstIndex <- lastIndex.map(_.plusOne) previousTerm <- lastTerm newSegment <- SegmentedLog.createNewSegment[A](logDirectory, segmentMetadataDatabase, firstIndex, previousTerm) @@ -202,7 +202,7 @@ class SegmentedLog[A <: Command: Codec]( s"Creating new segment, old segment ${oldSegment.firstIndex}, old file size: $oldSegmentSize, new file ${firstIndex}" ) _ <- currentSegment.switch(newSegment) - } yield () + yield () // TODO: this should be a stream or in memory private def listSegments = @@ -239,7 +239,7 @@ object SegmentedLog: logDirectory: String, maxLogFileSize: Long = 1024 * 1024 * 100 /*100 MB*/ ): ZIO[Environment & Scope, Nothing, SegmentedLog[A]] = - for { + for database <- SegmentMetadataDatabase.make segments <- database.getAll @@ -265,4 +265,5 @@ object SegmentedLog: (lastTerm, lastIndex) = tuple lastIndexRef <- LocalLongRef.make(lastIndex.value).map(_.dimap[Index](Index(_), _.value)) lastTermRef <- LocalLongRef.make(lastTerm.value).map(_.dimap[Term](Term(_), _.value)) - } yield new SegmentedLog[A](logDirectory, maxLogFileSize, currentFile, lastIndexRef, lastTermRef, database) + yield new SegmentedLog[A](logDirectory, maxLogFileSize, currentFile, lastIndexRef, lastTermRef, database) +end SegmentedLog diff --git a/stores/src/main/scala/zio/raft/stores/segmentedlog/internal.scala b/stores/src/main/scala/zio/raft/stores/segmentedlog/internal.scala index 92b1954b..7b9ea5e9 100644 --- a/stores/src/main/scala/zio/raft/stores/segmentedlog/internal.scala +++ b/stores/src/main/scala/zio/raft/stores/segmentedlog/internal.scala @@ -67,6 +67,7 @@ object internal: object CurrentSegment: def make[A <: Command](file: ZIO[Scope, Nothing, OpenSegment[A]]) = - for { + for ref <- ScopedRef.fromAcquire(file.asSome) - } yield new CurrentSegment(ref) + yield new CurrentSegment(ref) +end internal diff --git a/zio-lmdb/src/main/scala/zio/lmdb/Database.scala b/zio-lmdb/src/main/scala/zio/lmdb/Database.scala index 801b976b..1b009d41 100644 --- a/zio-lmdb/src/main/scala/zio/lmdb/Database.scala +++ b/zio-lmdb/src/main/scala/zio/lmdb/Database.scala @@ -3,7 +3,7 @@ package zio.lmdb import org.lmdbjava.{DbiFlags, Dbi} import zio.ZIO import zio.stream.ZStream -import java.{util => ju} +import java.{util as ju} class Database(dbi: Dbi[Array[Byte]]): def put(key: Array[Byte], value: Array[Byte]): ZIO[TransactionScope, Throwable, Boolean] = @@ -39,8 +39,6 @@ class Database(dbi: Dbi[Array[Byte]]): ) .map(kv => kv.key() -> kv.`val`()) -end Database - object Database: def open(name: String, flags: DbiFlags*): ZIO[Environment, Throwable, Database] = for diff --git a/zio-lmdb/src/main/scala/zio/lmdb/Environment.scala b/zio-lmdb/src/main/scala/zio/lmdb/Environment.scala index 766faedd..43bc6e39 100644 --- a/zio-lmdb/src/main/scala/zio/lmdb/Environment.scala +++ b/zio-lmdb/src/main/scala/zio/lmdb/Environment.scala @@ -55,16 +55,15 @@ object Environment: ZIO .acquireRelease(ZIO.attemptBlocking(b.open(file, flags*)))(env => ZIO.attemptBlocking(env.close()).orDie) .map(e => - new Environment { + new Environment: val env = e - } ) def layer(file: File) = ZLayer.scoped(build(file)) final class Transact[R](env: Env[Array[Byte]]): def apply[E, A](zio: => ZIO[TransactionScope & R, E, A]): ZIO[R, E, A] = - ZIO.uninterruptibleMask(restore => { + ZIO.uninterruptibleMask(restore => for txn <- ZIO.attemptBlocking(env.txnWrite()).orDie exit <- restore(zio.provideSomeLayer(ZLayer.succeed(TransactionScope(txn)))).exit @@ -74,18 +73,18 @@ object Environment: _ <- ZIO.attemptBlocking(txn.close()).orDie result <- ZIO.done(exit) yield result - }) + ) final class TransactReadOnly[R](env: Env[Array[Byte]]): def apply[E, A](zio: => ZIO[TransactionScope & R, E, A]): ZIO[R, E, A] = - ZIO.uninterruptibleMask(restore => { + ZIO.uninterruptibleMask(restore => for txn <- ZIO.attemptBlocking(env.txnRead()).orDie exit <- restore(zio.provideSomeLayer(ZLayer.succeed(TransactionScope(txn)))).exit _ <- ZIO.attemptBlocking(txn.close()).orDie result <- ZIO.done(exit) yield result - }) + ) final class TransactPartiallyApplied[R](private val dummy: Boolean = true) extends AnyVal: def apply[E, A](zio: => ZIO[TransactionScope & R, E, A]): ZIO[Environment & R, E, A] = @@ -94,3 +93,4 @@ object Environment: final class TransactReadOnlyPartiallyApplied[R](private val dummy: Boolean = true) extends AnyVal: def apply[E, A](zio: => ZIO[TransactionScope & R, E, A]): ZIO[Environment & R, E, A] = ZIO.service[Environment].flatMap(_.transactReadOnly(zio)) +end Environment