-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathPhases.hs
229 lines (189 loc) · 8.43 KB
/
Phases.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE Rank2Types #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeFamilies #-}
-- | Phases of Fast Paxos.
module Sdn.Protocol.Fast.Phases
( propose
, phase2b
, learn
, detectConflicts
) where
import Control.Lens (at, makePrisms, (%=), (.=))
import Control.Monad.Trans.Cont (ContT (..), evalContT)
import qualified Data.Map as M
import qualified Data.Set as S
import Formatting (build, sformat, (%))
import Universum
import Sdn.Base
import Sdn.Extra (OldNew (..), broadcastTo, compose,
foldlF', listF, logInfo, panicOnFail,
presence, takeNoMoreThanOne, throwOnFail,
wasChanged, whenJust', zoom)
import qualified Sdn.Protocol.Classic.Messages as Classic
import qualified Sdn.Protocol.Classic.Phases as Classic
import Sdn.Protocol.Common.Context
import Sdn.Protocol.Common.Phases
import qualified Sdn.Protocol.Fast.Messages as Fast
import Sdn.Protocol.Processes
import Sdn.Protocol.Versions
-- * Helpers
-- | Verdict on a single policy, derived by a receiver of acceptors' votes
-- from the votes it has gathered so far.
data PolicyChoiceStatus
    -- | Not enough votes have arrived yet to conclude anything.
    = TooFewVotes
    -- | Some value has gathered so many votes that no quorum can accept
    -- any other value anymore.
    | OnlyPossible AcceptanceType
    -- | Some value has gathered a quorum of votes.
    | PolicyFixated AcceptanceType
    -- | The votes are considered too contradictory and the round is treated
    -- as failed. A value may still get fixated shortly, but this is not
    -- relied upon: the leader is going to start a recovery instead.
    | Undecidable
    deriving (Eq, Show)

-- Template Haskell splice generating prisms for the constructors above
-- (e.g. '_PolicyFixated').
makePrisms ''PolicyChoiceStatus
-- | Given the votes collected for a policy, work out how close the policy is
-- to being fixated.
--
-- The checks run in a fixed order and the first one which produces a verdict
-- terminates the computation. A 'Left' is produced only when one of the
-- 'takeNoMoreThanOne' calls fails.
decideOnPolicyStatus
    :: (HasMembers, QuorumFamily qf)
    => Votes qf AcceptanceType -> Either Text PolicyChoiceStatus
decideOnPolicyStatus votesForPolicy = evalContT $ do
    -- TODO: another quorum can go here
    -- Optimization: bail out early while the votes altogether do not yet
    -- exclude another quorum.
    when (not $ excludesOtherQuorum votesForPolicy) $
        conclude TooFewVotes

    -- A value backed by a quorum of votes.
    fixated <- lift $
        takeNoMoreThanOne "fixated value"
            (M.keys (M.filter isQuorum votesByValue))
    whenJust fixated (conclude . PolicyFixated)

    -- A value whose votes already exclude any other quorum.
    possible <- lift $
        takeNoMoreThanOne "possibly chosen value"
            (M.keys (M.filter excludesOtherQuorum votesByValue))
    whenJust possible (conclude . OnlyPossible)

    -- TODO: another quorum can go here
    -- No candidate value: the outcome depends only on whether the votes
    -- altogether form a quorum.
    conclude $
        if isQuorum votesForPolicy
            then Undecidable
            else TooFewVotes
  where
    -- Votes regrouped by 'transposeVotes', keyed by the voted value.
    votesByValue = transposeVotes votesForPolicy

    -- Drops the rest of the computation and yields the given verdict.
    conclude verdict = ContT $ \_ -> pure verdict
-- * Proposal
-- | Proposer's entry point of a fast round: logs the policies, adds them to
-- the proposer's proposed and unconfirmed policies, and then broadcasts
-- a 'Fast.ProposalMsg' to the acceptors' addresses.
propose
    :: forall cstruct m.
       (MonadPhase cstruct m, HasContextOf Proposer Fast m)
    => NonEmpty (RawCmd cstruct) -> m ()
propose policies = do
    logInfo $ sformat ("Proposing policy (fast): "%listF "," build) policies

    withProcessStateAtomically $ do
        -- new policies are put in front of the already proposed ones
        proposerProposedPolicies %= \earlier -> toList policies <> earlier
        proposerUnconfirmedPolicies %= foldlF' S.insert policies

    let proposal = Fast.ProposalMsg @cstruct policies
    broadcastTo (processesAddresses Acceptor) proposal
-- * Phase 2
-- | Acceptor's reaction to a fast proposal.
--
-- Every proposed policy is run through 'acceptOrRejectIntoStoreS' against
-- the acceptor's cstruct. The outcome is then sent to the leader as a whole,
-- and to learners split into groups as computed by 'groupPolicyTargets'.
phase2b
    :: forall cstruct m.
       (MonadPhase cstruct m, HasContextOf Acceptor Fast m)
    => PolicyTargets cstruct -> Fast.ProposalMsg cstruct -> m ()
phase2b policyTargets (Fast.ProposalMsg policiesToApply) = do
    logInfo "Got proposal"

    -- Both kinds of outgoing messages are prepared within one atomic section;
    -- the actual sending happens after it.
    (msgToLeader, msgsToLearners) <- withProcessStateAtomically $ do
        verdicts <- zoom acceptorCStruct $
            mapM acceptOrRejectIntoStoreS policiesToApply
        logInfo $ describeApplied verdicts

        ownId <- use acceptorId
        let forLeader = Fast.AcceptedMsg @Leader @cstruct ownId verdicts
            forLearners =
                [ (targets, Fast.AcceptedMsg @Learner @cstruct ownId policies)
                | (targets, policies) <-
                    groupPolicyTargets policyTargets acceptanceCmd (toList verdicts)
                ]
        return (forLeader, forLearners)

    broadcastTo (processAddresses Leader) msgToLeader
    forM_ msgsToLearners $ \(learners, msg) ->
        broadcastTo (processAddress <$> learners) msg
  where
    describeApplied =
        sformat ("List of fast applied policies:"
                %"\n "%listF ", " build)
-- * Learning
-- | Learner's reaction to a fast-round 2b message from an acceptor.
--
-- Steps:
--
-- 1. every entry of the message is passed to 'rememberUnstableCmdVote'
--    together with the acceptor's id (all within one atomic section);
-- 2. for every resulting vote update the policy status is recomputed with
--    'decideOnPolicyStatus'; a failure of that computation is thrown as
--    'ProtocolError';
-- 3. policies whose status has changed and is now 'PolicyFixated' are added
--    to 'learnerLearned' (a failure of 'addCommand' is a panic);
-- 4. if there is at least one such policy, the callback is notified.
learn
    :: forall cstruct m.
       (MonadPhase cstruct m, HasContextOf Learner Fast m)
    => LearningCallback m -> Fast.AcceptedMsg Learner cstruct -> m ()
learn callback (Fast.AcceptedMsg accId (toList -> cstructDiff)) = do
    voteUpdates <- withProcessStateAtomically $
        forM cstructDiff $ rememberUnstableCmdVote learnerVotes accId

    fixatedValues <- fmap catMaybes . forM voteUpdates $ \(policy, votesForPolicy) -> do
        -- The status is derived for every component of the vote update, so
        -- that 'wasChanged' can tell whether this message made a difference.
        policyStatus <- forM votesForPolicy $
            throwOnFail ProtocolError . decideOnPolicyStatus
        if wasChanged policyStatus
            then
                whenJust' (getNew policyStatus ^? _PolicyFixated) $ \acceptance -> do
                    let policyAcceptance = compose (acceptance, policy)
                    logInfo $ sformat ("Policy "%build%" has been fixated")
                        policyAcceptance
                    return policyAcceptance
            else return Nothing

    withProcessStateAtomically $
        learnerLearned %=
            let addOne cstruct value =
                    panicOnFail ProtocolError $ addCommand value cstruct
            -- Strict left fold: the lazy 'foldl' would pile up one thunk per
            -- fixated value before anything gets evaluated.
            in  \learned -> foldl' addOne learned fixatedValues

    whenNotNull fixatedValues $ onFixatedPolicies callback
-- * Recovery detection and initiation
-- | Hands conflicting policies over to recovery: they are wrapped into
-- a 'Classic.ProposalMsg' and passed to 'Classic.rememberProposal', i.e.
-- in a fast round the leader treats them as if it had received an ordinary
-- proposal message.
delegateToRecovery
    :: (MonadPhase cstruct m, HasContextOf Leader Fast m)
    => NonEmpty (RawCmd cstruct) -> m ()
delegateToRecovery conflictingPolicies =
    Classic.rememberProposal $ Classic.ProposalMsg conflictingPolicies
-- | Leader's reaction to a fast-round 2b message from an acceptor.
--
-- Every entry of the message is passed to 'rememberVoteForPolicy', then
-- the status of each affected policy is recomputed with
-- 'decideOnPolicyStatus' (a failure is thrown as 'ProtocolError').
-- When the status has changed, the new status determines the reaction:
--
-- * 'Undecidable' - the policy is handed over to 'delegateToRecovery';
-- * 'OnlyPossible' - the policy acceptance is marked in 'leaderHintPolicies';
-- * 'PolicyFixated' - only a log entry is made;
-- * 'TooFewVotes' - nothing happens.
detectConflicts
    :: forall cstruct m.
       (MonadPhase cstruct m, HasContextOf Leader Fast m)
    => Fast.AcceptedMsg Leader cstruct -> m ()
detectConflicts (Fast.AcceptedMsg accId (toList -> cstructDiff)) = do
    -- every vote is remembered in an atomic section of its own
    updates <- forM cstructDiff $ \vote ->
        withProcessStateAtomically $
            rememberVoteForPolicy @cstruct leaderFastVotes accId vote

    forM_ updates $ \(policy, votesBeforeAfter) -> do
        statusBeforeAfter <- forM votesBeforeAfter $ \votes ->
            throwOnFail ProtocolError (decideOnPolicyStatus votes)

        when (wasChanged statusBeforeAfter) $
            case getNew statusBeforeAfter of
                Undecidable -> do
                    logInfo $ conflictMsg policy
                    delegateToRecovery (one policy)
                    logInfo "Policy ^ proposed for next classic ballot"

                OnlyPossible acceptance -> do
                    let policyAcceptance = compose (acceptance, policy)
                    logInfo $ possiblyChosenMsg policyAcceptance
                    withProcessStateAtomically $
                        leaderHintPolicies . at policyAcceptance . presence .= True

                PolicyFixated acceptance ->
                    -- nothing else to do here: the decision on this policy
                    -- will never change anymore
                    logInfo $ learnedMsg (compose (acceptance, policy))

                TooFewVotes ->
                    pure ()
  where
    learnedMsg =
        sformat ("Policy "%build%" supposedly has been learned, \
                 \not tracking it further")
    possiblyChosenMsg =
        sformat ("Policy "%build%" is considered possibly chosen")
    conflictMsg =
        sformat ("Heard about "%build%" by quorum, but no value \
                 \still has been even potentially chosen. \
                 \Declaring policy conflict!")