Skip to content

Conversation

@rvem
Copy link
Contributor

@rvem rvem commented Dec 8, 2025

No description provided.

rvem added 2 commits December 8, 2025 11:33
Problem: 'qPop' may fail due to an exception (e.g. SQL connection-related
issues), and, as a result, the queue popper async action will stop.

Solution: Restart queue popper async action on error or finish.
@rvem rvem marked this pull request as ready for review December 8, 2025 12:05
@rvem rvem merged commit 7020023 into master Dec 8, 2025
8 checks passed
@prikhi
Copy link
Member

prikhi commented Dec 9, 2025

What's the diff between this & the package's code.

If it's useful globally, I can upstream it to the package:
https://github.com/prikhi/immortal-queue

@rvem
Copy link
Contributor Author

rvem commented Dec 10, 2025

The main difference is here:

processImmortalQueue :: forall a . ImmortalQueue a -> IO QueueId
processImmortalQueue queue = do
    shutdown   <- newEmptyMVar
    threads    <- mapM (const makeWorker) [1 .. qThreadCount queue]
    nextAction <- newEmptyMVar

    let startQueuePopper = async $ popQueue nextAction threads
        -- Wait for shutdown or popper finish, restart popper on error or unexpected finish
        manageQueuePopper = do
            asyncQueuePopper <- startQueuePopper
            finishAction <- takeMVar shutdown `race` waitCatch asyncQueuePopper
            case finishAction of
                Left cleanClose -> do
                    cancel asyncQueuePopper
                    if cleanClose
                        then do
                            mapM_ (Immortal.mortalize . wdThread) threads
                            mapM_ (flip putMVar () . wdCloseMVar) threads
                        else mapM_ (Immortal.stop . wdThread) threads
                    mapM_ (Immortal.wait . wdThread) threads
                    tryTakeMVar nextAction >>= \case
                        Nothing     -> return ()
                        Just action -> qPush queue action
                Right (Left err) -> do
                    -- Restart the popper on error
                    putStrLn $ "Error occured in immortal queue popper: " ++ show err
                    manageQueuePopper
                Right (Right _) -> do
                    putStrLn "Immortal unexpectedly finished. Restarting..."
                    manageQueuePopper

    asyncQueue <- async manageQueuePopper
    return QueueId { qiCloseCleanly = shutdown, qiAsyncQueue = asyncQueue }

Now the popper async action is restarted on finish/failure, while previously it could've failed (e.g., due to an SQL error in qPop that can occur due to a temporary network issue and whatnot), and the queue will stop processing new jobs

@rvem
Copy link
Contributor Author

rvem commented Dec 10, 2025

If it's useful globally

I believe it is. This should make the queue pop process more resilient

@prikhi
Copy link
Member

prikhi commented Dec 11, 2025

@rvem thanks, check this out - if looks good i'll merge & cut a new version:

prikhi/immortal-queue#1

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants