-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathclient.py
More file actions
76 lines (62 loc) · 1.98 KB
/
client.py
File metadata and controls
76 lines (62 loc) · 1.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
from os import environ
from typing import Optional, Union, Literal
from qstash.dlq import DlqApi
from qstash.log import LogApi
from qstash.http import RetryConfig, HttpClient
from qstash.message import MessageApi
from qstash.queue import QueueApi
from qstash.schedule import ScheduleApi
from qstash.signing_key import SigningKeyApi
from qstash.url_group import UrlGroupApi
class QStash:
    """Synchronous SDK for the Upstash QStash."""

    def __init__(
        self,
        token: str,
        *,
        retry: Optional[Union[Literal[False], RetryConfig]] = None,
        base_url: Optional[str] = None,
    ) -> None:
        """
        :param token: The authorization token from the Upstash console.
        :param retry: Configures how the client should retry requests.
        :param base_url: Base URL handed to the HTTP client. When it is
            omitted (or empty), the value of the ``QSTASH_URL``
            environment variable is used instead; if that is not set
            either, ``None`` is passed through to the HTTP client.
        """
        # An explicit argument wins over the environment. ``or`` (rather
        # than an ``is None`` check) means an empty string also falls back
        # to ``QSTASH_URL``.
        self.http = HttpClient(
            token,
            retry,
            base_url or environ.get("QSTASH_URL"),
        )

        # Every sub-API below shares the single HTTP client created above.
        self.message = MessageApi(self.http)
        """Message api."""

        self.url_group = UrlGroupApi(self.http)
        """Url group api."""

        self.queue = QueueApi(self.http)
        """Queue api."""

        self.schedule = ScheduleApi(self.http)
        """Schedule api."""

        self.signing_key = SigningKeyApi(self.http)
        """Signing key api."""

        self.log = LogApi(self.http)
        """Log api."""

        self.dlq = DlqApi(self.http)
        """Dlq (Dead Letter Queue) api."""

    def liveness(self) -> str:
        """
        Check if the QStash API is alive.

        Sends ``GET /v2/liveness`` through the shared HTTP client with
        ``parse_response=False`` and returns whatever the client returns.

        :return: "OK" if the API is alive.
        """
        return self.http.request(
            path="/v2/liveness",
            method="GET",
            parse_response=False,
        )

    def readiness(self) -> str:
        """
        Check if the QStash API is ready to accept requests.

        Sends ``GET /v2/readiness`` through the shared HTTP client with
        ``parse_response=False`` and returns whatever the client returns.

        :return: "OK" if the API is ready.
        """
        return self.http.request(
            path="/v2/readiness",
            method="GET",
            parse_response=False,
        )