Skip to content

Commit 9c6abb1

Browse files
committed
Fix #631 -- Add task_default_queue fallback if task_queues is None
By default Celeries' `task_queues` is None. Even though, there is a default queue set unless the configuration is altered. * Drop Celery 3.0 support and drop the old settings fallback. * Harden test suite by relying less on mocking.
1 parent 6aba95c commit 9c6abb1

File tree

3 files changed

+96
-45
lines changed

3 files changed

+96
-45
lines changed

health_check/contrib/celery.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import dataclasses
44
import datetime
55
import typing
6+
from types import MappingProxyType
67

78
import celery
89
from celery.app import app_or_default
@@ -25,7 +26,9 @@ class Ping(HealthCheck):
2526
2627
"""
2728

28-
CORRECT_PING_RESPONSE: typing.ClassVar[dict[str, str]] = {"ok": "pong"}
29+
CORRECT_PING_RESPONSE: typing.Final[dict[str, str]] = MappingProxyType(
30+
{"ok": "pong"}
31+
)
2932
app: celery.Celery = dataclasses.field(default_factory=app_or_default)
3033
timeout: datetime.timedelta = dataclasses.field(
3134
default=datetime.timedelta(seconds=1), repr=False
@@ -59,11 +62,11 @@ def active_workers(self, ping_result):
5962
yield worker
6063

6164
def check_active_queues(self, *active_workers):
62-
defined_queues = {
63-
queue.name
64-
for queue in getattr(self.app.conf, "task_queues", None)
65-
or getattr(self.app.conf, "CELERY_QUEUES", None)
66-
}
65+
try:
66+
defined_queues = {queue.name for queue in self.app.conf.task_queues}
67+
except TypeError:
68+
# conf.task_queues may be None
69+
defined_queues = {self.app.conf.task_default_queue}
6770
active_queues = {
6871
queue.get("name")
6972
for queues in self.app.control.inspect(active_workers)

tests/contrib/test_celery.py

Lines changed: 86 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,48 +1,30 @@
11
"""Tests for Celery health check."""
22

3+
import datetime
4+
import logging
35
from unittest import mock
46

57
import pytest
68

79
pytest.importorskip("celery")
810

11+
from kombu import Queue
12+
913
from health_check.contrib.celery import Ping as CeleryPingHealthCheck
1014
from health_check.exceptions import ServiceUnavailable
15+
from tests.testapp.celery import app as celery_app
16+
17+
logger = logging.getLogger(__name__)
1118

1219

1320
class TestCelery:
1421
"""Test Celery ping health check."""
1522

16-
@pytest.mark.asyncio
17-
async def test_check_status__success(self):
18-
"""Report healthy when workers respond correctly."""
19-
app = mock.MagicMock()
20-
app.ping.return_value = [{"celery@worker1": {"ok": "pong"}}]
21-
check = CeleryPingHealthCheck()
22-
check.app = app
23-
24-
result = await check.get_result()
25-
assert result.error is None
26-
27-
@pytest.mark.asyncio
28-
async def test_check_status__no_workers(self):
29-
"""Raise ServiceUnavailable when no workers respond."""
30-
mock_app = mock.MagicMock()
31-
mock_app.control.ping.return_value = {}
32-
check = CeleryPingHealthCheck()
33-
check.app = mock_app
34-
35-
result = await check.get_result()
36-
assert result.error is not None
37-
assert isinstance(result.error, ServiceUnavailable)
38-
assert "unavailable" in str(result.error).lower()
39-
4023
@pytest.mark.asyncio
4124
async def test_check_status__unexpected_response(self):
4225
"""Raise ServiceUnavailable when worker response is incorrect."""
43-
mock_result = {"celery@worker1": {"bad": "response"}}
4426
mock_app = mock.MagicMock()
45-
mock_app.control.ping.return_value = [mock_result]
27+
mock_app.control.ping.return_value = [{"celery@worker1": {"bad": "response"}}]
4628
check = CeleryPingHealthCheck()
4729
check.app = mock_app
4830

@@ -87,24 +69,92 @@ async def test_check_status__unknown_error(self):
8769
assert result.error is not None
8870
assert isinstance(result.error, ServiceUnavailable)
8971

72+
@pytest.mark.asyncio
73+
async def test_check_status__success(self):
74+
"""Report healthy when using real Celery app with configured queues."""
75+
default_queue = Queue("default", routing_key="default")
76+
celery_app.conf.task_queues = [default_queue]
77+
celery_app.conf.task_default_queue = "default"
78+
79+
check = CeleryPingHealthCheck(app=celery_app)
80+
81+
# Mock the control methods directly on the app before get_result runs
82+
mock_ping = mock.MagicMock(return_value=[{"celery@worker1": {"ok": "pong"}}])
83+
mock_inspect_obj = mock.MagicMock()
84+
mock_inspect_obj.active_queues.return_value = {
85+
"celery@worker1": [{"name": "default"}]
86+
}
87+
mock_inspect = mock.MagicMock(return_value=mock_inspect_obj)
88+
89+
original_ping = check.app.control.ping
90+
original_inspect = check.app.control.inspect
91+
92+
try:
93+
check.app.control.ping = mock_ping
94+
check.app.control.inspect = mock_inspect
95+
96+
result = await check.get_result()
97+
assert result.error is None
98+
finally:
99+
check.app.control.ping = original_ping
100+
check.app.control.inspect = original_inspect
101+
102+
@pytest.mark.asyncio
103+
async def test_check_status__no_workers(self):
104+
"""Raise ServiceUnavailable when real app receives no worker response."""
105+
default_queue = Queue("default", routing_key="default")
106+
celery_app.conf.task_queues = [default_queue]
107+
celery_app.conf.task_default_queue = "default"
108+
109+
check = CeleryPingHealthCheck(
110+
app=celery_app,
111+
timeout=datetime.timedelta(milliseconds=100),
112+
)
113+
result = await check.get_result()
114+
115+
assert result.error is not None
116+
assert isinstance(result.error, ServiceUnavailable)
117+
assert "unavailable" in str(result.error).lower()
118+
90119
@pytest.mark.asyncio
91120
async def test_check_status__missing_queue_worker(self):
92-
"""Raise ServiceUnavailable when a defined queue has no active workers."""
93-
mock_result = {"celery@worker1": {"ok": "pong"}}
121+
"""Verify queue validation with real Celery app configuration."""
122+
multiple_queues = [
123+
Queue("default", routing_key="default"),
124+
Queue("integration_queue", routing_key="integration_queue"),
125+
]
126+
celery_app.conf.task_queues = multiple_queues
127+
celery_app.conf.task_default_queue = "default"
128+
129+
ping_response = [{"celery@worker1": {"ok": "pong"}}]
130+
inspect_response = {"celery@worker1": [{"name": "default"}]}
131+
132+
check = CeleryPingHealthCheck(app=celery_app)
133+
check.app.control.ping = lambda **kwargs: ping_response
134+
check.app.control.inspect = lambda *args: mock.MagicMock(
135+
active_queues=lambda: inspect_response
136+
)
137+
138+
result = await check.get_result()
139+
assert result.error is not None
140+
assert isinstance(result.error, ServiceUnavailable)
141+
assert "integration_queue" in str(result.error)
142+
143+
@pytest.mark.asyncio
144+
async def test_check_status__raise_type_error__default_queue(self):
145+
"""Verify that default queue is checked when task_queues is None."""
94146
mock_app = mock.MagicMock()
95-
mock_app.control.ping.return_value = [mock_result]
96-
mock_queue = mock.MagicMock()
97-
mock_queue.name = "missing_queue"
98-
mock_app.conf.task_queues = [mock_queue]
147+
mock_app.conf.task_queues = None
148+
mock_app.conf.task_default_queue = "default"
149+
mock_app.control.ping.return_value = [{"celery@worker1": {"ok": "pong"}}]
99150
mock_inspect = mock.MagicMock()
100151
mock_inspect.active_queues.return_value = {
101-
"celery@worker1": [{"name": "celery"}]
152+
"celery@worker1": [{"name": "default"}]
102153
}
103154
mock_app.control.inspect.return_value = mock_inspect
155+
104156
check = CeleryPingHealthCheck()
105157
check.app = mock_app
106158

107159
result = await check.get_result()
108-
assert result.error is not None
109-
assert isinstance(result.error, ServiceUnavailable)
110-
assert "missing_queue" in str(result.error)
160+
assert result.error is None

tests/testapp/settings.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,14 +52,12 @@
5252

5353
USE_TZ = True
5454

55-
CELERY_QUEUES = []
56-
5755
try:
5856
from kombu import Queue
5957
except ImportError:
6058
pass
6159
else:
62-
CELERY_QUEUES += [
60+
CELERY_QUEUES = [
6361
Queue("default"),
6462
Queue("queue2"),
6563
]

0 commit comments

Comments
 (0)