Skip to content

Refactor mux egress queue #1409

Open
Open
@coot

Description

@coot

The network-mux egress could benefit from abstracting the queuing facility that is used there. Abstracting this out would allow us to test fairness properties in isolation. Below is a snippet which can be used for a start.

Note: this does not change any functionality, and since recent and not yet merged refactorisations
are not touching this part of mux proposed changes should not result in a conflict.

{-# LANGUAGE BangPatterns   #-}
{-# LANGUAGE NamedFieldPuns #-}

import Control.Concurrent.STM
import Control.Monad (replicateM, unless)

import Data.Word (Word16)

--
-- mux queue - the heart of implementation of egress side of Network.Mux module.
--
-- The queue has certain guarantees!
--  * fairness between different protocols writing to the queue
--  * only one wanton from a given protocol at a time in the EgressQueue
--
-- Invariants:
--  * all 'Wanton's which are in 'EgressQueue' are non empty

newtype Wanton a = Wanton { getWanton :: TMVar a }

newEmptyWanton = Wanton <$> newEmptyTMVarIO

-- | A strict function, to avoid memory leaks due to un-evaluated thunks in
-- shared variables.
--
putWanton :: Wanton a -> a -> STM ()
putWanton (Wanton v) !a = putTMVar v a

takeWanton :: Wanton a -> STM a
takeWanton (Wanton v) = takeTMVar v

-- This is an internal data type, no need to export it.
--
data TranslocationRequest ctx a =
    TR { trContext :: !ctx
       , trWanton  :: !(Wanton a)
       }

newtype EgressQueue ctx a = EgressQueue { getEgressQueue :: TQueue (TranslocationRequest ctx a) }

newEgressQueue :: IO (EgressQueue ctx a)
newEgressQueue = EgressQueue <$> newTQueueIO

-- | 'muxChannel' 'send' can use this functo to write to the queue
--
-- It is strict in 'a' (what is inherited from 'putWanton').
writeEgressQueue
    :: EgressQueue ctx a
    -> Wanton a
    -> ctx
    -> a
    -> IO ()
writeEgressQueue (EgressQueue q) w ctx a = atomically $ do
    -- block on the Wanton, until it will get empy (and thus removed from the queue).
    putWanton w a
    writeTQueue q $! TR {trContext = ctx, trWanton = w}


-- | Elements which can be dequeued need to support two operations.
--
-- Using a type-class could be too restrictive.  E.g. there are many ways one
-- can split 'Int' elements in a compatible way with 'Semigroup' instance.
--
data Dequeueable a = Dequeueable
  { splitDequeueable :: Word16 -> a -> (a, a)
  -- ^ split element
  , nullDequeueable :: a -> Bool
  }


-- | 'Network.Mux.Egress.processSinleWanton' is using this logic to read from
-- the queue.
--
readEgressQueue :: Dequeueable a
                -> Word16
                -> EgressQueue ctx a
                -> IO (ctx, a)
readEgressQueue Dequeueable {splitDequeueable, nullDequeueable}
                size
                (EgressQueue q) =
    atomically $ do
      -- read haed of the queue
      tr@TR {trContext, trWanton} <- readTQueue q
      a <- takeWanton trWanton
      let (a', rest) = splitDequeueable size a
      unless (nullDequeueable rest) $ do
          -- only place the wanton back to the queue if it is non-empty
          putWanton trWanton rest
          -- put the rest at the end of the queue, this guarantes fairness
          -- between all Wantons.
          writeTQueue q tr
      pure (trContext, a')

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    Status

    No status

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions