-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathclient.py
More file actions
75 lines (62 loc) · 2.12 KB
/
client.py
File metadata and controls
75 lines (62 loc) · 2.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
from os import environ
from typing import Literal, Optional, Union
from qstash.asyncio.dlq import AsyncDlqApi
from qstash.asyncio.log import AsyncLogApi
from qstash.asyncio.http import AsyncHttpClient
from qstash.asyncio.message import AsyncMessageApi
from qstash.asyncio.queue import AsyncQueueApi
from qstash.asyncio.schedule import AsyncScheduleApi
from qstash.asyncio.signing_key import AsyncSigningKeyApi
from qstash.asyncio.url_group import AsyncUrlGroupApi
from qstash.http import RetryConfig
class AsyncQStash:
    """Asynchronous QStash client that groups the individual API objects.

    Every API object exposed as an attribute is constructed with the same
    ``AsyncHttpClient`` instance, which is also available as ``self.http``.
    """

    def __init__(
        self,
        token: str,
        *,
        retry: Optional[Union[Literal[False], RetryConfig]] = None,
        base_url: Optional[str] = None,
    ) -> None:
        """
        :param token: The authorization token from the Upstash console.
        :param retry: Configures how the client should retry requests.
        :param base_url: Base URL handed to the HTTP client. When it is
            omitted (or otherwise falsy), the value of the ``QSTASH_URL``
            environment variable is passed instead; that value is ``None``
            if the variable is not set.
        """
        # An explicit argument wins; the environment is consulted only
        # when no usable base_url was supplied.
        resolved_url = base_url if base_url else environ.get("QSTASH_URL")
        http = AsyncHttpClient(token, retry, resolved_url)

        # Shared HTTP client used by every API object below.
        self.http = http

        self.message = AsyncMessageApi(http)
        """Message api."""

        self.url_group = AsyncUrlGroupApi(http)
        """Url group api."""

        self.queue = AsyncQueueApi(http)
        """Queue api."""

        self.schedule = AsyncScheduleApi(http)
        """Schedule api."""

        self.signing_key = AsyncSigningKeyApi(http)
        """Signing key api."""

        self.log = AsyncLogApi(http)
        """Log api."""

        self.dlq = AsyncDlqApi(http)
        """Dlq (Dead Letter Queue) api."""

    async def liveness(self) -> str:
        """
        Ask the QStash API whether it is alive.

        Issues a ``GET`` to ``/v2/liveness`` with response parsing turned
        off and hands back whatever the HTTP client returns.

        :return: "OK" if the API is alive.
        """
        response = await self.http.request(
            method="GET",
            path="/v2/liveness",
            parse_response=False,
        )
        return response

    async def readiness(self) -> str:
        """
        Ask the QStash API whether it is ready to accept requests.

        Issues a ``GET`` to ``/v2/readiness`` with response parsing turned
        off and hands back whatever the HTTP client returns.

        :return: "OK" if the API is ready.
        """
        response = await self.http.request(
            method="GET",
            path="/v2/readiness",
            parse_response=False,
        )
        return response