-
Notifications
You must be signed in to change notification settings - Fork 87
Description
The ChainSync server example
Lines 161 to 261 in 29899df
-- | An instance of the server side of the chain sync protocol that reads from | |
-- a pure 'ChainProducerState' stored in a 'StrictTVar'. | |
-- | |
-- This is of course only useful in tests and reference implementations since | |
-- this is not a realistic chain representation. | |
-- | |
chainSyncServerExample :: forall blk header m a. | |
( HasHeader blk | |
, MonadSTM m | |
, HeaderHash header ~ HeaderHash blk | |
) | |
=> a | |
-> StrictTVar m (ChainProducerState blk) | |
-> (blk -> header) | |
-> ChainSyncServer header (Point blk) (Tip blk) m a | |
chainSyncServerExample recvMsgDoneClient chainvar toHeader = ChainSyncServer $ | |
idle <$> newFollower | |
where | |
idle :: FollowerId -> ServerStIdle header (Point blk) (Tip blk) m a | |
idle r = | |
ServerStIdle { | |
recvMsgRequestNext = handleRequestNext r, | |
recvMsgFindIntersect = \pts -> handleFindIntersect r (map castPoint pts), | |
recvMsgDoneClient = pure recvMsgDoneClient | |
} | |
idle' :: FollowerId -> ChainSyncServer header (Point blk) (Tip blk) m a | |
idle' = ChainSyncServer . pure . idle | |
handleRequestNext :: FollowerId | |
-> m (Either (ServerStNext header (Point blk) (Tip blk) m a) | |
(m (ServerStNext header (Point blk) (Tip blk) m a))) | |
handleRequestNext r = do | |
mupdate <- tryReadChainUpdate r | |
case mupdate of | |
Just update -> return (Left (sendNext r update)) | |
Nothing -> return (Right (sendNext r <$> readChainUpdate r)) | |
-- Follower is at the head, have to block and wait for | |
-- the producer's state to change. | |
sendNext :: FollowerId | |
-> (Tip blk, ChainUpdate blk blk) | |
-> ServerStNext header (Point blk) (Tip blk) m a | |
sendNext r (tip, AddBlock b) = SendMsgRollForward (toHeader b) (castTip tip) (idle' r) | |
sendNext r (tip, RollBack p) = SendMsgRollBackward (castPoint p) (castTip tip) (idle' r) | |
handleFindIntersect :: FollowerId | |
-> [Point blk] | |
-> m (ServerStIntersect header (Point blk) (Tip blk) m a) | |
handleFindIntersect r points = do | |
-- TODO: guard number of points | |
-- Find the first point that is on our chain | |
changed <- improveReadPoint r points | |
case changed of | |
(Just pt, tip) -> return $ SendMsgIntersectFound (castPoint pt) (castTip tip) (idle' r) | |
(Nothing, tip) -> return $ SendMsgIntersectNotFound (castTip tip) (idle' r) | |
newFollower :: m FollowerId | |
newFollower = atomically $ do | |
cps <- readTVar chainvar | |
let (cps', rid) = ChainProducerState.initFollower genesisPoint cps | |
writeTVar chainvar cps' | |
return rid | |
improveReadPoint :: FollowerId | |
-> [Point blk] | |
-> m (Maybe (Point blk), Tip blk) | |
improveReadPoint rid points = | |
atomically $ do | |
cps <- readTVar chainvar | |
case ChainProducerState.findFirstPoint (map castPoint points) cps of | |
Nothing -> let chain = ChainProducerState.chainState cps | |
in return (Nothing, castTip (Chain.headTip chain)) | |
Just ipoint -> do | |
let !cps' = ChainProducerState.updateFollower rid ipoint cps | |
writeTVar chainvar cps' | |
let chain = ChainProducerState.chainState cps' | |
return (Just (castPoint ipoint), castTip (Chain.headTip chain)) | |
tryReadChainUpdate :: FollowerId | |
-> m (Maybe (Tip blk, ChainUpdate blk blk)) | |
tryReadChainUpdate rid = | |
atomically $ do | |
cps <- readTVar chainvar | |
case ChainProducerState.followerInstruction rid cps of | |
Nothing -> return Nothing | |
Just (u, cps') -> do | |
writeTVar chainvar cps' | |
let chain = ChainProducerState.chainState cps' | |
return $ Just (castTip (Chain.headTip chain), u) | |
readChainUpdate :: FollowerId -> m (Tip blk, ChainUpdate blk blk) | |
readChainUpdate rid = | |
atomically $ do | |
cps <- readTVar chainvar | |
case ChainProducerState.followerInstruction rid cps of | |
Nothing -> retry | |
Just (u, cps') -> do | |
writeTVar chainvar cps' | |
let chain = ChainProducerState.chainState cps' | |
return (castTip (Chain.headTip chain), u) |
does not adhere to the spec, namely the following aspect:
Whenever the server replies with
$\texttt{MsgIntersectFound}$ the client can expect the next
update (i.e. a reply to$\texttt{MsgRequestNext}$ ) to be$\texttt{MsgRollBackward}$ to the specified$point_{intersect}$
Concretely, when the StrictTVar m (ChainProducerState blk)
changes at the same time as an intersection was found, then thread scheduling can result in sending a MsgRollBackward
to a different point than the negotiated intersection.
The solution here would be to track a bit of state to make sure that the created follower will always first send a MsgRollBackward
to the negotiated intersection even if the ChainProducerState
changed in the meantime.
FTR: This was noticed in IntersectMBO/ouroboros-consensus#1186 (comment), but isn't blocking us.
Metadata
Metadata
Assignees
Labels
Type
Projects
Status