Skip to content

Commit bbd2c4e

Browse files
committed
refactor: rename _ConsumerThread and _WorkerThread to public classes
1 parent 4236b82 commit bbd2c4e

File tree

1 file changed

+13
-13
lines changed

1 file changed

+13
-13
lines changed

dramatiq/worker.py

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ def __init__(
8383
self.logger = get_logger(__name__, type(self))
8484
self.broker = broker
8585

86-
self.consumers: dict[str, "_ConsumerThread"] = {}
86+
self.consumers: dict[str, "ConsumerThread"] = {}
8787
self.consumer_whitelist = queues and set(queues)
8888
# Load a small factor more messages than there are workers to
8989
# avoid waiting on network IO as much as possible. The factor
@@ -93,7 +93,7 @@ def __init__(
9393
# workers as those messages could have far-future etas.
9494
self.delay_prefetch = DELAY_QUEUE_PREFETCH or min(worker_threads * 1000, 65535)
9595

96-
self.workers: list[_WorkerThread] = []
96+
self.workers: list[WorkerThread] = []
9797
self.work_queue: PriorityQueue[tuple[int, MessageProxy]] = PriorityQueue()
9898
self.worker_timeout = worker_timeout
9999
self.worker_threads = worker_threads
@@ -113,7 +113,7 @@ def start(self) -> None:
113113

114114
def pause(self) -> None:
115115
"""Pauses all the worker threads."""
116-
child: Union[_WorkerThread, _ConsumerThread]
116+
child: Union[WorkerThread, ConsumerThread]
117117

118118
for child in self.consumers.values():
119119
child.pause()
@@ -127,7 +127,7 @@ def pause(self) -> None:
127127

128128
def resume(self) -> None:
129129
"""Resumes all the worker threads."""
130-
child: Union[_WorkerThread, _ConsumerThread]
130+
child: Union[WorkerThread, ConsumerThread]
131131

132132
for child in self.consumers.values():
133133
child.resume()
@@ -145,7 +145,7 @@ def stop(self, timeout: int = 600000) -> None:
145145
self.broker.emit_before("worker_shutdown", self)
146146
self.logger.info("Shutting down...")
147147

148-
thread: Union[_WorkerThread, _ConsumerThread]
148+
thread: Union[WorkerThread, ConsumerThread]
149149

150150
# Stop workers before consumers. The consumers are kept alive
151151
# during this process so that heartbeats keep being sent to
@@ -214,7 +214,7 @@ def _add_consumer(self, queue_name: str, *, delay: bool = False) -> None:
214214
self.logger.debug("Dropping consumer for queue %r: not whitelisted.", queue_name)
215215
return
216216

217-
consumer = self.consumers[queue_name] = _ConsumerThread(
217+
consumer = self.consumers[queue_name] = ConsumerThread(
218218
broker=self.broker,
219219
queue_name=queue_name,
220220
prefetch=self.delay_prefetch if delay else self.queue_prefetch,
@@ -224,7 +224,7 @@ def _add_consumer(self, queue_name: str, *, delay: bool = False) -> None:
224224
consumer.start()
225225

226226
def _add_worker(self) -> None:
227-
worker = _WorkerThread(
227+
worker = WorkerThread(
228228
broker=self.broker,
229229
consumers=self.consumers,
230230
work_queue=self.work_queue,
@@ -248,7 +248,7 @@ def after_declare_delay_queue(self, broker: Broker, queue_name: str) -> None:
248248
self.worker._add_consumer(queue_name, delay=True)
249249

250250

251-
class _ConsumerThread(Thread):
251+
class ConsumerThread(Thread):
252252
def __init__(
253253
self,
254254
*,
@@ -454,13 +454,13 @@ def close(self) -> None:
454454
pass
455455

456456

457-
class _WorkerThread(Thread):
457+
class WorkerThread(Thread):
458458
"""WorkerThreads process incoming messages off of the work queue
459459
on a loop. By themselves, they don't do any sort of network IO.
460460
461461
Parameters:
462462
broker(Broker)
463-
consumers(dict[str, _ConsumerThread])
463+
consumers(dict[str, ConsumerThread])
464464
work_queue(Queue)
465465
worker_timeout(int)
466466
"""
@@ -469,7 +469,7 @@ def __init__(
469469
self,
470470
*,
471471
broker: Broker,
472-
consumers: dict[str, _ConsumerThread],
472+
consumers: dict[str, ConsumerThread],
473473
work_queue: PriorityQueue[tuple[int, MessageProxy]],
474474
worker_timeout: int,
475475
) -> None:
@@ -602,5 +602,5 @@ def has_results_middleware(broker: Broker) -> bool:
602602
return any(type(m) is Results for m in broker.middleware)
603603

604604

605-
ConsumerThread = _ConsumerThread
606-
WorkerThread = _WorkerThread
605+
_ConsumerThread = ConsumerThread
606+
_WorkerThread = WorkerThread

0 commit comments

Comments
 (0)