Skip to content

fix(admin): make creation unsafe and wait for response #206

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions .github/workflows/haskell.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
steps:
- uses: actions/checkout@v4

- uses: actions/cache@v2
- uses: actions/cache@v3
name: Cache librdkafka
with:
path: .librdkafka
Expand All @@ -32,7 +32,7 @@ jobs:
- name: Build librdkafka
run: ./scripts/build-librdkafka

- uses: haskell/actions/setup@v1
- uses: haskell-actions/setup@v2
id: setup-haskell
with:
ghc-version: ${{ matrix.ghc }}
Expand All @@ -45,10 +45,15 @@ jobs:
- name: Configure project
run: cabal configure --enable-tests --enable-benchmarks --write-ghc-environment-files=ghc8.4.4+

- uses: action-works/cabal-cache@v1
name: Cache cabal store
- name: Cache ~/.cabal/packages, ~/.cabal/store and dist-newstyle
uses: actions/cache@v4
with:
key-prefix: CwBTpnRd
path: |
~/.cabal/packages
~/.cabal/store
dist-newstyle
key: ${{ runner.os }}-${{ matrix.ghc }}-${{ hashFiles('**/*.cabal', '**/cabal.project', '**/cabal.project.freeze') }}
restore-keys: ${{ runner.os }}-${{ matrix.ghc }}-

- name: Build
# Try building it twice in case of flakey builds on Windows
Expand Down Expand Up @@ -98,7 +103,8 @@ jobs:
if git push origin "v$package_version"; then
echo "Tagged with new version "v$package_version""

echo "::set-output name=tag::v$package_version"
echo "tag=v$package_version" >> $GITHUB_OUTPUT

fi
fi

Expand Down
75 changes: 73 additions & 2 deletions src/Kafka/Internal/RdKafka.chs
Original file line number Diff line number Diff line change
Expand Up @@ -1164,7 +1164,8 @@ rdKafkaErrorTxnRequiresAbort ptr = boolFromCInt <$> rdKafkaErrorTxnRequiresAbort
-- Topics
{#enum rd_kafka_admin_op_t as ^ {underscoreToCase} deriving (Show, Eq) #}


data RdKafkaTopicResultT
{#pointer *rd_kafka_topic_result_t as RdKafkaTopicResultTPtr foreign -> RdKafkaTopicResultT#}

data RdKafkaAdminOptionsT
{#pointer *rd_kafka_AdminOptions_t as RdKafkaAdminOptionsTPtr foreign -> RdKafkaAdminOptionsT #}
Expand All @@ -1180,6 +1181,9 @@ data RdKafkaNewTopicT
foreign import ccall unsafe "rdkafka.h &rd_kafka_AdminOptions_destroy" -- prevent memory leak
finalRdKafkaAdminOptionsDestroy :: FinalizerPtr RdKafkaAdminOptionsT

{#fun rd_kafka_NewTopic_set_config as ^
{`RdKafkaNewTopicTPtr', `String', `String'} -> `Either RdKafkaRespErrT ()' cIntToRespEither #}

newRdKAdminOptions :: RdKafkaTPtr -> RdKafkaAdminOpT -> IO RdKafkaAdminOptionsTPtr
newRdKAdminOptions kafkaPtr opt = do
res <- rdKafkaAdminOptionsNew kafkaPtr opt
Expand All @@ -1193,6 +1197,9 @@ rdKafkaNewTopicDestroy tPtr = do
foreign import ccall "&rd_kafka_NewTopic_destroy"
rdKafkaNewTopicDestroyFinalizer :: FinalizerPtr RdKafkaNewTopicT

data RdKafkaCreateTopicsResultT
{#pointer *rd_kafka_CreateTopics_result_t as RdKafkaCreateTopicsResultTPtr foreign -> RdKafkaCreateTopicsResultT #}

newRdKafkaNewTopic :: String -> Int -> Int -> IO (Either String RdKafkaNewTopicTPtr)
newRdKafkaNewTopic topicName topicPartitions topicReplicationFactor = do
allocaBytes nErrorBytes $ \ptr -> do
Expand All @@ -1202,6 +1209,46 @@ newRdKafkaNewTopic topicName topicPartitions topicReplicationFactor = do
then peekCString ptr >>= pure . Left
else addForeignPtrFinalizer rdKafkaNewTopicDestroyFinalizer res >> pure (Right res)

newRdKafkaNewTopicUnsafe :: String -> Int -> Int -> IO (Either String RdKafkaNewTopicTPtr)
newRdKafkaNewTopicUnsafe topicName topicPartition topicReplicationFactor = do
allocaBytes nErrorBytes $ \ptr -> do
res <- rdKafkaNewTopicNew topicName topicPartition topicReplicationFactor ptr (fromIntegral nErrorBytes)
withForeignPtr res $ \realPtr -> do
if realPtr == nullPtr
then peekCString ptr >>= pure . Left
else pure (Right res)

rdKafkaEventCreateTopicsResult :: RdKafkaEventTPtr -> IO (Maybe RdKafkaCreateTopicsResultTPtr)
rdKafkaEventCreateTopicsResult evtPtr =
withForeignPtr evtPtr $ \evtPtr' -> do
res <- {#call rd_kafka_event_CreateTopics_result#} (castPtr evtPtr')
if (res == nullPtr)
then pure Nothing
else Just <$> newForeignPtr_ (castPtr res)

rdKafkaCreateTopicsResultTopics :: RdKafkaCreateTopicsResultTPtr
-> IO [Either (String, RdKafkaRespErrT, String) String]
rdKafkaCreateTopicsResultTopics tRes =
withForeignPtr tRes $ \tRes' ->
alloca $ \sPtr -> do
res <- {#call rd_kafka_CreateTopics_result_topics#} (castPtr tRes') sPtr
size <- peekIntConv sPtr
arr <- peekArray size res
traverse unpackRdKafkaTopicResult arr

-- | Unpacks raw result into
-- 'Either (topicName, errorType, errorMsg) topicName'
unpackRdKafkaTopicResult :: Ptr RdKafkaTopicResultT
-> IO (Either (String, RdKafkaRespErrT, String) String)
unpackRdKafkaTopicResult resPtr = do
name <- {#call rd_kafka_topic_result_name#} resPtr >>= peekCString
err <- {#call rd_kafka_topic_result_error#} resPtr
case cIntToEnum err of
RdKafkaRespErrNoError -> pure $ Right name
respErr -> do
errMsg <- {#call rd_kafka_topic_result_error_string#} resPtr >>= peekCString
pure $ Left (name, respErr, errMsg)

--- Create topic
rdKafkaCreateTopic :: RdKafkaTPtr
-> RdKafkaNewTopicTPtr
Expand All @@ -1222,7 +1269,7 @@ data RdKafkaDeleteTopicT
{#pointer *rd_kafka_DeleteTopic_t as RdKafkaDeleteTopicTPtr foreign -> RdKafkaDeleteTopicT #}

data RdKafkaDeleteTopicsResultT
{#pointer *rd_kafka_DeleteTopics_result_t as RdKafkaDeleteTopicResultTPtr foreign -> RdKafkaDeleteTopicsResultT #}
{#pointer *rd_kafka_DeleteTopics_result_t as RdKafkaDeleteTopicsResultTPtr foreign -> RdKafkaDeleteTopicsResultT #}

newRdKafkaDeleteTopic :: String -> IO (Either String RdKafkaDeleteTopicTPtr)
newRdKafkaDeleteTopic topicNameStr =
Expand All @@ -1232,6 +1279,14 @@ newRdKafkaDeleteTopic topicNameStr =
then return $ Left $ "Something went wrong while deleting topic " ++ topicNameStr
else Right <$> newForeignPtr rdKafkaDeleteTopicDestroy res

rdKafkaEventDeleteTopicsResult :: RdKafkaEventTPtr -> IO (Maybe RdKafkaDeleteTopicsResultTPtr)
rdKafkaEventDeleteTopicsResult evtPtr =
withForeignPtr evtPtr $ \evtPtr' -> do
res <- {#call rd_kafka_event_DeleteTopics_result#} (castPtr evtPtr')
if (res == nullPtr)
then pure Nothing
else Just <$> newForeignPtr_ (castPtr res)

rdKafkaDeleteTopics :: RdKafkaTPtr
-> [RdKafkaDeleteTopicTPtr]
-> RdKafkaAdminOptionsTPtr
Expand All @@ -1242,6 +1297,16 @@ rdKafkaDeleteTopics kafkaPtr topics opts queue = do
withForeignPtrsArrayLen topics $ \tLen tPtr -> do
{#call rd_kafka_DeleteTopics#} kPtr tPtr (fromIntegral tLen) oPtr qPtr

rdKafkaDeleteTopicsResultTopics :: RdKafkaDeleteTopicsResultTPtr
-> IO [Either (String, RdKafkaRespErrT, String) String]
rdKafkaDeleteTopicsResultTopics tRes =
withForeignPtr tRes $ \tRes' ->
alloca $ \sPtr -> do
res <- {#call rd_kafka_DeleteTopics_result_topics#} (castPtr tRes') sPtr
size <- peekIntConv sPtr
arr <- peekArray size res
traverse unpackRdKafkaTopicResult arr

-- Marshall / Unmarshall
enumToCInt :: Enum a => a -> CInt
enumToCInt = fromIntegral . fromEnum
Expand All @@ -1255,6 +1320,12 @@ cIntConv :: (Integral a, Num b) => a -> b
cIntConv = fromIntegral
{-# INLINE cIntConv #-}

cIntToRespEither err =
case cIntToEnum err of
RdKafkaRespErrNoError -> Right ()
respErr -> Left respErr
{-# INLINE cIntToRespEither #-}

boolToCInt :: Bool -> CInt
boolToCInt True = CInt 1
boolToCInt False = CInt 0
Expand Down
115 changes: 83 additions & 32 deletions src/Kafka/Topic.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,21 @@ module X
, deleteTopic
) where

import Control.Exception
import Control.Monad.IO.Class
import Control.Monad.Trans.Class
import Control.Monad.Trans.Except
import Control.Monad.Trans.Maybe
import Data.Bifunctor
import Data.Foldable
import Data.List.NonEmpty
import qualified Data.List.NonEmpty as NEL
import Data.Text
import qualified Data.Text as T

import Kafka.Internal.RdKafka
import Kafka.Internal.Setup
import qualified Data.List.NonEmpty as NEL
import qualified Data.Map as M
import Data.Maybe
import qualified Data.Set as S
import qualified Data.Text as T
import Kafka.Internal.RdKafka
import Kafka.Internal.Setup

import Kafka.Topic.Types as X
import Kafka.Types as X
Expand All @@ -24,11 +31,17 @@ createTopic k topic = do
opts <- newRdKAdminOptions kafkaPtr RdKafkaAdminOpAny

topicRes <- withNewTopic topic $ \topic' -> rdKafkaCreateTopic kafkaPtr topic' opts queue

case topicRes of
Left err -> do
pure $ Left (NEL.head err)
Right _ -> do
pure $ Right $ topicName topic
res <- waitForResponse (topicName topic) rdKafkaEventCreateTopicsResult rdKafkaCreateTopicsResultTopics queue
case listToMaybe res of
Nothing -> pure $ Left KafkaInvalidReturnValue
Just result -> pure $ case result of
Left (_, e, _) -> Left e
Right tName -> Right tName

--- DELETE TOPIC ---
deleteTopic :: HasKafka k
Expand All @@ -45,19 +58,17 @@ deleteTopic k topic = liftIO $ do
Left err -> do
pure $ Left (NEL.head err)
Right _ -> do
pure $ Right topic
res <- waitForResponse topic rdKafkaEventDeleteTopicsResult rdKafkaDeleteTopicsResultTopics queue
case listToMaybe res of
Nothing -> pure $ Left KafkaInvalidReturnValue
Just result -> pure $ case result of
Left (_, e, _) -> Left e
Right tName -> Right tName

withNewTopic :: NewTopic
-> (RdKafkaNewTopicTPtr -> IO a)
-> IO (Either (NonEmpty KafkaError) a)
withNewTopic t transform = do
mkNewTopicRes <- mkNewTopic t newTopicPtr
case mkNewTopicRes of
Left err -> do
return $ Left err
Right topic -> do
res <- transform topic
return $ Right res
withNewTopic t = withUnsafeOne t mkNewTopicUnsafe rdKafkaNewTopicDestroy

withOldTopic :: TopicName
-> (RdKafkaDeleteTopicTPtr -> IO a)
Expand All @@ -71,28 +82,21 @@ withOldTopic tName transform = do
res <- transform topic
return $ Right res

newTopicPtr :: NewTopic -> IO (Either KafkaError RdKafkaNewTopicTPtr)
newTopicPtr topic = do
ptrRes <- newRdKafkaNewTopic (unpack $ unTopicName $ topicName topic) (unPartitionCount $ topicPartitionCount topic) (unReplicationFactor $ topicReplicationFactor topic)
case ptrRes of
Left str -> pure $ Left (KafkaError $ T.pack str)
Right ptr -> pure $ Right ptr

oldTopicPtr :: TopicName -> IO (Either KafkaError RdKafkaDeleteTopicTPtr)
oldTopicPtr tName = do
res <- newRdKafkaDeleteTopic $ unpack . unTopicName $ tName
res <- newRdKafkaDeleteTopic $ T.unpack . unTopicName $ tName
case res of
Left str -> pure $ Left (KafkaError $ T.pack str)
Right ptr -> pure $ Right ptr

mkNewTopic :: NewTopic
-> (NewTopic -> IO (Either KafkaError a))
-> IO (Either (NonEmpty KafkaError) a)
mkNewTopic topic create = do
res <- create topic
case res of
Left err -> pure $ Left (singletonList err)
Right resource -> pure $ Right resource
mkNewTopicUnsafe :: NewTopic -> IO (Either KafkaError RdKafkaNewTopicTPtr)
mkNewTopicUnsafe topic = runExceptT $ do
topic' <- withErrStr $ newRdKafkaNewTopicUnsafe (T.unpack $ unTopicName $ topicName topic) (unPartitionCount $ topicPartitionCount topic) (unReplicationFactor $ topicReplicationFactor topic)
_ <- withErrKafka $ whileRight (uncurry $ rdKafkaNewTopicSetConfig undefined) (M.toList $ topicConfig topic)
pure topic'
where
withErrStr = withExceptT (KafkaError . T.pack) . ExceptT
withErrKafka = withExceptT KafkaResponseError . ExceptT

rmOldTopic :: TopicName
-> (TopicName -> IO (Either KafkaError a))
Expand All @@ -103,5 +107,52 @@ rmOldTopic tName remove = do
Left err -> pure $ Left (singletonList err)
Right resource -> pure $ Right resource

withUnsafeOne :: a -- ^ Item to handle
-> (a -> IO (Either KafkaError b)) -- ^ Create an unsafe element
-> (b -> IO ()) -- ^ Destroy the unsafe element
-> (b -> IO c) -- ^ Handler
-> IO (Either (NonEmpty KafkaError) c)
withUnsafeOne a mkOne cleanup f =
bracket (mkOne a) cleanupOne processOne
where
cleanupOne (Right b) = cleanup b
cleanupOne (Left _) = pure () -- no resource to clean if creation failed

processOne (Right b) = Right <$> f b
processOne (Left e) = pure (Left (singletonList e))

whileRight :: Monad m
=> (a -> m (Either e ()))
-> [a]
-> m (Either e ())
whileRight f as = runExceptT $ traverse_ (ExceptT . f) as

waitForResponse :: TopicName
-> (RdKafkaEventTPtr -> IO (Maybe a))
-> (a -> IO [Either (String, RdKafkaRespErrT, String) String])
-> RdKafkaQueueTPtr
-> IO [Either (TopicName, KafkaError, String) TopicName]
waitForResponse topic fromEvent toResults q =
fromMaybe [] <$> runMaybeT (go [])
where
awaited = S.singleton topic

go accRes = do
qRes <- MaybeT $ rdKafkaQueuePoll q 1000
eRes <- MaybeT $ fromEvent qRes
tRes <- lift $ toResults eRes
let results = wrapTopicName <$> tRes
let topics = S.fromList $ getTopicName <$> results
let newRes = results <> accRes
let remaining = S.difference awaited topics
if S.null remaining
then pure newRes
else go newRes

getTopicName = either (\(t,_,_) -> t) id
wrapTopicName = bimap (\(t,e,s) -> (TopicName (T.pack t), KafkaResponseError e, s))
(TopicName . T.pack)

singletonList :: a -> NonEmpty a
singletonList x = x :| []

Loading