Skip to content
1 change: 1 addition & 0 deletions scls-format/scls-format.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ library
binary,
byte-order >=0.1.3.1,
bytestring,
cborg,
containers,
crypton,
directory,
Expand Down
53 changes: 52 additions & 1 deletion scls-format/src/Cardano/SCLS/Internal/Entry.hs
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE TypeFamilies #-}

module Cardano.SCLS.Internal.Entry (
IsKey (..),
ChunkEntry (..),
GenericCBOREntry (..),
SomeCBOREntry (..),
) where

import Cardano.SCLS.Internal.Serializer.HasKey
import Cardano.SCLS.Internal.Serializer.MemPack (MemPackHeaderOffset (..))
import Cardano.SCLS.Internal.Serializer.MemPack (ByteStringSized (..), CBORTerm (..), MemPackHeaderOffset (..), SomeByteStringSized (..), isolated)
import Cardano.Types.ByteOrdered (BigEndian (..))
import Data.MemPack
import Data.MemPack.Buffer
import Data.Typeable
import Data.Word (Word32)
import GHC.TypeLits (KnownNat, Nat, natVal)

class (Ord a) => IsKey a where
keySize :: Int
Expand Down Expand Up @@ -50,3 +54,50 @@ instance (Eq k, Eq v) => Eq (ChunkEntry k v) where

instance (Ord k, Ord v) => Ord (ChunkEntry k v) where
compare (ChunkEntry k1 v1) (ChunkEntry k2 v2) = compare k1 k2 <> compare v1 v2

{- | A generic chunk entry that we are not aware of exact content type,
except that it is a valid CBOR, as required per SCLS specification.
-}
newtype GenericCBOREntry (n :: Nat) = GenericCBOREntry
{ unGenericCBOREntry :: ChunkEntry (ByteStringSized n) CBORTerm
}
deriving (Show, Eq, Ord)

instance HasKey (GenericCBOREntry n) where
type Key (GenericCBOREntry n) = ByteStringSized n
getKey (GenericCBOREntry (ChunkEntry k _)) = k

instance (KnownNat n) => IsKey (ByteStringSized n) where
keySize = fromInteger (natVal (Proxy :: Proxy n))
packKeyM = packM
unpackKeyM = unpackM

instance (KnownNat n, Typeable (ByteStringSized n)) => MemPack (GenericCBOREntry n) where
packedByteCount (GenericCBOREntry ce) = packedByteCount ce
packM (GenericCBOREntry ce) = packM ce
unpackM =
GenericCBOREntry <$> do
BigEndian (lenEntry :: Word32) <- unpackM
key <- unpackKeyM
value <- isolated (fromIntegral lenEntry - fromIntegral (keySize @(ByteStringSized n)))
return (ChunkEntry key value)

instance (KnownNat n) => MemPackHeaderOffset (GenericCBOREntry n) where
headerSizeOffset = 4

{- | An existential wrapper that allows to store 'GenericCBOREntry' of any
size in the same plan.
-}
data SomeCBOREntry = forall n. (KnownNat n) => SomeCBOREntry (GenericCBOREntry n)

instance MemPack SomeCBOREntry where
packedByteCount (SomeCBOREntry gce) = packedByteCount gce
packM (SomeCBOREntry gce) = packM gce
unpackM = error "unpackM SomeCBOREntry: cannot determine size n at runtime"

instance MemPackHeaderOffset SomeCBOREntry where
headerSizeOffset = 4

instance HasKey SomeCBOREntry where
type Key SomeCBOREntry = SomeByteStringSized
getKey (SomeCBOREntry gce) = SomeByteStringSized (getKey gce)
25 changes: 11 additions & 14 deletions scls-format/src/Cardano/SCLS/Internal/Serializer/External/Impl.hs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import Cardano.Types.SlotNo
import Control.Exception (onException, throwIO)
import Control.Monad.ST (runST)
import Data.ByteString qualified as B
import Data.Function (fix, (&))
import Data.Function (fix, on, (&))
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

import Data.Map.Strict qualified as Map
Expand All @@ -44,7 +44,7 @@ import VectorBuilder.Builder qualified as Builder
import VectorBuilder.MVector qualified as Builder

serialize ::
(MemPack a, Ord a, Typeable a, HasKey a, MemPackHeaderOffset a) =>
(MemPack a, Ord (Key a), Typeable a, HasKey a, MemPackHeaderOffset a) =>
-- | path to resulting file
FilePath ->
-- | Network identifier
Expand Down Expand Up @@ -88,7 +88,7 @@ so on, until we have placed a file.
the size of the entries, but it can be changed without modifying the interface.
-}
prepareExternalSortNamespaced ::
(Typeable a, Ord a, MemPack a) =>
(Typeable a, Ord (Key a), HasKey a, MemPack a) =>
FilePath ->
S.Stream (S.Of (InputChunk a)) IO () ->
IO ()
Expand All @@ -115,7 +115,7 @@ the input may be unordered and we can have a namespaces to appear
multiple times in the stream
-}
mergeChunks ::
(Ord a) =>
(Ord (Key a), HasKey a) =>
S.Stream (S.Of (InputChunk a)) IO () ->
S.Stream (S.Of (Namespace, V.Vector a)) IO ()
mergeChunks = loop Map.empty
Expand All @@ -138,20 +138,17 @@ mergeChunks = loop Map.empty
in if Builder.size i' < chunkSize -- we were no able to fill the chunk, so r is empty
then return $ loop (Map.insert ns i' s') (rest)
else do
let v' = runST do
mv <- Builder.build i'
Tim.sort mv
V.unsafeFreeze mv
let v' = finalizeVector i'
return $ S.yield (ns, v') >> loop (Map.delete ns s') (Step ((ns :> r) :> rest))
loop s (Effect e) = Effect (e >>= \s' -> return (loop s s'))
loop s (Return _) = do
S.each (Map.toList s)
& S.map \(ns, builder) ->
let v = runST do
mv <- Builder.build builder
Tim.sort mv
V.unsafeFreeze mv
in (ns, v)
& S.map \(ns, builder) -> (ns, finalizeVector builder)
finalizeVector :: (Ord (Key a), HasKey a) => Builder.Builder a -> V.Vector a
finalizeVector builder = runST do
mv <- Builder.build builder
Tim.sortBy (compare `on` getKey) mv
V.unsafeFreeze mv

merge2 :: FilePath -> FilePath -> IO ()
merge2 f1 f2 = do
Expand Down
94 changes: 87 additions & 7 deletions scls-format/src/Cardano/SCLS/Internal/Serializer/MemPack.hs
Original file line number Diff line number Diff line change
@@ -1,33 +1,48 @@
{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE UndecidableInstances #-}

-- | Useful utilities for working with MemPack types.
module Cardano.SCLS.Internal.Serializer.MemPack (
Entry (..),
RawBytes (..),
CStringLenBuffer (..),
isolate,
isolated,
MemPackHeaderOffset (..),

-- * Type helpers
Entry (..),
ByteStringSized (..),
SomeByteStringSized (..),
CBORTerm (..),
RawBytes (..),
) where

import Cardano.SCLS.Internal.Serializer.HasKey
import Codec.CBOR.Read qualified as CBOR
import Codec.CBOR.Term qualified as CBOR
import Codec.CBOR.Write qualified as CBOR
import Control.Monad.Reader
import Control.Monad.State.Class
import Control.Monad.Trans.Fail
import Data.ByteArray (ByteArrayAccess, length, withByteArray)
import Data.ByteString (ByteString)
import Data.ByteString qualified as BS
import Data.ByteString.Lazy qualified as BSL
import Data.MemPack
import Data.MemPack.Buffer
import Data.MemPack.Error
import Data.Primitive.Ptr
import Data.Text qualified as T
import Data.Typeable
import Data.Word
import Foreign.C.String
import Foreign.Ptr
import GHC.Stack (HasCallStack)
import GHC.TypeLits (KnownNat, Nat, natVal)
import GHC.TypeNats (fromSNat, pattern SNat)
import System.ByteOrder

-- | Typeclass for types that have a fixed header offset when serialized.
Expand All @@ -51,11 +66,7 @@ instance HasKey RawBytes where
instance MemPack (RawBytes) where
packedByteCount (RawBytes bs) = BS.length bs
packM (RawBytes bs) = packByteStringM bs
unpackM = do
len <- Unpack $ \b ->
get >>= \s -> do
return (bufferByteCount b - s)
RawBytes <$> unpackByteStringM len
unpackM = RawBytes <$> consumeBytes

instance MemPackHeaderOffset RawBytes where
headerSizeOffset = 4
Expand Down Expand Up @@ -89,6 +100,16 @@ isolated len = do
a <- Unpack $ \_ -> StateT (\_ -> unpackLeftOverOff b' start unpackM)
return a

{- | Consumes all remaining bytes in the current buffer.

Useful for reading isolated data.
-}
consumeBytes :: (Buffer b) => Unpack b ByteString
consumeBytes = do
start <- get
len <- asks bufferByteCount
unpackByteStringM (len - start)

unpackLeftOverOff :: forall a b. (MemPack a, Buffer b) => (HasCallStack) => b -> Int -> Unpack b a -> Fail SomeError (a, Int)
unpackLeftOverOff buf off action = do
let len = bufferByteCount buf
Expand Down Expand Up @@ -142,3 +163,62 @@ instance ByteArrayAccess CStringLenBuffer where
length (CStringLenBuffer (_, l)) = l
withByteArray (CStringLenBuffer (ptr, _)) f =
f (ptr `plusPtr` 0)

{- | An existential wrapper for the case when we need to compare
fixed-size bytestrings to each other.

When we compare then we do it in length-first number, first we compare
the sizes, and then the bytestring content. This approach matches
the ordering required by the canonical CBOR.
-}
data SomeByteStringSized where
SomeByteStringSized :: (KnownNat n) => ByteStringSized n -> SomeByteStringSized

instance Eq SomeByteStringSized where
(SomeByteStringSized (ByteStringSized bs1)) == (SomeByteStringSized (ByteStringSized bs2)) = bs1 == bs2

instance Ord SomeByteStringSized where
compare (SomeByteStringSized (ByteStringSized bs1 :: ByteStringSized n)) (SomeByteStringSized (ByteStringSized bs2 :: ByteStringSized m)) =
compare
(fromSNat @n SNat)
(fromSNat @m SNat)
<> compare bs1 bs2

-- | A bytestring with the size known at compile time.
newtype ByteStringSized (n :: Nat) = ByteStringSized ByteString
deriving (Eq, Ord, Show)

instance (KnownNat n) => MemPack (ByteStringSized n) where
packedByteCount _ = fromInteger (natVal (Proxy :: Proxy n))

packM (ByteStringSized bs) = do
let expected = fromIntegral (natVal (Proxy :: Proxy n)) :: Int
let len = BS.length bs
if len /= expected
then error $! "ByteStringSized: expected " ++ show expected ++ " bytes, got " ++ show len
else packByteStringM bs

unpackM = do
let expected = fromIntegral (natVal (Proxy :: Proxy n)) :: Int
bs <- unpackByteStringM expected
pure (ByteStringSized bs)

-- | Helper to store CBOR terms directly as entries.
newtype CBORTerm = CBORTerm CBOR.Term
deriving (Eq, Ord, Show)

instance MemPack CBORTerm where
packedByteCount (CBORTerm t) =
BS.length (CBOR.toStrictByteString (CBOR.encodeTerm t))

packM (CBORTerm t) =
packByteStringM (CBOR.toStrictByteString (CBOR.encodeTerm t))

unpackM = do
start <- gets fromIntegral
bytes <- consumeBytes
case CBOR.deserialiseFromBytesWithSize CBOR.decodeTerm (BSL.fromStrict bytes) of
Left err -> failUnpack $ TextError $ "CBOR term deserialisation failed: " <> T.pack (show err)
Right (_rest, bytesRead, term) -> do
put (start + fromIntegral bytesRead)
pure (CBORTerm term)
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import Cardano.Types.Network
import Cardano.Types.SlotNo
import Control.Monad.IO.Class (MonadIO (liftIO))
import Control.Monad.ST (runST)
import Data.Function (on)
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map
import Data.MemPack
Expand All @@ -30,7 +31,7 @@ import VectorBuilder.MVector qualified as Builder
At this point it accepts values from one namespace only.
-}
serialize ::
(MemPack a, Ord a, Typeable a, HasKey a, MemPackHeaderOffset a) =>
(MemPack a, Ord (Key a), Typeable a, HasKey a, MemPackHeaderOffset a) =>
-- | path to resulting file
FilePath ->
-- | Network identifier
Expand All @@ -50,7 +51,7 @@ serialize resultFilePath network slotNo plan = do
S.each [n S.:> S.each v | (n, v) <- Map.toList orderedStream]
)
where
mkVectors :: (Ord a) => S.Stream (S.Of (InputChunk a)) IO () -> IO (Map Namespace (V.Vector a))
mkVectors :: (Ord (Key a), HasKey a) => S.Stream (S.Of (InputChunk a)) IO () -> IO (Map Namespace (V.Vector a))
mkVectors = do
S.foldM_
do
Expand All @@ -61,10 +62,10 @@ serialize resultFilePath network slotNo plan = do
do
traverse \builder -> pure $ runST do
mv <- Builder.build builder
Tim.sort mv
Tim.sortBy (compare `on` getKey) mv
V.unsafeFreeze mv

mkVector :: (Ord a) => S.Stream (S.Of a) IO () -> IO (Builder.Builder a)
mkVector :: S.Stream (S.Of a) IO () -> IO (Builder.Builder a)
mkVector = S.fold_
do \x e -> x <> Builder.singleton e
do Builder.empty
Expand Down
Loading