-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathflow_control.py
More file actions
134 lines (110 loc) · 4.29 KB
/
Copy pathflow_control.py
File metadata and controls
134 lines (110 loc) · 4.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
from typing import Dict
from qstash.asyncio.http import AsyncHttpClient
from qstash.flow_control_api import (
FlowControlInfo,
GlobalParallelismInfo,
PinFlowControlOptions,
UnpinFlowControlOptions,
parse_flow_control_info,
)
class AsyncFlowControlApi:
def __init__(self, http: AsyncHttpClient) -> None:
self._http = http
async def get(self, flow_control_key: str) -> FlowControlInfo:
"""
Gets a single flow control by key.
:param flow_control_key: The flow control key to get.
"""
response = await self._http.request(
path=f"/v2/flowControl/{flow_control_key}",
method="GET",
)
return parse_flow_control_info(response)
async def get_global_parallelism(self) -> GlobalParallelismInfo:
"""
Gets the global parallelism info.
"""
response = await self._http.request(
path="/v2/globalParallelism",
method="GET",
)
return GlobalParallelismInfo(
parallelism_max=response.get("parallelismMax", 0),
parallelism_count=response.get("parallelismCount", 0),
)
async def pause(self, flow_control_key: str) -> None:
"""
Pause message delivery for a flow-control key.
Messages already in the waitlist will remain there.
New incoming messages will be added directly to the waitlist.
:param flow_control_key: The flow control key to pause.
"""
await self._http.request(
path=f"/v2/flowControl/{flow_control_key}/pause",
method="POST",
parse_response=False,
)
async def resume(self, flow_control_key: str) -> None:
"""
Resume message delivery for a flow-control key.
:param flow_control_key: The flow control key to resume.
"""
await self._http.request(
path=f"/v2/flowControl/{flow_control_key}/resume",
method="POST",
parse_response=False,
)
async def pin(self, flow_control_key: str, options: PinFlowControlOptions) -> None:
"""
Pin a processing configuration for a flow-control key.
While pinned, the system ignores configurations provided by incoming
messages and uses the pinned configuration instead.
:param flow_control_key: The flow control key to pin.
:param options: The configuration to pin.
"""
params: Dict[str, str] = {}
if "parallelism" in options:
params["parallelism"] = str(options["parallelism"])
if "rate" in options:
params["rate"] = str(options["rate"])
if "period" in options:
params["period"] = str(options["period"])
await self._http.request(
path=f"/v2/flowControl/{flow_control_key}/pin",
method="POST",
params=params or None,
parse_response=False,
)
async def unpin(
self, flow_control_key: str, options: UnpinFlowControlOptions
) -> None:
"""
Remove the pinned configuration for a flow-control key.
After unpinning, the system resumes updating the configuration
based on incoming messages.
:param flow_control_key: The flow control key to unpin.
:param options: Which configurations to unpin.
"""
params: Dict[str, str] = {}
if "parallelism" in options:
params["parallelism"] = str(options["parallelism"]).lower()
if "rate" in options:
params["rate"] = str(options["rate"]).lower()
await self._http.request(
path=f"/v2/flowControl/{flow_control_key}/unpin",
method="POST",
params=params or None,
parse_response=False,
)
async def reset_rate(self, flow_control_key: str) -> None:
"""
Reset the rate configuration state for a flow-control key.
Clears the current rate count and immediately ends the current period.
The current timestamp becomes the start of the new rate period.
:param flow_control_key: The flow control key to reset rate for.
"""
await self._http.request(
path=f"/v2/flowControl/{flow_control_key}/resetRate",
method="POST",
parse_response=False,
)