Skip to content

Commit eec3737

Browse files
authored
Add ability to remove items from the queue by identifier (#1029)
Also: * Fix warning about invalid escape sequence in the queue's `get_all` method * Move queue test module out of `jira` test subdir
1 parent 2d3833e commit eec3737

File tree

5 files changed

+117
-2
lines changed

5 files changed

+117
-2
lines changed

jbi/queue.py

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,19 @@
4343
logger = logging.getLogger(__name__)
4444

4545

46+
ITEM_ID_PATTERN = re.compile(
47+
r"(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\+\d{2}:\d{2})-(?P<bug_id>\d+)-(?P<action>\w*)-(?P<status>error|postponed)"
48+
)
49+
50+
51+
def extract_bug_id_from_item_id(item_id: str) -> str:
52+
if match := re.search(ITEM_ID_PATTERN, item_id):
53+
return match.group("bug_id")
54+
raise ValueError(
55+
"item_id %s did not match expected format: %s", item_id, ITEM_ID_PATTERN.pattern
56+
)
57+
58+
4659
class QueueItemRetrievalError(Exception):
4760
def __init__(self, message=None, path=None):
4861
self.message = message or "Error reading or parsing queue item"
@@ -127,6 +140,13 @@ def get(self, bug_id: int) -> AsyncIterator[QueueItem]:
127140
"""
128141
pass
129142

143+
@abstractmethod
144+
async def exists(self, item_id: str) -> bool:
145+
"""
146+
Report whether an item with id `item_id` exists in the queue
147+
"""
148+
pass
149+
130150
@abstractmethod
131151
async def get_all(self) -> dict[int, AsyncIterator[QueueItem]]:
132152
"""Retrieve all items in the queue, grouped by bug
@@ -197,6 +217,19 @@ async def remove(self, bug_id: int, identifier: str):
197217
bug_dir.rmdir()
198218
logger.debug("Removed directory for bug %s", bug_id)
199219

220+
async def exists(self, item_id: str) -> bool:
221+
try:
222+
bug_id = extract_bug_id_from_item_id(item_id)
223+
except ValueError as e:
224+
logger.warning(
225+
"provided item_id %s did not match expected format", item_id, exc_info=e
226+
)
227+
return False
228+
229+
item_path = (self.location / bug_id / item_id).with_suffix(".json")
230+
# even though pathlib.Path.exists() returns a bool, mypy doesn't seem to get it
231+
return bool(item_path.exists())
232+
200233
async def get(self, bug_id: int) -> AsyncIterator[QueueItem]:
201234
folder = self.location / str(bug_id)
202235
if not folder.is_dir():
@@ -214,7 +247,7 @@ async def get_all(self) -> dict[int, AsyncIterator[QueueItem]]:
214247
all_items: dict[int, AsyncIterator[QueueItem]] = {}
215248
for filesystem_object in self.location.iterdir():
216249
if filesystem_object.is_dir() and re.match(
217-
"\d", filesystem_object.name
250+
r"\d", filesystem_object.name
218251
): # filtering out temp files from checks
219252
all_items[int(filesystem_object.name)] = self.get(filesystem_object)
220253
return all_items
@@ -320,3 +353,16 @@ async def done(self, item: QueueItem) -> None:
320353
Mark item as done, remove from queue.
321354
"""
322355
return await self.backend.remove(item.payload.bug.id, item.identifier)
356+
357+
async def exists(self, item_id) -> bool:
358+
"""
359+
Report whether an item with id `item_id` exists in the queue
360+
"""
361+
return await self.backend.exists(item_id)
362+
363+
async def delete(self, item_id) -> None:
364+
"""
365+
Remove an item from the queue by item_id
366+
"""
367+
bug_id = extract_bug_id_from_item_id(item_id)
368+
await self.backend.remove(bug_id=int(bug_id), identifier=item_id)

jbi/router.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,19 @@ async def inspect_dl_queue(queue: Annotated[DeadLetterQueue, Depends(get_dl_queu
101101
return results
102102

103103

104+
@router.delete("/dl_queue/{item_id}", dependencies=[Depends(api_key_auth)])
105+
async def delete_queue_item_by_id(
106+
item_id: str, queue: Annotated[DeadLetterQueue, Depends(get_dl_queue)]
107+
):
108+
item_exists = await queue.exists(item_id)
109+
if item_exists:
110+
await queue.delete(item_id)
111+
else:
112+
raise HTTPException(
113+
status_code=404, detail=f"Item {item_id} not found in queue"
114+
)
115+
116+
104117
@router.get(
105118
"/whiteboard_tags/",
106119
dependencies=[Depends(api_key_auth)],

tests/fixtures/factories.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ class Meta:
115115
changes = None
116116
routing_key = "bug.create"
117117
target = "bug"
118-
time = factory.LazyFunction(lambda: datetime.now(UTC))
118+
time = factory.LazyFunction(lambda: datetime.now(UTC).isoformat(timespec="seconds"))
119119
user = factory.SubFactory(WebhookUserFactory)
120120

121121

tests/unit/jira/test_queue.py renamed to tests/unit/test_queue.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -341,3 +341,22 @@ async def test_done(queue: DeadLetterQueue, queue_item_factory):
341341

342342
await queue.done(item)
343343
assert await queue.backend.size() == 0
344+
345+
346+
@pytest.mark.asyncio
347+
async def test_delete(queue: DeadLetterQueue, queue_item_factory):
348+
item = queue_item_factory()
349+
350+
await queue.backend.put(item)
351+
assert await queue.backend.size() == 1
352+
353+
await queue.delete(item.identifier)
354+
assert await queue.backend.size() == 0
355+
356+
357+
@pytest.mark.asyncio
358+
async def test_exists(queue: DeadLetterQueue, queue_item_factory):
359+
item = queue_item_factory()
360+
361+
await queue.backend.put(item)
362+
assert await queue.exists(item.identifier) is True

tests/unit/test_router.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,43 @@ async def test_dl_queue_endpoint(
110110
}
111111

112112

113+
@pytest.mark.asyncio
114+
async def test_delete_queue_item_by_id(
115+
dl_queue, authenticated_client, webhook_request_factory
116+
):
117+
item = webhook_request_factory(event__time=datetime(1982, 5, 8, 9, 10))
118+
await dl_queue.track_failed(item, Exception("boom"), rid="rid")
119+
120+
resp = authenticated_client.delete(
121+
"/dl_queue/1982-05-08%2009:10:00+00:00-654321-create-error"
122+
)
123+
assert resp.status_code == 200
124+
125+
126+
@pytest.mark.asyncio
127+
async def test_delete_queue_item_by_id_item_doesnt_exist(
128+
dl_queue, authenticated_client, webhook_request_factory
129+
):
130+
item = webhook_request_factory(event__time=datetime(1982, 5, 8, 9, 10))
131+
await dl_queue.track_failed(item, Exception("boom"), rid="rid")
132+
133+
resp = authenticated_client.delete("/dl_queue/return-a-404-4-me")
134+
assert resp.status_code == 404
135+
136+
137+
@pytest.mark.asyncio
138+
async def test_delete_queue_item_by_id_requires_authn(
139+
dl_queue, anon_client, webhook_request_factory
140+
):
141+
item = webhook_request_factory(event__time=datetime(1982, 5, 8, 9, 10))
142+
await dl_queue.track_failed(item, Exception("boom"), rid="rid")
143+
144+
resp = anon_client.delete(
145+
"/dl_queue/1982-05-08%2009:10:00+00:00-654321-create-error"
146+
)
147+
assert resp.status_code == 401
148+
149+
113150
def test_powered_by_jbi(exclude_middleware, authenticated_client):
114151
resp = authenticated_client.get("/powered_by_jbi/")
115152
html = resp.text

0 commit comments

Comments
 (0)