-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathtest_dlq.py
More file actions
94 lines (72 loc) · 2.67 KB
/
test_dlq.py
File metadata and controls
94 lines (72 loc) · 2.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
from qstash import QStash
from qstash.message import PublishResponse
from tests import assert_eventually
def assert_failed_eventually(client: QStash, *msg_ids: str) -> None:
def assertion() -> None:
messages = client.dlq.list().messages
matched_messages = [msg for msg in messages if msg.message_id in msg_ids]
assert len(matched_messages) == len(msg_ids)
for msg in matched_messages:
dlq_msg = client.dlq.get(msg.dlq_id)
assert dlq_msg.response_body == "404 Not Found"
assert msg.response_body == "404 Not Found"
assert dlq_msg.retry_delay_expression == "7000 * retried"
assert msg.retry_delay_expression == "7000 * retried"
if len(msg_ids) == 1:
client.dlq.delete(matched_messages[0].dlq_id)
else:
deleted = client.dlq.delete_many([m.dlq_id for m in matched_messages])
assert deleted == len(msg_ids)
messages = client.dlq.list().messages
matched = any(True for msg in messages if msg.message_id in msg_ids)
assert not matched
assert_eventually(
assertion,
initial_delay=2.0,
retry_delay=1.0,
timeout=10.0,
)
def test_dlq_get_and_delete(client: QStash) -> None:
res = client.message.publish_json(
method="GET",
url="https://mock.httpstatus.io/404",
retries=0,
retry_delay="7000 * retried",
)
assert isinstance(res, PublishResponse)
assert_failed_eventually(client, res.message_id)
def test_dlq_get_and_delete_many(client: QStash) -> None:
msg_ids = []
for _ in range(5):
res = client.message.publish_json(
method="GET",
url="https://mock.httpstatus.io/404",
retries=0,
retry_delay="7000 * retried",
)
assert isinstance(res, PublishResponse)
msg_ids.append(res.message_id)
assert_failed_eventually(client, *msg_ids)
def test_dlq_filter(client: QStash) -> None:
res = client.message.publish_json(
method="GET",
url="https://mock.httpstatus.io/404",
retries=0,
retry_delay="7000 * retried",
)
assert isinstance(res, PublishResponse)
def assertion() -> None:
messages = client.dlq.list(
filter={"message_id": res.message_id},
count=1,
).messages
assert len(messages) == 1
assert messages[0].message_id == res.message_id
assert messages[0].retry_delay_expression == "7000 * retried"
client.dlq.delete(messages[0].dlq_id)
assert_eventually(
assertion,
initial_delay=2.0,
retry_delay=1.0,
timeout=10.0,
)