Skip to content

Commit 9037687

Browse files
authored
Merge pull request #744 from jenstroeger/add-some-typing-broker
Further improve typing on external interface functions for Broker and Worker
2 parents ab96275 + 37dd13b commit 9037687

File tree

2 files changed

+19
-19
lines changed

2 files changed

+19
-19
lines changed

dramatiq/broker.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ class Broker:
9393
overwrite when they are declared.
9494
"""
9595

96-
def __init__(self, middleware: Optional[list[Middleware]] = None):
96+
def __init__(self, middleware: Optional[list[Middleware]] = None) -> None:
9797
self.logger = get_logger(__name__, type(self))
9898
self.actors: dict[str, Actor] = {}
9999
self.queues: Any = {} # Subclasses make this a set!
@@ -132,7 +132,7 @@ def add_middleware(
132132
*,
133133
before: Optional[type[Middleware]] = None,
134134
after: Optional[type[Middleware]] = None,
135-
):
135+
) -> None:
136136
"""Add a middleware object to this broker. The middleware is
137137
appended to the end of the middleware list by default.
138138
@@ -229,7 +229,7 @@ def declare_queue(self, queue_name: str) -> None: # pragma: no cover
229229
"""
230230
raise NotImplementedError
231231

232-
def enqueue(self, message, *, delay: Optional[int] = None): # pragma: no cover
232+
def enqueue(self, message: Message, *, delay: Optional[int] = None) -> Message: # pragma: no cover
233233
"""Enqueue a message on this broker.
234234
235235
Parameters:
@@ -241,7 +241,7 @@ def enqueue(self, message, *, delay: Optional[int] = None): # pragma: no cover
241241
"""
242242
raise NotImplementedError
243243

244-
def get_actor(self, actor_name: str): # pragma: no cover
244+
def get_actor(self, actor_name: str) -> Actor: # pragma: no cover
245245
"""Look up an actor by its name.
246246
247247
Parameters:
@@ -337,7 +337,7 @@ def __iter__(self) -> Self: # pragma: no cover
337337
"""Returns this instance as a Message iterator."""
338338
return self
339339

340-
def ack(self, message: MessageProxy): # pragma: no cover
340+
def ack(self, message: MessageProxy) -> None: # pragma: no cover
341341
"""Acknowledge that a message has been processed, removing it
342342
from the broker.
343343
@@ -346,15 +346,15 @@ def ack(self, message: MessageProxy): # pragma: no cover
346346
"""
347347
raise NotImplementedError
348348

349-
def nack(self, message: MessageProxy): # pragma: no cover
349+
def nack(self, message: MessageProxy) -> None: # pragma: no cover
350350
"""Move a message to the dead-letter queue.
351351
352352
Parameters:
353353
message(MessageProxy): The message to reject.
354354
"""
355355
raise NotImplementedError
356356

357-
def requeue(self, messages: Iterable[MessageProxy]): # pragma: no cover
357+
def requeue(self, messages: Iterable[MessageProxy]) -> None: # pragma: no cover
358358
"""Move unacked messages back to their queues. This is called
359359
by consumer threads when they fail or are shut down. The
360360
default implementation does nothing.
@@ -363,7 +363,7 @@ def requeue(self, messages: Iterable[MessageProxy]): # pragma: no cover
363363
messages(Iterable[MessageProxy]): The messages to requeue.
364364
"""
365365

366-
def __next__(self): # pragma: no cover
366+
def __next__(self) -> MessageProxy: # pragma: no cover
367367
"""Retrieve the next message off of the queue. This method
368368
blocks until a message becomes available.
369369
@@ -393,7 +393,7 @@ class MessageProxy:
393393
message_id: str
394394
message_timestamp: int
395395

396-
def __init__(self, message: Message):
396+
def __init__(self, message: Message) -> None:
397397
self.failed = False
398398
self._message = message
399399
self._exception: Optional[BaseException] = None

dramatiq/worker.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ def resume(self) -> None:
134134
for child in self.workers:
135135
child.resume()
136136

137-
def stop(self, timeout: int = 600000):
137+
def stop(self, timeout: int = 600000) -> None:
138138
"""Gracefully stop the Worker and all of its consumers and
139139
workers.
140140
@@ -235,15 +235,15 @@ def _add_worker(self) -> None:
235235

236236

237237
class _WorkerMiddleware(Middleware):
238-
def __init__(self, worker: Worker):
238+
def __init__(self, worker: Worker) -> None:
239239
self.logger = get_logger(__name__, type(self))
240240
self.worker = worker
241241

242-
def after_declare_queue(self, broker: Broker, queue_name: str):
242+
def after_declare_queue(self, broker: Broker, queue_name: str) -> None:
243243
self.logger.debug("Adding consumer for queue %r.", queue_name)
244244
self.worker._add_consumer(queue_name)
245245

246-
def after_declare_delay_queue(self, broker: Broker, queue_name: str):
246+
def after_declare_delay_queue(self, broker: Broker, queue_name: str) -> None:
247247
self.logger.debug("Adding consumer for delay queue %r.", queue_name)
248248
self.worker._add_consumer(queue_name, delay=True)
249249

@@ -257,7 +257,7 @@ def __init__(
257257
prefetch: int,
258258
work_queue: PriorityQueue[tuple[int, MessageProxy]],
259259
worker_timeout: int,
260-
):
260+
) -> None:
261261
super().__init__(daemon=True)
262262

263263
self.logger = get_logger(__name__, "ConsumerThread(%s)" % queue_name)
@@ -338,7 +338,7 @@ def handle_delayed_messages(self) -> None:
338338
self.post_process_message(message)
339339
self.delay_queue.task_done()
340340

341-
def handle_message(self, message: MessageProxy):
341+
def handle_message(self, message: MessageProxy) -> None:
342342
"""Handle a message received off of the underlying consumer.
343343
If the message has an eta, delay it. Otherwise, put it on the
344344
work queue.
@@ -362,7 +362,7 @@ def handle_message(self, message: MessageProxy):
362362
message.fail()
363363
self.post_process_message(message)
364364

365-
def post_process_message(self, message: MessageProxy):
365+
def post_process_message(self, message: MessageProxy) -> None:
366366
"""Called by worker threads whenever they're done processing
367367
individual messages, signaling that each message is ready to
368368
be acked or rejected.
@@ -416,7 +416,7 @@ def post_process_message(self, message: MessageProxy):
416416

417417
return
418418

419-
def requeue_messages(self, messages: Iterable[MessageProxy]):
419+
def requeue_messages(self, messages: Iterable[MessageProxy]) -> None:
420420
"""Called on worker shutdown and whenever there is a
421421
connection error to move unacked messages back to their
422422
respective queues asap.
@@ -472,7 +472,7 @@ def __init__(
472472
consumers: dict[str, _ConsumerThread],
473473
work_queue: PriorityQueue[tuple[int, MessageProxy]],
474474
worker_timeout: int,
475-
):
475+
) -> None:
476476
super().__init__(daemon=True)
477477

478478
self.logger = get_logger(__name__, "WorkerThread")
@@ -504,7 +504,7 @@ def run(self) -> None:
504504
self.broker.emit_before("worker_thread_shutdown", self)
505505
self.logger.debug("Worker thread stopped.")
506506

507-
def process_message(self, message: MessageProxy):
507+
def process_message(self, message: MessageProxy) -> None:
508508
"""Process a message pulled off of the work queue then push it
509509
back to its associated consumer for post processing. Stuff any SkipMessage
510510
exception or BaseException into the message [proxy] so that it may be used

0 commit comments

Comments
 (0)