Skip to content

Commit 873ed90

Browse files
authored
DX-1614: Parallelism and Calls per second parameters (#36)
* feat: add flow control * fix: update tests * fix: lint * fix: lint * fix: lint * fix: batch response type * fix: add flow control to batch request * fix: use typeddict instead of dataclass * fix: docstrings and tests * fix: lint * fix: review * fix: format
1 parent d7d2478 commit 873ed90

9 files changed

+269
-0
lines changed

qstash/asyncio/message.py

+10
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from qstash.http import HttpMethod
66
from qstash.message import (
77
ApiT,
8+
FlowControl,
89
BatchJsonRequest,
910
BatchRequest,
1011
BatchResponse,
@@ -47,6 +48,7 @@ async def publish(
4748
deduplication_id: Optional[str] = None,
4849
content_based_deduplication: Optional[bool] = None,
4950
timeout: Optional[Union[str, int]] = None,
51+
flow_control: Optional[FlowControl] = None,
5052
) -> Union[PublishResponse, List[PublishUrlGroupResponse]]:
5153
"""
5254
Publishes a message to QStash.
@@ -84,6 +86,8 @@ async def publish(
8486
When a timeout is specified, it will be used instead of the maximum timeout
8587
value permitted by the QStash plan. It is useful in scenarios, where a message
8688
should be delivered with a shorter timeout.
89+
:param flow_control: Settings for controlling the number of active requests and
90+
number of requests per second with the same key.
8791
"""
8892
headers = headers or {}
8993
destination = get_destination(
@@ -105,6 +109,7 @@ async def publish(
105109
deduplication_id=deduplication_id,
106110
content_based_deduplication=content_based_deduplication,
107111
timeout=timeout,
112+
flow_control=flow_control,
108113
)
109114

110115
response = await self._http.request(
@@ -133,6 +138,7 @@ async def publish_json(
133138
deduplication_id: Optional[str] = None,
134139
content_based_deduplication: Optional[bool] = None,
135140
timeout: Optional[Union[str, int]] = None,
141+
flow_control: Optional[FlowControl] = None,
136142
) -> Union[PublishResponse, List[PublishUrlGroupResponse]]:
137143
"""
138144
Publish a message to QStash, automatically serializing the
@@ -171,6 +177,8 @@ async def publish_json(
171177
When a timeout is specified, it will be used instead of the maximum timeout
172178
value permitted by the QStash plan. It is useful in scenarios, where a message
173179
should be delivered with a shorter timeout.
180+
:param flow_control: Settings for controlling the number of active requests and
181+
number of requests per second with the same key.
174182
"""
175183
return await self.publish(
176184
url=url,
@@ -188,6 +196,7 @@ async def publish_json(
188196
deduplication_id=deduplication_id,
189197
content_based_deduplication=content_based_deduplication,
190198
timeout=timeout,
199+
flow_control=flow_control,
191200
)
192201

193202
async def enqueue(
@@ -260,6 +269,7 @@ async def enqueue(
260269
deduplication_id=deduplication_id,
261270
content_based_deduplication=content_based_deduplication,
262271
timeout=timeout,
272+
flow_control=None,
263273
)
264274

265275
response = await self._http.request(

qstash/asyncio/schedule.py

+9
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
parse_schedule_response,
99
prepare_schedule_headers,
1010
)
11+
from qstash.message import FlowControl
1112

1213

1314
class AsyncScheduleApi:
@@ -29,6 +30,7 @@ async def create(
2930
delay: Optional[Union[str, int]] = None,
3031
timeout: Optional[Union[str, int]] = None,
3132
schedule_id: Optional[str] = None,
33+
flow_control: Optional[FlowControl] = None,
3234
) -> str:
3335
"""
3436
Creates a schedule to send messages periodically.
@@ -56,6 +58,8 @@ async def create(
5658
value permitted by the QStash plan. It is useful in scenarios, where a message
5759
should be delivered with a shorter timeout.
5860
:param schedule_id: Schedule id to use. Can be used to update the settings of an existing schedule.
61+
:param flow_control: Settings for controlling the number of active requests and
62+
number of requests per second with the same key.
5963
"""
6064
req_headers = prepare_schedule_headers(
6165
cron=cron,
@@ -68,6 +72,7 @@ async def create(
6872
delay=delay,
6973
timeout=timeout,
7074
schedule_id=schedule_id,
75+
flow_control=flow_control,
7176
)
7277

7378
response = await self._http.request(
@@ -93,6 +98,7 @@ async def create_json(
9398
delay: Optional[Union[str, int]] = None,
9499
timeout: Optional[Union[str, int]] = None,
95100
schedule_id: Optional[str] = None,
101+
flow_control: Optional[FlowControl] = None,
96102
) -> str:
97103
"""
98104
Creates a schedule to send messages periodically, automatically serializing the
@@ -121,6 +127,8 @@ async def create_json(
121127
value permitted by the QStash plan. It is useful in scenarios, where a message
122128
should be delivered with a shorter timeout.
123129
:param schedule_id: Schedule id to use. Can be used to update the settings of an existing schedule.
130+
:param flow_control: Settings for controlling the number of active requests and
131+
number of requests per second with the same key.
124132
"""
125133
return await self.create(
126134
destination=destination,
@@ -135,6 +143,7 @@ async def create_json(
135143
delay=delay,
136144
timeout=timeout,
137145
schedule_id=schedule_id,
146+
flow_control=flow_control,
138147
)
139148

140149
async def get(self, schedule_id: str) -> Schedule:

qstash/dlq.py

+3
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,9 @@ def parse_dlq_message_response(
101101
response_headers=response.get("responseHeader"),
102102
response_body=response.get("responseBody"),
103103
response_body_base64=response.get("responseBodyBase64"),
104+
flow_control_key=response.get("flowControlKey"),
105+
parallelism=response.get("parallelism"),
106+
rate_per_second=response.get("ratePerSecond"),
104107
)
105108

106109

qstash/message.py

+64
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,17 @@ class LlmApi(TypedDict):
2828
ApiT = LlmApi # In the future, this can be union of different API types
2929

3030

31+
class FlowControl(TypedDict, total=False):
32+
key: str
33+
"""flow control key"""
34+
35+
parallelism: Optional[int]
36+
"""number of requests which can be active with the same key"""
37+
38+
rate_per_second: Optional[int]
39+
"""number of requests to activate per second with the same key"""
40+
41+
3142
@dataclasses.dataclass
3243
class PublishResponse:
3344
message_id: str
@@ -168,6 +179,12 @@ class BatchRequest(TypedDict, total=False):
168179
an integer, which will be interpreted as timeout in seconds.
169180
"""
170181

182+
flow_control: Optional[FlowControl]
183+
"""
184+
Settings for controlling the number of active requests and number of requests
185+
per second with the same key.
186+
"""
187+
171188

172189
class BatchJsonRequest(TypedDict, total=False):
173190
queue: str
@@ -254,6 +271,12 @@ class BatchJsonRequest(TypedDict, total=False):
254271
set according to the LLM provider.
255272
"""
256273

274+
flow_control: Optional[FlowControl]
275+
"""
276+
Settings for controlling the number of active requests and number of requests
277+
per second with the same key.
278+
"""
279+
257280

258281
@dataclasses.dataclass
259282
class Message:
@@ -317,6 +340,15 @@ class Message:
317340
caller_ip: Optional[str]
318341
"""IP address of the publisher of this message."""
319342

343+
flow_control_key: Optional[str]
344+
"""flow control key"""
345+
346+
parallelism: Optional[int]
347+
"""number of requests which can be active with the same flow control key"""
348+
349+
rate_per_second: Optional[int]
350+
"""number of requests to activate per second with the same flow control key"""
351+
320352

321353
def get_destination(
322354
*,
@@ -367,6 +399,7 @@ def prepare_headers(
367399
deduplication_id: Optional[str],
368400
content_based_deduplication: Optional[bool],
369401
timeout: Optional[Union[str, int]],
402+
flow_control: Optional[FlowControl],
370403
) -> Dict[str, str]:
371404
h = {}
372405

@@ -413,6 +446,21 @@ def prepare_headers(
413446
else:
414447
h["Upstash-Timeout"] = timeout
415448

449+
if flow_control and "key" in flow_control:
450+
control_values = []
451+
if "parallelism" in flow_control:
452+
control_values.append(f"parallelism={flow_control['parallelism']}")
453+
if "rate_per_second" in flow_control:
454+
control_values.append(f"rate={flow_control['rate_per_second']}")
455+
456+
if not control_values:
457+
raise QStashError(
458+
"Provide at least one of parallelism or rate_per_second for flow_control"
459+
)
460+
461+
h["Upstash-Flow-Control-Key"] = flow_control["key"]
462+
h["Upstash-Flow-Control-Value"] = ", ".join(control_values)
463+
416464
return h
417465

418466

@@ -484,6 +532,7 @@ def prepare_batch_message_body(messages: List[BatchRequest]) -> str:
484532
deduplication_id=msg.get("deduplication_id"),
485533
content_based_deduplication=msg.get("content_based_deduplication"),
486534
timeout=msg.get("timeout"),
535+
flow_control=msg.get("flow_control"),
487536
)
488537

489538
batch_messages.append(
@@ -581,6 +630,9 @@ def convert_to_batch_messages(
581630
if "timeout" in msg:
582631
batch_msg["timeout"] = msg["timeout"]
583632

633+
if "flow_control" in msg:
634+
batch_msg["flow_control"] = msg["flow_control"]
635+
584636
batch_messages.append(batch_msg)
585637

586638
return batch_messages
@@ -605,6 +657,9 @@ def parse_message_response(response: Dict[str, Any]) -> Message:
605657
failure_callback=response.get("failureCallback"),
606658
schedule_id=response.get("scheduleId"),
607659
caller_ip=response.get("callerIP"),
660+
flow_control_key=response.get("flowControlKey"),
661+
parallelism=response.get("parallelism"),
662+
rate_per_second=response.get("rate"),
608663
)
609664

610665

@@ -630,6 +685,7 @@ def publish(
630685
deduplication_id: Optional[str] = None,
631686
content_based_deduplication: Optional[bool] = None,
632687
timeout: Optional[Union[str, int]] = None,
688+
flow_control: Optional[FlowControl] = None,
633689
) -> Union[PublishResponse, List[PublishUrlGroupResponse]]:
634690
"""
635691
Publishes a message to QStash.
@@ -667,6 +723,8 @@ def publish(
667723
When a timeout is specified, it will be used instead of the maximum timeout
668724
value permitted by the QStash plan. It is useful in scenarios, where a message
669725
should be delivered with a shorter timeout.
726+
:param flow_control: Settings for controlling the number of active requests and
727+
number of requests per second with the same key.
670728
"""
671729
headers = headers or {}
672730
destination = get_destination(
@@ -688,6 +746,7 @@ def publish(
688746
deduplication_id=deduplication_id,
689747
content_based_deduplication=content_based_deduplication,
690748
timeout=timeout,
749+
flow_control=flow_control,
691750
)
692751

693752
response = self._http.request(
@@ -716,6 +775,7 @@ def publish_json(
716775
deduplication_id: Optional[str] = None,
717776
content_based_deduplication: Optional[bool] = None,
718777
timeout: Optional[Union[str, int]] = None,
778+
flow_control: Optional[FlowControl] = None,
719779
) -> Union[PublishResponse, List[PublishUrlGroupResponse]]:
720780
"""
721781
Publish a message to QStash, automatically serializing the
@@ -754,6 +814,8 @@ def publish_json(
754814
When a timeout is specified, it will be used instead of the maximum timeout
755815
value permitted by the QStash plan. It is useful in scenarios, where a message
756816
should be delivered with a shorter timeout.
817+
:param flow_control: Settings for controlling the number of active requests and
818+
number of requests per second with the same key.
757819
"""
758820
return self.publish(
759821
url=url,
@@ -771,6 +833,7 @@ def publish_json(
771833
deduplication_id=deduplication_id,
772834
content_based_deduplication=content_based_deduplication,
773835
timeout=timeout,
836+
flow_control=flow_control,
774837
)
775838

776839
def enqueue(
@@ -843,6 +906,7 @@ def enqueue(
843906
deduplication_id=deduplication_id,
844907
content_based_deduplication=content_based_deduplication,
845908
timeout=timeout,
909+
flow_control=None,
846910
)
847911

848912
response = self._http.request(

0 commit comments

Comments
 (0)