Skip to content

Commit c786fc6

Browse files
fix(taskworker): Improve Queue Size Metrics (#612)
1 parent 0ebacc2 commit c786fc6

1 file changed

Lines changed: 28 additions & 6 deletions

File tree

  • clients/python/src/taskbroker_client/worker

clients/python/src/taskbroker_client/worker/worker.py

Lines changed: 28 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -127,6 +127,7 @@ def __init__(
127127
result_queue_maxsize: int = DEFAULT_WORKER_QUEUE_SIZE,
128128
rebalance_after: int = DEFAULT_REBALANCE_AFTER,
129129
processing_pool_name: str | None = None,
130+
pod_name: str | None = None,
130131
process_type: str = "spawn",
131132
health_check_file_path: str | None = None,
132133
health_check_sec_per_touch: float = DEFAULT_WORKER_HEALTH_CHECK_SEC_PER_TOUCH,
@@ -152,6 +153,7 @@ def __init__(
152153
child_tasks_queue_maxsize=child_tasks_queue_maxsize,
153154
result_queue_maxsize=result_queue_maxsize,
154155
processing_pool_name=processing_pool_name,
156+
pod_name=pod_name,
155157
process_type=process_type,
156158
)
157159

@@ -512,10 +514,12 @@ def __init__(
512514
child_tasks_queue_maxsize: int = DEFAULT_WORKER_QUEUE_SIZE,
513515
result_queue_maxsize: int = DEFAULT_WORKER_QUEUE_SIZE,
514516
processing_pool_name: str | None = None,
517+
pod_name: str | None = None,
515518
process_type: str = "spawn",
516519
) -> None:
517520
self._concurrency = concurrency
518521
self._processing_pool_name = processing_pool_name or "unknown"
522+
self._pod_name = pod_name or "unknown"
519523
self._send_result = send_result_fn
520524
self._max_child_task_count = max_child_task_count
521525
self._app_module = app_module
@@ -571,6 +575,30 @@ def result_thread() -> None:
571575
iopool = ThreadPoolExecutor(max_workers=self._concurrency)
572576
with iopool as executor:
573577
while not self._shutdown_event.is_set():
578+
tags = {
579+
"processing_pool": self._processing_pool_name,
580+
"pod_name": self._pod_name,
581+
}
582+
583+
try:
584+
# 'qsize' is not implemented on all platforms, such as macOS
585+
self._metrics.gauge(
586+
"taskworker.child_tasks.size",
587+
float(self._child_tasks.qsize()),
588+
tags=tags,
589+
)
590+
591+
self._metrics.gauge(
592+
"taskworker.processed_tasks.size",
593+
float(self._processed_tasks.qsize()),
594+
tags=tags,
595+
)
596+
except Exception as e:
597+
logger.debug(
598+
"taskworker.worker.queue_gauges.error",
599+
extra={"error": e, "processing_pool": self._processing_pool_name},
600+
)
601+
574602
try:
575603
result = self._processed_tasks.get(timeout=1.0)
576604
executor.submit(self.send_result, result, False)
@@ -632,12 +660,6 @@ def push_task(self, inflight: InflightTaskActivation, timeout: float | None = No
632660
set (e.g. 5.0), waits at most that many seconds and returns `False` if the
633661
queue is still full (worker busy).
634662
"""
635-
try:
636-
self._metrics.gauge("taskworker.child_tasks.size", self._child_tasks.qsize())
637-
except Exception as e:
638-
# 'qsize' does not work on macOS
639-
logger.debug("taskworker.child_tasks.size.error", extra={"error": e})
640-
641663
start_time = time.monotonic()
642664
try:
643665
self._child_tasks.put(inflight, timeout=timeout)

0 commit comments

Comments
 (0)