-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathPhases.hs
194 lines (161 loc) · 6.97 KB
/
Phases.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
{-# LANGUAGE Rank2Types #-}
{-# LANGUAGE TypeFamilies #-}
-- | Phases of Classic Paxos.
module Sdn.Protocol.Classic.Phases
( propose
, rememberProposal
, phase1a
, phase1b
, phase2a
, phase2b
, learn
) where
import Control.Lens (at, non, to, (%=), (.=), (<+=))
import qualified Data.Set as S
import Formatting (build, sformat, (%))
import Universum
import Sdn.Base
import Sdn.Extra (OldNew (..), broadcastTo, exit, foldlF',
listF, logInfo, pairF, submit,
throwOnFail, wasChanged, zoom, (<<<%=))
import Sdn.Protocol.Classic.Messages
import Sdn.Protocol.Common.Context
import Sdn.Protocol.Common.Phases
import Sdn.Protocol.Processes
import Sdn.Protocol.Versions
-- * Proposal
propose
:: forall cstruct m.
(MonadPhase cstruct m, HasContextOf Proposer Classic m)
=> NonEmpty (RawCmd cstruct) -> m ()
propose policies = do
logInfo $ sformat ("Proposing policies: "%listF "," build) policies
-- remember policy (for testing purposes)
withProcessStateAtomically $ do
proposerProposedPolicies %= (toList policies <>)
proposerUnconfirmedPolicies %= foldlF' S.insert policies
-- and send it to leader
broadcastTo (processAddresses Leader) (ProposalMsg @cstruct policies)
-- * Remembering proposals
rememberProposal
:: (MonadPhase cstruct m, HasContextOf Leader pv m)
=> ProposalMsg cstruct -> m ()
rememberProposal (ProposalMsg policies) = do
-- atomically modify process'es state
let policiesSet = S.fromList $ toList policies
withProcessStateAtomically $ do
leaderProposedPolicies . pendingProposedCommands %= (policiesSet <>)
-- * Phase 1
phase1a
:: (MonadPhase cstruct m, HasContextOf Leader pv m)
=> m ()
phase1a = do
logInfo "Starting new ballot"
mmsg <- withProcessStateAtomically $ runMaybeT $ do
-- increment ballot id
newBallotId <- leaderBallotId <+= 1
-- fixate pending policies as attached to newly started ballot
curBallotProposals <-
zoom leaderProposedPolicies $
dumpProposedCommands newBallotId
-- get policies advised at fast ballots
hintPolicies <- use leaderHintPolicies
-- don't publicly start ballot if there is no proposals
when (null curBallotProposals && null hintPolicies) $ do
logInfo $
sformat ("Skipping "%build%" (no proposals)")
newBallotId
exit
-- make up an "prepare" message
PrepareMsg <$> use leaderBallotId
-- if decided to initiate ballot, notify acceptors about its start
whenJust mmsg $
broadcastTo (processesAddresses Acceptor)
phase1b
:: (MonadPhase cstruct m, HasContextOf Acceptor pv m)
=> PrepareMsg -> m ()
phase1b (PrepareMsg bal) = do
msg <- withProcessStateAtomically $ do
-- promise not to accept messages of lesser ballot numbers
-- make stored ballot id not lesser than @bal@
acceptorLastKnownBallotId %= max bal
PromiseMsg
<$> use acceptorId
<*> use acceptorLastKnownBallotId
<*> use (acceptorCStruct . to totalCStruct)
submit (processAddress Leader) msg
-- * Phase 2
-- | Take hint policies and pending policies and apply them to given cstruct.
applyWaitingPolicies
:: PracticalCStruct cstruct
=> BallotId -> cstruct -> TransactionM (LeaderState pv cstruct) cstruct
applyWaitingPolicies bal cstruct0 = do
hintPolicies <- use leaderHintPolicies
let (unduePolicies, cstruct1) = applyHintCommands (toList hintPolicies) cstruct0
leaderHintPolicies %= foldlF' S.delete (map snd unduePolicies)
logInfo $ logUndueHints unduePolicies
pendingPolicies <- use $ leaderProposedPolicies . at bal . non mempty
let (appliedPolicies, cstruct2) = acceptOrRejectCommands pendingPolicies cstruct1
logInfo $ logAppliedPending bal appliedPolicies
return cstruct2
where
logAppliedPending =
sformat ("Applied policies at "%build%": "%listF "," build)
logUndueHints =
sformat ("Undue hints on policies: "%listF ", " (pairF (build%" ("%build%")")))
phase2a
:: (MonadPhase cstruct m, HasContextOf Leader pv m)
=> PromiseMsg cstruct -> m ()
phase2a (PromiseMsg accId bal cstruct) = do
maybeMsg <- withProcessStateAtomically $ runMaybeT $ do
-- add received vote to set of votes stored locally for this ballot,
-- initializing this set if doesn't exist yet
oldnewVotes <- leaderVotes . at bal . non mempty <<<%= addVote accId cstruct
when (isMinQuorum $ getNew oldnewVotes) $
logInfo $ "Just got 1b from quorum of acceptors at " <> pretty bal
-- evaluate old and new Gamma
oldnewCombined <- forM oldnewVotes $ throwOnFail ProtocolError . combination
-- if there is something new, recalculate Gamma and apply pending policies
if isMinQuorum (getNew oldnewVotes) || wasChanged oldnewCombined
then do
cstructWithNewPolicies <- lift $ applyWaitingPolicies bal (getNew oldnewCombined)
logInfo $ "Broadcasting new cstruct: " <> pretty cstructWithNewPolicies
pure $ AcceptRequestMsg bal cstructWithNewPolicies
else exit
-- when got a message to submit - broadcast it
whenJust maybeMsg $
broadcastTo (processesAddresses Acceptor)
where
phase2b
:: (MonadPhase cstruct m, HasContextOf Acceptor pv m)
=> AcceptRequestMsg cstruct -> m ()
phase2b (AcceptRequestMsg bal cstruct) = do
maybeMsg <- withProcessStateAtomically $ runMaybeT $ do
localBallotId <- use $ acceptorLastKnownBallotId
coreCstruct <- use $ acceptorCStruct . to coreCStruct
-- Check whether did we promise to ignore this message.
-- Case when we get message from this ballot is also checked,
-- because leader can submit updates during ballot, although
-- they are not oblidged to receive in FIFO.
let meetsPromise =
localBallotId < bal ||
localBallotId == bal && (cstruct `extends` coreCstruct)
unless meetsPromise $ do
logInfo "Received cstruct was rejected"
exit
-- if ok, remember received info
acceptorLastKnownBallotId .= bal
use acceptorCStruct
>>= throwOnFail ProtocolError . extendCoreCStruct cstruct
>>= (acceptorCStruct .=)
-- form message
AcceptedMsg
<$> use acceptorId
<*> use (acceptorCStruct . to totalCStruct)
whenJust maybeMsg $
broadcastTo (processesAddresses Learner)
-- * Learning
learn
:: (MonadPhase cstruct m, HasContextOf Learner pv m)
=> LearningCallback m -> AcceptedMsg cstruct -> m ()
learn callback (AcceptedMsg accId cstruct) = learnCStruct callback combination accId cstruct