Skip to content

Commit 56feab5

Browse files
authored
Merge pull request #761 from gurelkaynak/gurel/issue_759
Fixes #759 Consumers down when ETA is None (or not isnumeric)
2 parents b488bbc + a37f286 commit 56feab5

File tree

3 files changed

+61
-0
lines changed

3 files changed

+61
-0
lines changed

CONTRIBUTORS.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,3 +72,4 @@ of those changes to CLEARTYPE SRL.
7272
| [@LincolnPuzey](https://github.com/LincolnPuzey) | Lincoln Puzey |
7373
| [@guedesfelipe](https://github.com/guedesfelipe) | Felipe Guedes |
7474
| [@karolinepauls](https://karolinepauls.com) | Karoline Pauls |
75+
| [@gurelkaynak](https://gurel.kaynak.link) | Gurel Kaynak |

dramatiq/worker.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,17 @@ def handle_message(self, message: MessageProxy) -> None:
343343
If the message has an eta, delay it. Otherwise, put it on the
344344
work queue.
345345
"""
346+
if "eta" in message.options:
347+
if message.options["eta"] is None:
348+
del message.options["eta"]
349+
elif not isinstance(message.options["eta"], (int, float)):
350+
self.logger.warning(
351+
"Invalid eta value for message %r: %r. Setting eta to current_millis().",
352+
message.message_id,
353+
message.options["eta"],
354+
)
355+
message.options["eta"] = current_millis()
356+
346357
try:
347358
if "eta" in message.options:
348359
self.logger.debug("Pushing message %r onto delay queue.", message.message_id)

tests/test_actors.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from __future__ import annotations
22

3+
import logging
34
import time
45
from datetime import timedelta
56
from unittest.mock import patch
@@ -533,3 +534,51 @@ def f2():
533534
# Then a ValueError should be raised
534535
assert exc_info.type is ValueError
535536
assert str(exc_info.value) == "An actor named 'foo' is already registered."
537+
538+
539+
def test_worker_handles_non_numeric_eta_and_logs_warning(stub_broker, stub_worker, caplog):
540+
# Set the log level to capture warnings
541+
caplog.set_level(logging.WARNING)
542+
543+
# Given an actor that records when it runs
544+
run = []
545+
546+
@dramatiq.actor
547+
def record():
548+
run.append(True)
549+
550+
# If I send it a message with a non-numeric eta manually from another project or manually from rabbitmq UI (not using dramatiq)
551+
message = record.message_with_options(eta="not-a-number")
552+
stub_broker.queues[record.queue_name].put(message.encode())
553+
554+
# Then join on the queue
555+
stub_broker.join(record.queue_name)
556+
stub_worker.join()
557+
558+
# I expect the message to have been processed
559+
assert run
560+
561+
# And a warning should have been logged about the invalid eta
562+
assert any(
563+
"Invalid eta value for message" in record.message for record in caplog.records if record.levelname == "WARNING"
564+
)
565+
566+
567+
def test_worker_handles_none_eta(stub_broker, stub_worker):
568+
# Given an actor that records when it runs
569+
run = []
570+
571+
@dramatiq.actor
572+
def record():
573+
run.append(True)
574+
575+
# If I send it a message with a None eta
576+
message = record.message_with_options(eta=None)
577+
stub_broker.queues[record.queue_name].put(message.encode())
578+
579+
# Then join on the queue
580+
stub_broker.join(record.queue_name)
581+
stub_worker.join()
582+
583+
# I expect the message to have been processed
584+
assert run

0 commit comments

Comments
 (0)