Skip to content

Commit 3bc7c04

Browse files
Update/dockerflow 2024.04.2, separate queue checks (#974)
* Updating to dockerflow 2024.04.2 * Making queue checks async, separating queue checks Co-authored-by: Mathieu Leplatre <[email protected]>
1 parent ceb736d commit 3bc7c04

File tree

6 files changed

+71
-35
lines changed

6 files changed

+71
-35
lines changed

jbi/app.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,8 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
9191
name="jira.all_project_issue_types_exist",
9292
)
9393
checks.register(jira_service.check_jira_pandoc_install, name="jira.pandoc_install")
94-
checks.register(queue.ready, name="queue.ready")
94+
checks.register(queue.check_writable, name="queue.writable")
95+
checks.register(queue.check_readable, name="queue.readable")
9596

9697
yield
9798

jbi/queue.py

Lines changed: 31 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,16 @@
2222
item and parse it as an item
2323
"""
2424

25-
import asyncio
2625
import logging
26+
import re
2727
import tempfile
2828
import traceback
2929
from abc import ABC, abstractmethod
3030
from datetime import datetime
3131
from functools import lru_cache
3232
from json import JSONDecodeError
3333
from pathlib import Path
34-
from typing import Any, AsyncIterator, Optional
34+
from typing import AsyncIterator, Optional
3535
from urllib.parse import ParseResult, urlparse
3636

3737
import dockerflow.checks
@@ -43,12 +43,13 @@
4343
logger = logging.getLogger(__name__)
4444

4545

46-
async def async_iter(iter: AsyncIterator[Any]) -> list[Any]:
47-
return [item async for item in iter]
48-
49-
5046
class QueueItemRetrievalError(Exception):
51-
pass
47+
def __init__(self, message=None, path=None):
48+
self.message = message or "Error reading or parsing queue item"
49+
self.path = path
50+
51+
def __str__(self):
52+
return f"QueueItemRetrievalError: {self.message} - path: {self.path}."
5253

5354

5455
class InvalidQueueDSNError(Exception):
@@ -199,13 +200,15 @@ async def get(self, bug_id: int) -> AsyncIterator[QueueItem]:
199200
yield QueueItem.parse_file(path)
200201
except (JSONDecodeError, ValidationError) as e:
201202
raise QueueItemRetrievalError(
202-
f"Unable to load item at path {path} from queue"
203+
"Unable to load item from queue", path=path
203204
) from e
204205

205206
async def get_all(self) -> dict[int, AsyncIterator[QueueItem]]:
206207
all_items: dict[int, AsyncIterator[QueueItem]] = {}
207208
for filesystem_object in self.location.iterdir():
208-
if filesystem_object.is_dir():
209+
if filesystem_object.is_dir() and re.match(
210+
"\d", filesystem_object.name
211+
): # filtering out temp files from checks
209212
all_items[int(filesystem_object.name)] = self.get(filesystem_object)
210213
return all_items
211214

@@ -224,12 +227,8 @@ def __init__(self, dsn: FileUrl | str | ParseResult):
224227
raise InvalidQueueDSNError(f"{dsn.scheme} is not supported")
225228
self.backend = FileBackend(dsn.path)
226229

227-
def ready(self) -> list[dockerflow.checks.CheckMessage]:
228-
"""Heartbeat check to assert we can write items to queue
229-
230-
TODO: Convert to an async method when Dockerflow's FastAPI integration
231-
can run check asynchronously
232-
"""
230+
def check_writable(self) -> list[dockerflow.checks.CheckMessage]:
231+
"""Heartbeat check to assert we can write items to queue"""
233232
results = []
234233
ping_result = self.backend.ping()
235234
if ping_result is False:
@@ -240,11 +239,25 @@ def ready(self) -> list[dockerflow.checks.CheckMessage]:
240239
id="queue.backend.ping",
241240
)
242241
)
242+
return results
243243

244+
async def check_readable(self) -> list[dockerflow.checks.CheckMessage]:
245+
results = []
244246
try:
245-
bugs_items = asyncio.run(self.retrieve())
246-
for items in bugs_items.values():
247-
asyncio.run(async_iter(items))
247+
bugs = await self.retrieve()
248+
249+
for bug_id, items in bugs.items():
250+
try:
251+
bug_items = (await self.retrieve()).values()
252+
[[i async for i in items] for items in bug_items]
253+
except QueueItemRetrievalError as exc:
254+
results.append(
255+
dockerflow.checks.Error(
256+
f"failed to parse file {str(exc.path)}",
257+
hint="check that parked event files are not corrupt",
258+
id="queue.backend.read",
259+
)
260+
)
248261
except Exception as exc:
249262
logger.exception(exc)
250263
results.append(
@@ -254,7 +267,6 @@ def ready(self) -> list[dockerflow.checks.CheckMessage]:
254267
id="queue.backend.retrieve",
255268
)
256269
)
257-
258270
return results
259271

260272
async def postpone(self, payload: bugzilla.WebhookRequest) -> None:

poetry.lock

Lines changed: 5 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ fastapi = "^0.110.2"
1111
pydantic = {version = "^2.7.0", extras = ["email"]}
1212
uvicorn = {extras = ["standard"], version = "^0.29.0"}
1313
atlassian-python-api = "^3.41.11"
14-
dockerflow = {extras = ["fastapi"], version = "2024.4.1"}
14+
dockerflow = {extras = ["fastapi"], version = "2024.4.2"}
1515
Jinja2 = "^3.1.3"
1616
sentry-sdk = {extras = ["fastapi"], version = "^1.45.0"}
1717
pydantic-yaml = "^1.3.0"

tests/unit/jira/test_queue.py

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,20 @@ async def test_backend_get_all_invalid_json(backend: QueueBackend, queue_item_fa
149149
assert len(items) == 2
150150

151151

152+
@pytest.mark.asyncio
153+
async def test_backend_get_all_ignores_bad_folders(
154+
backend: QueueBackend, queue_item_factory
155+
):
156+
item_1 = queue_item_factory()
157+
await backend.put(item_1)
158+
159+
corrupt_file_dir = backend.location / "abc"
160+
corrupt_file_dir.mkdir()
161+
162+
items = await backend.get_all()
163+
assert len(items) == 1
164+
165+
152166
@pytest.mark.asyncio
153167
async def test_backend_get_all_payload_doesnt_match_schema(
154168
backend: QueueBackend, queue_item_factory
@@ -179,6 +193,7 @@ async def test_backend_get_invalid_json(backend: QueueBackend, queue_item_factor
179193
await anext(items)
180194

181195

196+
@pytest.mark.asyncio
182197
async def test_get_missing_timezone(backend: QueueBackend, queue_item_factory):
183198
item = queue_item_factory.build(payload__bug__id=666)
184199
dump = item.model_dump()
@@ -196,6 +211,7 @@ async def test_get_missing_timezone(backend: QueueBackend, queue_item_factory):
196211
assert "2024-04-18T12:46:54Z" in item.model_dump_json(), "timezone put in dump"
197212

198213

214+
@pytest.mark.asyncio
199215
async def test_backend_get_payload_doesnt_match_schema(
200216
backend: QueueBackend, queue_item_factory
201217
):
@@ -211,26 +227,32 @@ async def test_backend_get_payload_doesnt_match_schema(
211227
await anext(items)
212228

213229

214-
def test_ready_ok(queue: DeadLetterQueue):
215-
assert queue.ready() == []
230+
def test_check_writable_ok(queue: DeadLetterQueue):
231+
assert queue.check_writable() == []
216232

217233

218-
def test_ready_not_writable(queue: DeadLetterQueue, tmp_path):
234+
def test_check_writable_not_writable(queue: DeadLetterQueue, tmp_path):
219235
queue.backend = FileBackend(tmp_path)
220236
tmp_path.chmod(0o400) # set to readonly
221-
[failure] = queue.ready()
237+
[failure] = queue.check_writable()
222238
assert failure.id == "queue.backend.ping"
223239

224240

225-
def test_ready_not_parseable(queue: DeadLetterQueue):
241+
@pytest.mark.asyncio
242+
async def test_check_readable_ok(queue: DeadLetterQueue):
243+
assert await queue.check_readable() == []
244+
245+
246+
@pytest.mark.asyncio
247+
async def test_check_readable_not_parseable(queue: DeadLetterQueue):
226248
corrupt_file_dir = queue.backend.location / "999"
227249
corrupt_file_dir.mkdir()
228250
corrupt_file_path = corrupt_file_dir / "xxx.json"
229251
corrupt_file_path.write_text("BOOM")
230252

231-
[failure] = queue.ready()
232-
assert failure.id == "queue.backend.retrieve"
233-
assert failure.hint.startswith("invalid data: Unable to load item at path /")
253+
[failure] = await queue.check_readable()
254+
assert failure.id == "queue.backend.read"
255+
assert failure.hint.startswith("check that parked event files are not corrupt")
234256

235257

236258
@pytest.mark.asyncio

tests/unit/test_router.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -479,7 +479,8 @@ def test_read_heartbeat_success(
479479
"jira.all_projects_are_visible": "ok",
480480
"jira.all_projects_have_permissions": "ok",
481481
"jira.pandoc_install": "ok",
482-
"queue.ready": "ok",
482+
"queue.writable": "ok",
483+
"queue.readable": "ok",
483484
},
484485
"details": {},
485486
"status": "ok",

0 commit comments

Comments
 (0)