-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathschedule.py
More file actions
290 lines (261 loc) · 11.7 KB
/
schedule.py
File metadata and controls
290 lines (261 loc) · 11.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
import json
from typing import Any, Dict, List, Optional, Union
from qstash.asyncio.http import AsyncHttpClient
from qstash.http import HttpMethod
from qstash.message import FlowControl
from qstash.schedule import (
Schedule,
parse_schedule_response,
prepare_schedule_headers,
)
class AsyncScheduleApi:
    """
    Asynchronous access to the QStash schedule endpoints under `/v2/schedules`.

    Every method delegates to the `AsyncHttpClient` given at construction time.
    """

    def __init__(self, http: AsyncHttpClient) -> None:
        """
        :param http: The async HTTP client used to perform every request.
        """
        self._http = http

    async def create(
        self,
        *,
        destination: str,
        cron: str,
        body: Optional[Union[str, bytes]] = None,
        content_type: Optional[str] = None,
        method: Optional[HttpMethod] = None,
        headers: Optional[Dict[str, str]] = None,
        callback_headers: Optional[Dict[str, str]] = None,
        failure_callback_headers: Optional[Dict[str, str]] = None,
        retries: Optional[int] = None,
        retry_delay: Optional[str] = None,
        callback: Optional[str] = None,
        failure_callback: Optional[str] = None,
        delay: Optional[Union[str, int]] = None,
        timeout: Optional[Union[str, int]] = None,
        schedule_id: Optional[str] = None,
        queue: Optional[str] = None,
        flow_control: Optional[FlowControl] = None,
        label: Optional[str] = None,
    ) -> str:
        """
        Creates a schedule that periodically sends a message to the destination.

        :param destination: The destination url or url group.
        :param cron: The cron expression that determines when messages are sent.
        :param body: The raw message body, passed to the destination as is.
        :param content_type: MIME type of the message.
        :param method: The HTTP method to use when sending a webhook to your API.
        :param headers: Headers to forward along with the message.
        :param callback_headers: Headers to forward along with the callback
            message.
        :param failure_callback_headers: Headers to forward along with the
            failure callback message.
        :param retries: How many times the message should be retried in case
            the destination API is not available.
        :param retry_delay: Delay between retries.

            When not set, retries use exponential backoff; see
            https://upstash.com/docs/qstash/features/retry for details.

            The `retryDelay` option customizes the delay (in milliseconds)
            between retry attempts when message delivery fails. It is a
            mathematical expression in which the special variable `retried`
            holds the current retry attempt count (starting from 0).

            Supported functions:

            - `pow`
            - `sqrt`
            - `abs`
            - `exp`
            - `floor`
            - `ceil`
            - `round`
            - `min`
            - `max`

            Examples of valid `retryDelay` values:

            ```py
            1000 # 1 second
            1000 * (1 + retried) # 1 second multiplied by the current retry attempt
            pow(2, retried) # 2 to the power of the current retry attempt
            max(10, pow(2, retried)) # The greater of 10 or 2^retried
            ```
        :param callback: A callback url that will be called after each attempt.
        :param failure_callback: A failure callback url that will be called
            when a delivery has failed, that is, when all the defined retries
            are exhausted.
        :param delay: Delay the message delivery. A delay string is a number
            followed by a duration abbreviation, like `10s`. Available
            durations are `s` (seconds), `m` (minutes), `h` (hours), and
            `d` (days). For convenience, an integer may be given instead, in
            which case it is interpreted as a delay in seconds.
        :param timeout: The HTTP timeout value to use while calling the
            destination URL. When specified, it is used instead of the maximum
            timeout value permitted by the QStash plan, which is useful when a
            message should be delivered with a shorter timeout.
        :param schedule_id: Schedule id to use. This can be used to update the
            settings of an existing schedule.
        :param queue: Name of the queue in which the scheduled messages will be
            enqueued.
        :param flow_control: Settings for controlling the number of active
            requests, as well as the rate of requests with the same flow
            control key.
        :param label: Assign a label to the request to filter logs with it
            later.
        :return: The id of the created schedule.
        """
        schedule_headers = prepare_schedule_headers(
            # Identity and routing of the schedule itself.
            cron=cron,
            schedule_id=schedule_id,
            queue=queue,
            label=label,
            flow_control=flow_control,
            # Shape of the message delivered to the destination.
            method=method,
            content_type=content_type,
            headers=headers,
            # Callback configuration.
            callback=callback,
            callback_headers=callback_headers,
            failure_callback=failure_callback,
            failure_callback_headers=failure_callback_headers,
            # Retry and timing behaviour.
            retries=retries,
            retry_delay=retry_delay,
            delay=delay,
            timeout=timeout,
        )

        endpoint = f"/v2/schedules/{destination}"
        created = await self._http.request(
            path=endpoint,
            method="POST",
            headers=schedule_headers,
            body=body,
        )

        created_id: str = created["scheduleId"]
        return created_id

    async def create_json(
        self,
        *,
        destination: str,
        cron: str,
        body: Optional[Any] = None,
        method: Optional[HttpMethod] = None,
        headers: Optional[Dict[str, str]] = None,
        callback_headers: Optional[Dict[str, str]] = None,
        failure_callback_headers: Optional[Dict[str, str]] = None,
        retries: Optional[int] = None,
        retry_delay: Optional[str] = None,
        callback: Optional[str] = None,
        failure_callback: Optional[str] = None,
        delay: Optional[Union[str, int]] = None,
        timeout: Optional[Union[str, int]] = None,
        schedule_id: Optional[str] = None,
        queue: Optional[str] = None,
        flow_control: Optional[FlowControl] = None,
        label: Optional[str] = None,
    ) -> str:
        """
        Creates a schedule whose body is serialized as a JSON string.

        The body is passed through `json.dumps` and the content type is set to
        `application/json`; everything else is forwarded to `create`.

        :param destination: The destination url or url group.
        :param cron: The cron expression that determines when messages are sent.
        :param body: The message body, passed to the destination after being
            serialized as a JSON string.
        :param method: The HTTP method to use when sending a webhook to your API.
        :param headers: Headers to forward along with the message.
        :param callback_headers: Headers to forward along with the callback
            message.
        :param failure_callback_headers: Headers to forward along with the
            failure callback message.
        :param retries: How many times the message should be retried in case
            the destination API is not available.
        :param retry_delay: Delay between retries.

            When not set, retries use exponential backoff; see
            https://upstash.com/docs/qstash/features/retry for details.

            The `retryDelay` option customizes the delay (in milliseconds)
            between retry attempts when message delivery fails. It is a
            mathematical expression in which the special variable `retried`
            holds the current retry attempt count (starting from 0).

            Supported functions:

            - `pow`
            - `sqrt`
            - `abs`
            - `exp`
            - `floor`
            - `ceil`
            - `round`
            - `min`
            - `max`

            Examples of valid `retryDelay` values:

            ```py
            1000 # 1 second
            1000 * (1 + retried) # 1 second multiplied by the current retry attempt
            pow(2, retried) # 2 to the power of the current retry attempt
            max(10, pow(2, retried)) # The greater of 10 or 2^retried
            ```
        :param callback: A callback url that will be called after each attempt.
        :param failure_callback: A failure callback url that will be called
            when a delivery has failed, that is, when all the defined retries
            are exhausted.
        :param delay: Delay the message delivery. A delay string is a number
            followed by a duration abbreviation, like `10s`. Available
            durations are `s` (seconds), `m` (minutes), `h` (hours), and
            `d` (days). For convenience, an integer may be given instead, in
            which case it is interpreted as a delay in seconds.
        :param timeout: The HTTP timeout value to use while calling the
            destination URL. When specified, it is used instead of the maximum
            timeout value permitted by the QStash plan, which is useful when a
            message should be delivered with a shorter timeout.
        :param schedule_id: Schedule id to use. This can be used to update the
            settings of an existing schedule.
        :param queue: Name of the queue in which the scheduled messages will be
            enqueued.
        :param flow_control: Settings for controlling the number of active
            requests, as well as the rate of requests with the same flow
            control key.
        :param label: Assign a label to the request to filter logs with it
            later.
        :return: The id of the created schedule.
        """
        json_body = json.dumps(body)

        return await self.create(
            destination=destination,
            cron=cron,
            body=json_body,
            content_type="application/json",
            schedule_id=schedule_id,
            queue=queue,
            label=label,
            flow_control=flow_control,
            method=method,
            headers=headers,
            callback=callback,
            callback_headers=callback_headers,
            failure_callback=failure_callback,
            failure_callback_headers=failure_callback_headers,
            retries=retries,
            retry_delay=retry_delay,
            delay=delay,
            timeout=timeout,
        )

    async def get(self, schedule_id: str) -> Schedule:
        """
        Fetches a single schedule.

        :param schedule_id: The id of the schedule to look up.
        :return: The schedule parsed from the response.
        """
        endpoint = f"/v2/schedules/{schedule_id}"
        raw_schedule = await self._http.request(path=endpoint, method="GET")
        return parse_schedule_response(raw_schedule)

    async def list(self) -> List[Schedule]:
        """
        Fetches every schedule.

        :return: One parsed schedule per entry of the response.
        """
        listing = await self._http.request(path="/v2/schedules", method="GET")
        return [parse_schedule_response(entry) for entry in listing]

    async def delete(self, schedule_id: str) -> None:
        """
        Removes the schedule.

        :param schedule_id: The id of the schedule to delete.
        """
        endpoint = f"/v2/schedules/{schedule_id}"
        # Nothing is read from the response, so parsing is switched off.
        await self._http.request(
            path=endpoint,
            method="DELETE",
            parse_response=False,
        )

    async def pause(self, schedule_id: str) -> None:
        """
        Pauses the schedule.

        While paused, a schedule does not produce new messages; call
        `resume` to start it again.

        :param schedule_id: The id of the schedule to pause.
        """
        endpoint = f"/v2/schedules/{schedule_id}/pause"
        # Nothing is read from the response, so parsing is switched off.
        await self._http.request(
            path=endpoint,
            method="PATCH",
            parse_response=False,
        )

    async def resume(self, schedule_id: str) -> None:
        """
        Resumes the schedule.

        :param schedule_id: The id of the schedule to resume.
        """
        endpoint = f"/v2/schedules/{schedule_id}/resume"
        # Nothing is read from the response, so parsing is switched off.
        await self._http.request(
            path=endpoint,
            method="PATCH",
            parse_response=False,
        )