Message Re-queued without worker failure or code error during long running task #692

@santiago-roig

OS: Linux
Broker: Redis
Redis Version: 6.0.8

Hello, I'm having an issue with a CPU-intensive task that gets re-enqueued without any worker shutdown or error. I'm running a Gurobi job in my actor, which gets sent to a remote server to be optimized. The actors work really well in 99% of my cases, whether it's heavy CPU usage, long-running calculations, or even waiting for the Gurobi job to solve. However, when the Gurobi job is sitting in the Gurobi server's queue and the actor is waiting for it to start optimizing, the message keeps getting re-queued, causing duplicate tasks to run at the same time.

My first thought was that since the Gurobi library is written in C++, it blocks the actor and prevents the heartbeat from being sent, so I implemented a separate process to send the heartbeat, which looks like the following:

import datetime
import logging
import multiprocessing
import time
from multiprocessing import Value

import dramatiq
from dramatiq.brokers.redis import RedisBroker
from dramatiq.middleware import CurrentMessage

logger = logging.getLogger(__name__)


def _redis_heartbeat(terminate_heartbeat_flag, queue_name, interval=15):
    while not terminate_heartbeat_flag.value:
        try:
            broker = dramatiq.get_broker()
            if not broker:
                logger.warning("No broker available")
                continue  # the finally clause still sleeps before retrying

            if not isinstance(broker, RedisBroker):
                logger.warning(f"Broker is {type(broker)}, not RedisBroker")
                return

            # Minimal heartbeat operation: any dispatch.lua command refreshes
            # this worker's entry in the __heartbeats__ sorted set.
            broker.do_qsize(queue_name)
            logger.info(
                f"Heartbeat {datetime.datetime.utcnow()} for broker "
                f"{broker.broker_id} with timeout {broker.heartbeat_timeout}"
            )
        except Exception as e:
            logger.error(f"Heartbeat failed: {e}")
        finally:
            time.sleep(interval)


def long_running_task():
    # Resolve the queue name here rather than in the child process:
    # CurrentMessage is stored per-thread, so it may not resolve after fork.
    current_message = CurrentMessage.get_current_message()
    queue_name = current_message.queue_name

    terminate_heartbeat_flag = Value("b", False)
    heartbeat_job = multiprocessing.Process(
        target=_redis_heartbeat, args=(terminate_heartbeat_flag, queue_name)
    )
    heartbeat_job.daemon = True
    heartbeat_job.start()
    # Long running task runs after this; when it finishes, stop the heartbeat:
    #     terminate_heartbeat_flag.value = True
    #     heartbeat_job.join()

I anticipated this would fix the problem, but it persists even though I can see the heartbeat logs, and the key in Redis shows an up-to-date timestamp when I check it. P.S.: I call do_qsize in order to trigger the following block of code in dispatch.lua:

local acks = namespace .. ":__acks__." .. worker_id
local heartbeats = namespace .. ":__heartbeats__"
redis.call("zadd", heartbeats, timestamp, worker_id)
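
For reference, this is roughly how I check that key in Redis. A minimal sketch, assuming the default "dramatiq" namespace and that the sorted-set scores are epoch milliseconds; adjust if you pass a custom namespace to RedisBroker:

import datetime
import redis

r = redis.Redis()

# Each worker_id should show a recent timestamp if heartbeats are landing.
for worker_id, score in r.zrange("dramatiq:__heartbeats__", 0, -1, withscores=True):
    print(worker_id.decode(), datetime.datetime.utcfromtimestamp(score / 1000))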

I ran the worker in verbose mode to try to see what was happening, but the only extra info I get is this log from handle_message: "Pushing message %r onto work queue."

A few notes that may help. I have a long time_limit of 24 hours for the actor, since some tasks can run for an incredibly long time. I also increased heartbeat_timeout to 300000 (milliseconds, i.e. five minutes) in the RedisBroker constructor. Lastly, I set dramatiq_queue_prefetch=1 since I only want one job to execute at a time per worker process.
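
For context, the relevant configuration looks roughly like this (a sketch; the Redis URL is a placeholder):

import os

import dramatiq
from dramatiq.brokers.redis import RedisBroker

# Must be set before the workers start their consumers.
os.environ["dramatiq_queue_prefetch"] = "1"

broker = RedisBroker(
    url="redis://localhost:6379/0",  # placeholder
    heartbeat_timeout=300_000,       # five minutes, in milliseconds
)
dramatiq.set_broker(broker)

@dramatiq.actor(time_limit=24 * 60 * 60 * 1000)  # 24 hours, in milliseconds
def long_running_task():
    ...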

I'm a bit stumped as to why the Redis consumer keeps re-queuing this job, but only in the situation where the actor is waiting for a third-party system to start its optimization process.

If anyone has any guidance, it would be appreciated.
