-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathlog.py
More file actions
276 lines (202 loc) · 7.49 KB
/
log.py
File metadata and controls
276 lines (202 loc) · 7.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
import dataclasses
import enum
from typing import Any, Dict, List, Optional, TypedDict
from qstash.http import HttpClient, HttpMethod
from qstash.message import parse_flow_control, FlowControlProperties
class LogState(enum.Enum):
    """Lifecycle state of a message as reported in the logs."""

    CREATED = "CREATED"
    """QStash accepted the message and persisted it."""

    ACTIVE = "ACTIVE"
    """A worker is processing the task right now."""

    RETRY = "RETRY"
    """A retry of the task has been scheduled."""

    ERROR = "ERROR"
    """
    The last execution raised an error; the task is either awaiting a
    retry or has failed.
    """

    DELIVERED = "DELIVERED"
    """The message reached its destination successfully."""

    FAILED = "FAILED"
    """
    The task exhausted its retries or hit an unrecoverable error.
    """

    CANCEL_REQUESTED = "CANCEL_REQUESTED"
    """A user-initiated cancel request has been recorded."""

    CANCELED = "CANCELED"
    """The user's cancel request has been honored."""

    IN_PROGRESS = "IN_PROGRESS"
    """The message is currently in progress."""
@dataclasses.dataclass
class Log:
    """A single log entry: a snapshot of one message at one point in time."""

    time: int
    """Unix timestamp of this entry, in milliseconds."""

    message_id: str
    """Identifier of the message this entry belongs to."""

    state: LogState
    """State the message was in when this entry was recorded."""

    error: Optional[str]
    """Explanation of what went wrong, if anything did."""

    next_delivery_time: Optional[int]
    """Unix timestamp (milliseconds) of the next scheduled delivery attempt."""

    url: str
    """The destination url of the message."""

    url_group: Optional[str]
    """Url group name, when the message was sent through a url group."""

    endpoint: Optional[str]
    """Endpoint name, when the message was sent through a url group."""

    queue: Optional[str]
    """Queue name, when the message was enqueued on a queue."""

    schedule_id: Optional[str]
    """Schedule id, when the message was triggered by a schedule."""

    body_base64: Optional[str]
    """Message body, encoded as base64."""

    headers: Optional[Dict[str, List[str]]]
    """Headers attached to the message."""

    callback_headers: Optional[Dict[str, List[str]]]
    """Headers attached to the callback message."""

    failure_callback_headers: Optional[Dict[str, List[str]]]
    """Headers attached to the failure callback message."""

    response_status: Optional[int]
    """HTTP status code returned by the last failed delivery attempt."""

    response_headers: Optional[Dict[str, List[str]]]
    """Headers returned by the last failed delivery attempt."""

    response_body: Optional[str]
    """Body returned by the last failed delivery attempt."""

    timeout: Optional[int]
    """HTTP timeout applied when calling the destination url."""

    method: Optional[HttpMethod]
    """HTTP method used to deliver the message."""

    callback: Optional[str]
    """Url invoked on every delivery attempt of the message."""

    failure_callback: Optional[str]
    """Url invoked once the message has failed."""

    max_retries: Optional[int]
    """How many retries to attempt when a delivery fails."""

    flow_control: Optional[FlowControlProperties]
    """Flow control settings of the message."""
class LogFilter(TypedDict, total=False):
    """Optional criteria for narrowing down which logs are listed."""

    message_id: str
    """Only logs for this message id."""

    message_ids: List[str]
    """Only logs for any of these message ids."""

    state: LogState
    """Only logs in this state."""

    url: str
    """Only logs for this destination url."""

    url_group: str
    """Only logs sent through this url group."""

    queue: str
    """Only logs enqueued on this queue."""

    schedule_id: str
    """Only logs triggered by this schedule."""

    from_time: int
    """Only logs at or after this Unix time, in milliseconds."""

    to_time: int
    """Only logs up to this Unix time, in milliseconds."""
@dataclasses.dataclass
class ListLogsResponse:
    """One page of logs, plus the cursor needed to fetch the next page."""

    cursor: Optional[str]
    """
    Pagination cursor for the next request; `None` means there are no
    more logs to fetch.
    """

    logs: List[Log]
    """The logs in this page."""
def prepare_list_logs_request_params(
    *,
    cursor: Optional[str],
    count: Optional[int],
    filter: Optional["LogFilter"],
) -> Dict[str, Any]:
    """
    Build the query-string parameters for the list-logs request.

    Renames the snake_case filter fields to the camelCase parameter names
    the REST API expects (note `url_group` is sent as `topicName`).

    :param cursor: Opaque pagination cursor, forwarded unchanged when set.
    :param count: Maximum number of logs to return; serialized as a string.
    :param filter: Filter whose present keys are forwarded; absent keys
        are simply omitted.
    :return: The request parameters. All values are strings except
        `messageIds`, which remains a list of strings — hence the
        `Dict[str, Any]` return type.
    """
    params: Dict[str, Any] = {}

    if cursor is not None:
        params["cursor"] = cursor

    if count is not None:
        params["count"] = str(count)

    if filter is not None:
        if "message_id" in filter:
            params["messageId"] = filter["message_id"]

        if "message_ids" in filter:
            params["messageIds"] = filter["message_ids"]

        if "state" in filter:
            # The API expects the plain string value of the enum member.
            params["state"] = filter["state"].value

        if "url" in filter:
            params["url"] = filter["url"]

        if "url_group" in filter:
            params["topicName"] = filter["url_group"]

        if "queue" in filter:
            params["queueName"] = filter["queue"]

        if "schedule_id" in filter:
            params["scheduleId"] = filter["schedule_id"]

        # Millisecond timestamps are stringified like every other scalar.
        if "from_time" in filter:
            params["fromDate"] = str(filter["from_time"])

        if "to_time" in filter:
            params["toDate"] = str(filter["to_time"])

    return params
def _parse_single_log(raw: Dict[str, Any]) -> Log:
    """Build one `Log` from a single raw event dict returned by the API."""
    # Flow control properties are parsed out of the same raw dict.
    flow_control = parse_flow_control(raw)
    return Log(
        time=raw["time"],
        message_id=raw["messageId"],
        state=LogState(raw["state"]),
        error=raw.get("error"),
        next_delivery_time=raw.get("nextDeliveryTime"),
        url=raw["url"],
        url_group=raw.get("topicName"),
        endpoint=raw.get("endpointName"),
        queue=raw.get("queueName"),
        schedule_id=raw.get("scheduleId"),
        headers=raw.get("header"),
        callback_headers=raw.get("callbackHeaders"),
        failure_callback_headers=raw.get("failureCallbackHeaders"),
        body_base64=raw.get("body"),
        response_status=raw.get("responseStatus"),
        response_headers=raw.get("responseHeader"),
        response_body=raw.get("responseBody"),
        timeout=raw.get("timeout"),
        callback=raw.get("callback"),
        failure_callback=raw.get("failureCallback"),
        flow_control=flow_control,
        method=raw.get("method"),
        max_retries=raw.get("maxRetries"),
    )


def parse_logs_response(response: List[Dict[str, Any]]) -> List[Log]:
    """Convert the raw list of event dicts into `Log` objects, preserving order."""
    return [_parse_single_log(raw) for raw in response]
class LogApi:
    """Client-side wrapper around the QStash logs endpoint."""

    def __init__(self, http: HttpClient) -> None:
        self._http = http

    def list(
        self,
        *,
        cursor: Optional[str] = None,
        count: Optional[int] = None,
        filter: Optional[LogFilter] = None,
    ) -> ListLogsResponse:
        """
        Lists all logs that happened, such as message creation or delivery.

        :param cursor: Optional cursor to start listing logs from.
        :param count: The maximum number of logs to return. \
            Default and max is `1000`.
        :param filter: Filter to use.
        """
        query = prepare_list_logs_request_params(
            cursor=cursor,
            count=count,
            filter=filter,
        )

        raw = self._http.request(
            path="/v2/events",
            method="GET",
            params=query,
        )

        return ListLogsResponse(
            cursor=raw.get("cursor"),
            logs=parse_logs_response(raw["events"]),
        )