Skip to content

Commit 95766c9

Browse files
committed
complete node
1 parent f95d8fc commit 95766c9

File tree

4 files changed

+142
-34
lines changed

4 files changed

+142
-34
lines changed

client-server-server/src/main/scala/zio/raft/server/RaftServer.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ object RaftServer:
6969
case class SessionCreationConfirmed(sessionId: SessionId) extends ServerAction
7070
case class StepUp(sessions: Map[SessionId, SessionMetadata]) extends ServerAction
7171
case class StepDown(leaderId: Option[MemberId]) extends ServerAction
72+
case class LeaderChanged(leaderId: MemberId) extends ServerAction
7273

7374
/** Actions to forward to Raft state machine.
7475
*/
@@ -190,6 +191,9 @@ object RaftServer:
190191
case StreamEvent.Action(ServerAction.StepDown(_)) =>
191192
ZIO.succeed(this)
192193

194+
case StreamEvent.Action(ServerAction.LeaderChanged(leaderId)) =>
195+
ZIO.succeed(this.copy(leaderId = Some(leaderId)))
196+
193197
case StreamEvent.Action(ServerAction.SendResponse(_, _)) =>
194198
ZIO.logWarning("Cannot send response - not leader").as(this)
195199

@@ -269,6 +273,9 @@ object RaftServer:
269273
_ <- sessions.stepDown(transport, leaderId)
270274
yield Follower(leaderId)
271275

276+
case StreamEvent.Action(ServerAction.LeaderChanged(leaderId)) =>
277+
ZIO.logWarning(s"We received LeaderChanged event while in Leader state, this should not happen").as(this)
278+
272279
case StreamEvent.Action(ServerAction.SendResponse(sessionId, response)) =>
273280
sessions.getRoutingId(sessionId) match
274281
case Some(routingId) =>

kvstore/src/main/scala/zio/kvstore/node/Node.scala

Lines changed: 64 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -7,27 +7,31 @@ import zio.kvstore.protocol.KVServerRequest
77
import zio.raft.Raft
88
import zio.raft.protocol.*
99
import zio.raft.sessionstatemachine.{SessionCommand, PendingServerRequest, SessionStateMachine}
10-
import zio.raft.sessionstatemachine.given
10+
import zio.raft.sessionstatemachine.Codecs.given
1111
import scodec.Codec
1212
import scodec.bits.ByteVector
1313
import zio.raft.server.RaftServer
1414
import zio.raft.server.RaftServer.RaftAction
1515
import java.time.Instant
16+
import zio.kvstore.Codecs.given
17+
import zio.kvstore.protocol.KVServerRequest.given
18+
import zio.kvstore.node.Node.NodeAction
19+
import zio.kvstore.node.Node.NodeAction.*
20+
import zio.raft.Peers
21+
import zio.raft.stores.LmdbStable
22+
import zio.raft.stores.segmentedlog.SegmentedLog
23+
import zio.raft.stores.FileSnapshotStore
24+
import zio.raft.zmq.ZmqRpc
1625

1726
/** Node wiring RaftServer, Raft core, and KV state machine. */
1827
final case class Node(
1928
raftServer: RaftServer,
2029
raft: zio.raft.Raft[
2130
zio.raft.HMap[Tuple.Concat[zio.raft.sessionstatemachine.SessionSchema[KVResponse, KVServerRequest], KVSchema]],
2231
SessionCommand[KVCommand, KVServerRequest]
23-
],
24-
stateMachine: KVStateMachine
32+
]
2533
):
2634

27-
// Bring codecs for (de)serialization of commands/responses/server-requests
28-
import zio.kvstore.Codecs.given
29-
import zio.kvstore.protocol.KVServerRequest.given
30-
3135
private def encodeServerRequestPayload(req: KVServerRequest): ByteVector =
3236
summon[Codec[KVServerRequest]].encode(req).require.bytes
3337

@@ -43,21 +47,6 @@ final case class Node(
4347
raftServer.sendServerRequest(env.sessionId, ServerRequest(env.requestId, payload, now))
4448
}
4549

46-
// Unified stream pattern (@stream-architecture-pattern): define actions, merge streams, handle in one fold
47-
private sealed trait NodeAction
48-
private object NodeAction:
49-
case class CreateSession(sessionId: SessionId, capabilities: Map[String, String]) extends NodeAction
50-
case class ClientRequest(
51-
sessionId: SessionId,
52-
requestId: RequestId,
53-
lowestPendingRequestId: RequestId,
54-
command: KVCommand
55-
) extends NodeAction
56-
case class ServerRequestAck(sessionId: SessionId, requestId: RequestId) extends NodeAction
57-
case class ExpireSession(sessionId: SessionId) extends NodeAction
58-
case class RetryTick(now: Instant) extends NodeAction
59-
case class StateNotificationReceived(notification: zio.raft.StateNotification) extends NodeAction
60-
6150
private def handleAction(action: NodeAction): UIO[Unit] =
6251
action match
6352
// Process raftActions → SessionCommand → raft → publish via RaftServer
@@ -113,7 +102,7 @@ final case class Node(
113102
for
114103
now <- Clock.instant
115104
cmd = SessionCommand.ServerRequestAck[KVServerRequest](now, sessionId, requestId)
116-
_ <- raft.sendCommand(cmd).either.unit
105+
_ <- raft.sendCommand(cmd).ignore
117106
yield ()
118107

119108
case NodeAction.ExpireSession(sessionId) =>
@@ -151,14 +140,11 @@ final case class Node(
151140
case zio.raft.StateNotification.SteppedUp =>
152141
raft.readState.either.flatMap {
153142
case Right(state) =>
154-
val sessionsSSM = state.iterator["metadata"].toList.collect {
155-
case (sid: SessionId, md: zio.raft.sessionstatemachine.SessionMetadata) => (sid, md)
156-
}.toMap
157-
val sessionsServer: Map[zio.raft.protocol.SessionId, zio.raft.server.SessionMetadata] =
158-
sessionsSSM.map { case (sid, md) =>
159-
(sid, zio.raft.server.SessionMetadata(md.capabilities, md.createdAt))
160-
}
161-
raftServer.stepUp(sessionsServer)
143+
val sessions = SessionStateMachine.getSessions[KVResponse, KVServerRequest, KVSchema](state).map {
144+
case (sessionId: SessionId, metadata) =>
145+
(sessionId, zio.raft.server.SessionMetadata(metadata.capabilities, metadata.createdAt))
146+
}
147+
raftServer.stepUp(sessions)
162148
case Left(_) => ZIO.unit
163149
}
164150
case zio.raft.StateNotification.SteppedDown(leaderId) =>
@@ -180,6 +166,7 @@ final case class Node(
180166
case RaftAction.ExpireSession(sessionId) =>
181167
NodeAction.ExpireSession(sessionId)
182168
},
169+
// TODO: filter this is we are not the leader
183170
ZStream.tick(10.seconds).mapZIO(_ => Clock.instant.map(NodeAction.RetryTick.apply)),
184171
raft.stateNotifications.map(NodeAction.StateNotificationReceived.apply)
185172
)
@@ -220,3 +207,49 @@ final case class Node(
220207
ZIO.logInfo("Node started") *>
221208
unifiedStream.mapZIO(handleAction).runDrain
222209
end Node
210+
211+
object Node:
212+
213+
def make(
214+
serverBindAddress: String,
215+
clusterBindAddress: String,
216+
logDirectory: String,
217+
snapshotDirectory: String,
218+
memberId: zio.raft.MemberId,
219+
peers: Peers
220+
) =
221+
for
222+
stable <- LmdbStable.make
223+
logStore <- SegmentedLog.make[SessionCommand[KVCommand, KVServerRequest]](logDirectory)
224+
snapshotStore <- FileSnapshotStore.make(zio.nio.file.Path(snapshotDirectory))
225+
rpc <- ZmqRpc.make[SessionCommand[KVCommand, KVServerRequest]](
226+
clusterBindAddress,
227+
peers.map(p => (p, clusterBindAddress)).toMap
228+
)
229+
230+
raft <- Raft.make(
231+
memberId = memberId,
232+
peers = peers,
233+
stable = stable,
234+
logStore = logStore,
235+
snapshotStore = snapshotStore,
236+
rpc = rpc,
237+
stateMachine = new KVStateMachine()
238+
)
239+
raftServer <- RaftServer.make(serverBindAddress)
240+
node = Node(raftServer, raft)
241+
yield node
242+
243+
sealed trait NodeAction
244+
object NodeAction:
245+
case class CreateSession(sessionId: SessionId, capabilities: Map[String, String]) extends NodeAction
246+
case class ClientRequest(
247+
sessionId: SessionId,
248+
requestId: RequestId,
249+
lowestPendingRequestId: RequestId,
250+
command: KVCommand
251+
) extends NodeAction
252+
case class ServerRequestAck(sessionId: SessionId, requestId: RequestId) extends NodeAction
253+
case class ExpireSession(sessionId: SessionId) extends NodeAction
254+
case class RetryTick(now: Instant) extends NodeAction
255+
case class StateNotificationReceived(notification: zio.raft.StateNotification) extends NodeAction

session-state-machine/src/main/scala/zio/raft/sessionstatemachine/Codecs.scala

Lines changed: 63 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package zio.raft.sessionstatemachine
33
import zio.raft.protocol.RequestId
44
import scodec.Codec
55
import java.time.Instant
6+
import scodec.codecs.{utf8_32, list, uint8, discriminated}
7+
import zio.raft.protocol.Codecs.{sessionIdCodec, requestIdCodec as requestIdCodecProtocol, instantCodec}
68

79
/** Scodec codec definitions for SessionStateMachine types.
810
*
@@ -28,9 +30,7 @@ object Codecs:
2830
*
2931
* Encodes as: Long
3032
*/
31-
given requestIdCodec: Codec[RequestId] =
32-
import scodec.codecs.int64
33-
int64.xmap(RequestId(_), RequestId.unwrap)
33+
given requestIdCodec: Codec[RequestId] = requestIdCodecProtocol
3434

3535
/** Codec for PendingServerRequest - automatically derived from payload codec.
3636
*
@@ -54,3 +54,63 @@ object Codecs:
5454
{ case (payload, ts) => PendingServerRequest(payload, Instant.ofEpochMilli(ts)) },
5555
p => (p.payload, p.lastSentAt.toEpochMilli)
5656
)
57+
58+
/** Codec for SessionCommand, parameterized by UC (user command) and SR (server request payload). Requires codecs for
59+
* UC and SR in scope.
60+
*/
61+
given sessionCommandCodec[UC <: zio.raft.Command, SR](using
62+
ucCodec: Codec[UC],
63+
srCodec: Codec[SR]
64+
): Codec[SessionCommand[UC, SR]] =
65+
val clientRequestV0: Codec[SessionCommand.ClientRequest[UC, SR]] =
66+
(instantCodec :: sessionIdCodec :: requestIdCodec :: requestIdCodec :: ucCodec)
67+
.as[SessionCommand.ClientRequest[UC, SR]]
68+
val clientRequestCodec: Codec[SessionCommand.ClientRequest[UC, SR]] =
69+
(uint8 :: clientRequestV0).xmap(
70+
{ case (_, cmd) => cmd },
71+
cmd => (0, cmd)
72+
)
73+
74+
val serverRequestAckV0: Codec[SessionCommand.ServerRequestAck[SR]] =
75+
(instantCodec :: sessionIdCodec :: requestIdCodec).as[SessionCommand.ServerRequestAck[SR]]
76+
val serverRequestAckCodec: Codec[SessionCommand.ServerRequestAck[SR]] =
77+
(uint8 :: serverRequestAckV0).xmap(
78+
{ case (_, cmd) => cmd },
79+
cmd => (0, cmd)
80+
)
81+
82+
val createSessionV0: Codec[SessionCommand.CreateSession[SR]] =
83+
val capabilitiesCodec: Codec[Map[String, String]] =
84+
list(utf8_32 :: utf8_32).xmap(_.toMap, _.toList)
85+
(instantCodec :: sessionIdCodec :: capabilitiesCodec).as[SessionCommand.CreateSession[SR]]
86+
val createSessionCodec: Codec[SessionCommand.CreateSession[SR]] =
87+
(uint8 :: createSessionV0).xmap(
88+
{ case (_, cmd) => cmd },
89+
cmd => (0, cmd)
90+
)
91+
92+
val sessionExpiredV0: Codec[SessionCommand.SessionExpired[SR]] =
93+
(instantCodec :: sessionIdCodec).as[SessionCommand.SessionExpired[SR]]
94+
val sessionExpiredCodec: Codec[SessionCommand.SessionExpired[SR]] =
95+
(uint8 :: sessionExpiredV0).xmap(
96+
{ case (_, cmd) => cmd },
97+
cmd => (0, cmd)
98+
)
99+
100+
val getRequestsForRetryV0: Codec[SessionCommand.GetRequestsForRetry[SR]] =
101+
(instantCodec :: instantCodec).as[SessionCommand.GetRequestsForRetry[SR]]
102+
val getRequestsForRetryCodec: Codec[SessionCommand.GetRequestsForRetry[SR]] =
103+
(uint8 :: getRequestsForRetryV0).xmap(
104+
{ case (_, cmd) => cmd },
105+
cmd => (0, cmd)
106+
)
107+
108+
discriminated[SessionCommand[UC, SR]]
109+
.by(uint8)
110+
.typecase(0, clientRequestCodec)
111+
.typecase(1, serverRequestAckCodec)
112+
.typecase(2, createSessionCodec)
113+
.typecase(3, sessionExpiredCodec)
114+
.typecase(4, getRequestsForRetryCodec)
115+
end sessionCommandCodec
116+
end Codecs

session-state-machine/src/main/scala/zio/raft/sessionstatemachine/SessionStateMachine.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -561,3 +561,11 @@ object SessionStateMachine:
561561
state.exists["serverRequests"] { (_, pending) =>
562562
pending.lastSentAt.isBefore(lastSentBefore)
563563
}
564+
565+
def getSessions[R, SR, UserSchema <: Tuple](
566+
state: HMap[Tuple.Concat[SessionSchema[R, SR], UserSchema]]
567+
): Map[SessionId, SessionMetadata] =
568+
state.iterator["metadata"].collect {
569+
case (sessionId: SessionId, metadata) =>
570+
(sessionId, SessionMetadata(metadata.capabilities, metadata.createdAt))
571+
}.toMap

0 commit comments

Comments
 (0)