-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathlog.py
More file actions
48 lines (40 loc) · 1.22 KB
/
log.py
File metadata and controls
48 lines (40 loc) · 1.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
from typing import Optional
from qstash.asyncio.http import AsyncHttpClient
from qstash.log import (
LogFilter,
ListLogsResponse,
parse_logs_response,
prepare_list_logs_request_params,
)
class AsyncLogApi:
def __init__(self, http: AsyncHttpClient) -> None:
self._http = http
async def list(
self,
*,
cursor: Optional[str] = None,
count: Optional[int] = None,
filter: Optional[LogFilter] = None,
) -> ListLogsResponse:
"""
Lists all logs that happened, such as message creation or delivery.
:param cursor: Optional cursor to start listing logs from.
:param count: The maximum number of logs to return. \
Default and max is `1000`.
:param filter: Filter to use.
"""
params = prepare_list_logs_request_params(
cursor=cursor,
count=count,
filter=filter,
)
response = await self._http.request(
path="/v2/events",
method="GET",
params=params,
)
logs = parse_logs_response(response["events"])
return ListLogsResponse(
cursor=response.get("cursor"),
logs=logs,
)