Skip to content

Commit 35ff096

Browse files
authored
DX-2069: Add label parameter (#46)
* feat: add label parameter * fix: fmt * fix: safely read openai key * fix: fmt
1 parent c606152 commit 35ff096

File tree

10 files changed

+498
-0
lines changed

10 files changed

+498
-0
lines changed

qstash/asyncio/message.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ async def publish(
5252
content_based_deduplication: Optional[bool] = None,
5353
timeout: Optional[Union[str, int]] = None,
5454
flow_control: Optional[FlowControl] = None,
55+
label: Optional[str] = None,
5556
) -> Union[PublishResponse, List[PublishUrlGroupResponse]]:
5657
"""
5758
Publishes a message to QStash.
@@ -122,6 +123,7 @@ async def publish(
122123
should be delivered with a shorter timeout.
123124
:param flow_control: Settings for controlling the number of active requests,
124125
as well as the rate of requests with the same flow control key.
126+
:param label: Assign a label to the request to filter logs with it later.
125127
"""
126128
headers = headers or {}
127129
destination = get_destination(
@@ -147,6 +149,7 @@ async def publish(
147149
content_based_deduplication=content_based_deduplication,
148150
timeout=timeout,
149151
flow_control=flow_control,
152+
label=label,
150153
)
151154

152155
response = await self._http.request(
@@ -179,6 +182,7 @@ async def publish_json(
179182
content_based_deduplication: Optional[bool] = None,
180183
timeout: Optional[Union[str, int]] = None,
181184
flow_control: Optional[FlowControl] = None,
185+
label: Optional[str] = None,
182186
) -> Union[PublishResponse, List[PublishUrlGroupResponse]]:
183187
"""
184188
Publish a message to QStash, automatically serializing the
@@ -250,6 +254,7 @@ async def publish_json(
250254
should be delivered with a shorter timeout.
251255
:param flow_control: Settings for controlling the number of active requests,
252256
as well as the rate of requests with the same flow control key.
257+
:param label: Assign a label to the request to filter logs with it later.
253258
"""
254259
return await self.publish(
255260
url=url,
@@ -271,6 +276,7 @@ async def publish_json(
271276
content_based_deduplication=content_based_deduplication,
272277
timeout=timeout,
273278
flow_control=flow_control,
279+
label=label,
274280
)
275281

276282
async def enqueue(
@@ -293,6 +299,7 @@ async def enqueue(
293299
deduplication_id: Optional[str] = None,
294300
content_based_deduplication: Optional[bool] = None,
295301
timeout: Optional[Union[str, int]] = None,
302+
label: Optional[str] = None,
296303
) -> Union[EnqueueResponse, List[EnqueueUrlGroupResponse]]:
297304
"""
298305
Enqueues a message, after creating the queue if it does
@@ -356,6 +363,7 @@ async def enqueue(
356363
When a timeout is specified, it will be used instead of the maximum timeout
357364
value permitted by the QStash plan. It is useful in scenarios, where a message
358365
should be delivered with a shorter timeout.
366+
:param label: Assign a label to the request to filter logs with it later.
359367
"""
360368
headers = headers or {}
361369
destination = get_destination(
@@ -381,6 +389,7 @@ async def enqueue(
381389
content_based_deduplication=content_based_deduplication,
382390
timeout=timeout,
383391
flow_control=None,
392+
label=label,
384393
)
385394

386395
response = await self._http.request(
@@ -411,6 +420,7 @@ async def enqueue_json(
411420
deduplication_id: Optional[str] = None,
412421
content_based_deduplication: Optional[bool] = None,
413422
timeout: Optional[Union[str, int]] = None,
423+
label: Optional[str] = None,
414424
) -> Union[EnqueueResponse, List[EnqueueUrlGroupResponse]]:
415425
"""
416426
Enqueues a message, after creating the queue if it does
@@ -475,6 +485,7 @@ async def enqueue_json(
475485
When a timeout is specified, it will be used instead of the maximum timeout
476486
value permitted by the QStash plan. It is useful in scenarios, where a message
477487
should be delivered with a shorter timeout.
488+
:param label: Assign a label to the request to filter logs with it later.
478489
"""
479490
return await self.enqueue(
480491
queue=queue,
@@ -494,6 +505,7 @@ async def enqueue_json(
494505
deduplication_id=deduplication_id,
495506
content_based_deduplication=content_based_deduplication,
496507
timeout=timeout,
508+
label=label,
497509
)
498510

499511
async def batch(

qstash/asyncio/schedule.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ async def create(
3535
schedule_id: Optional[str] = None,
3636
queue: Optional[str] = None,
3737
flow_control: Optional[FlowControl] = None,
38+
label: Optional[str] = None,
3839
) -> str:
3940
"""
4041
Creates a schedule to send messages periodically.
@@ -97,6 +98,7 @@ async def create(
9798
:param queue: Name of the queue which the scheduled messages will be enqueued.
9899
:param flow_control: Settings for controlling the number of active requests,
99100
as well as the rate of requests with the same flow control key.
101+
:param label: Assign a label to the request to filter logs with it later.
100102
"""
101103
req_headers = prepare_schedule_headers(
102104
cron=cron,
@@ -114,6 +116,7 @@ async def create(
114116
schedule_id=schedule_id,
115117
queue=queue,
116118
flow_control=flow_control,
119+
label=label,
117120
)
118121

119122
response = await self._http.request(
@@ -144,6 +147,7 @@ async def create_json(
144147
schedule_id: Optional[str] = None,
145148
queue: Optional[str] = None,
146149
flow_control: Optional[FlowControl] = None,
150+
label: Optional[str] = None,
147151
) -> str:
148152
"""
149153
Creates a schedule to send messages periodically, automatically serializing the
@@ -207,6 +211,7 @@ async def create_json(
207211
:param queue: Name of the queue which the scheduled messages will be enqueued.
208212
:param flow_control: Settings for controlling the number of active requests,
209213
as well as the rate of requests with the same flow control key.
214+
:param label: Assign a label to the request to filter logs with it later.
210215
"""
211216
return await self.create(
212217
destination=destination,
@@ -226,6 +231,7 @@ async def create_json(
226231
schedule_id=schedule_id,
227232
queue=queue,
228233
flow_control=flow_control,
234+
label=label,
229235
)
230236

231237
async def get(self, schedule_id: str) -> Schedule:

qstash/dlq.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@ class DlqFilter(TypedDict, total=False):
6464
caller_ip: str
6565
"""Filter DLQ entries by IP address of the publisher of the message"""
6666

67+
label: str
68+
"""Filter DLQ entries by label."""
69+
6770

6871
@dataclasses.dataclass
6972
class ListDlqMessagesResponse:
@@ -108,6 +111,7 @@ def parse_dlq_message_response(
108111
response_body_base64=response.get("responseBodyBase64"),
109112
flow_control=flow_control,
110113
retry_delay_expression=response.get("retryDelayExpression"),
114+
label=response.get("label"),
111115
)
112116

113117

@@ -153,6 +157,9 @@ def prepare_list_dlq_messages_params(
153157
if "caller_ip" in filter:
154158
params["callerIp"] = filter["caller_ip"]
155159

160+
if "label" in filter:
161+
params["label"] = filter["label"]
162+
156163
return params
157164

158165

qstash/log.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,9 @@ class Log:
120120
flow_control: Optional[FlowControlProperties]
121121
"""Flow control properties"""
122122

123+
label: Optional[str]
124+
"""Label assigned to the request for filtering logs."""
125+
123126

124127
class LogFilter(TypedDict, total=False):
125128
message_id: str
@@ -149,6 +152,9 @@ class LogFilter(TypedDict, total=False):
149152
to_time: int
150153
"""Filter logs by ending Unix time, in milliseconds"""
151154

155+
label: str
156+
"""Filter logs by label."""
157+
152158

153159
@dataclasses.dataclass
154160
class ListLogsResponse:
@@ -204,6 +210,9 @@ def prepare_list_logs_request_params(
204210
if "to_time" in filter:
205211
params["toDate"] = str(filter["to_time"])
206212

213+
if "label" in filter:
214+
params["label"] = filter["label"]
215+
207216
return params
208217

209218

@@ -238,6 +247,7 @@ def parse_logs_response(response: List[Dict[str, Any]]) -> List[Log]:
238247
method=event.get("method"),
239248
max_retries=event.get("maxRetries"),
240249
retry_delay_expression=event.get("retryDelayExpression"),
250+
label=event.get("label"),
241251
)
242252
)
243253

qstash/message.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,9 @@ class BatchRequest(TypedDict, total=False):
246246
the rate of requests with the same flow control key.
247247
"""
248248

249+
label: str
250+
"""Assign a label to the request to filter logs with it later."""
251+
249252

250253
class BatchJsonRequest(TypedDict, total=False):
251254
url: str
@@ -368,6 +371,9 @@ class BatchJsonRequest(TypedDict, total=False):
368371
the rate of requests with the same flow control key.
369372
"""
370373

374+
label: str
375+
"""Assign a label to the request to filter logs with it later."""
376+
371377

372378
@dataclasses.dataclass
373379
class Message:
@@ -437,6 +443,9 @@ class Message:
437443
flow_control: Optional[FlowControlProperties]
438444
"""Flow control properties."""
439445

446+
label: Optional[str]
447+
"""Label assigned to the request for filtering logs."""
448+
440449

441450
def get_destination(
442451
*,
@@ -488,6 +497,7 @@ def prepare_headers(
488497
content_based_deduplication: Optional[bool],
489498
timeout: Optional[Union[str, int]],
490499
flow_control: Optional[FlowControl],
500+
label: Optional[str],
491501
) -> Dict[str, str]:
492502
h = {}
493503

@@ -570,6 +580,9 @@ def prepare_headers(
570580
h["Upstash-Flow-Control-Key"] = flow_control["key"]
571581
h["Upstash-Flow-Control-Value"] = ", ".join(control_values)
572582

583+
if label is not None:
584+
h["Upstash-Label"] = label
585+
573586
return h
574587

575588

@@ -645,6 +658,7 @@ def prepare_batch_message_body(messages: List[BatchRequest]) -> str:
645658
content_based_deduplication=msg.get("content_based_deduplication"),
646659
timeout=msg.get("timeout"),
647660
flow_control=msg.get("flow_control"),
661+
label=msg.get("label"),
648662
)
649663

650664
batch_messages.append(
@@ -751,6 +765,9 @@ def convert_to_batch_messages(
751765
if "flow_control" in msg:
752766
batch_msg["flow_control"] = msg["flow_control"]
753767

768+
if "label" in msg:
769+
batch_msg["label"] = msg["label"]
770+
754771
batch_messages.append(batch_msg)
755772

756773
return batch_messages
@@ -791,6 +808,7 @@ def parse_message_response(response: Dict[str, Any]) -> Message:
791808
caller_ip=response.get("callerIP"),
792809
flow_control=flow_control,
793810
retry_delay_expression=response.get("retryDelayExpression"),
811+
label=response.get("label"),
794812
)
795813

796814

@@ -820,6 +838,7 @@ def publish(
820838
content_based_deduplication: Optional[bool] = None,
821839
timeout: Optional[Union[str, int]] = None,
822840
flow_control: Optional[FlowControl] = None,
841+
label: Optional[str] = None,
823842
) -> Union[PublishResponse, List[PublishUrlGroupResponse]]:
824843
"""
825844
Publishes a message to QStash.
@@ -890,6 +909,7 @@ def publish(
890909
should be delivered with a shorter timeout.
891910
:param flow_control: Settings for controlling the number of active requests,
892911
as well as the rate of requests with the same flow control key.
912+
:param label: Assign a label to the request to filter logs with it later.
893913
"""
894914
headers = headers or {}
895915
destination = get_destination(
@@ -915,6 +935,7 @@ def publish(
915935
content_based_deduplication=content_based_deduplication,
916936
timeout=timeout,
917937
flow_control=flow_control,
938+
label=label,
918939
)
919940

920941
response = self._http.request(
@@ -947,6 +968,7 @@ def publish_json(
947968
content_based_deduplication: Optional[bool] = None,
948969
timeout: Optional[Union[str, int]] = None,
949970
flow_control: Optional[FlowControl] = None,
971+
label: Optional[str] = None,
950972
) -> Union[PublishResponse, List[PublishUrlGroupResponse]]:
951973
"""
952974
Publish a message to QStash, automatically serializing the
@@ -1018,6 +1040,7 @@ def publish_json(
10181040
should be delivered with a shorter timeout.
10191041
:param flow_control: Settings for controlling the number of active requests,
10201042
as well as the rate of requests with the same flow control key.
1043+
:param label: Assign a label to the request to filter logs with it later.
10211044
"""
10221045
return self.publish(
10231046
url=url,
@@ -1039,6 +1062,7 @@ def publish_json(
10391062
content_based_deduplication=content_based_deduplication,
10401063
timeout=timeout,
10411064
flow_control=flow_control,
1065+
label=label,
10421066
)
10431067

10441068
def enqueue(
@@ -1061,6 +1085,7 @@ def enqueue(
10611085
deduplication_id: Optional[str] = None,
10621086
content_based_deduplication: Optional[bool] = None,
10631087
timeout: Optional[Union[str, int]] = None,
1088+
label: Optional[str] = None,
10641089
) -> Union[EnqueueResponse, List[EnqueueUrlGroupResponse]]:
10651090
"""
10661091
Enqueues a message, after creating the queue if it does
@@ -1124,6 +1149,7 @@ def enqueue(
11241149
When a timeout is specified, it will be used instead of the maximum timeout
11251150
value permitted by the QStash plan. It is useful in scenarios, where a message
11261151
should be delivered with a shorter timeout.
1152+
:param label: Assign a label to the request to filter logs with it later.
11271153
"""
11281154
headers = headers or {}
11291155
destination = get_destination(
@@ -1149,6 +1175,7 @@ def enqueue(
11491175
content_based_deduplication=content_based_deduplication,
11501176
timeout=timeout,
11511177
flow_control=None,
1178+
label=label,
11521179
)
11531180

11541181
response = self._http.request(
@@ -1179,6 +1206,7 @@ def enqueue_json(
11791206
deduplication_id: Optional[str] = None,
11801207
content_based_deduplication: Optional[bool] = None,
11811208
timeout: Optional[Union[str, int]] = None,
1209+
label: Optional[str] = None,
11821210
) -> Union[EnqueueResponse, List[EnqueueUrlGroupResponse]]:
11831211
"""
11841212
Enqueues a message, after creating the queue if it does
@@ -1243,6 +1271,7 @@ def enqueue_json(
12431271
When a timeout is specified, it will be used instead of the maximum timeout
12441272
value permitted by the QStash plan. It is useful in scenarios, where a message
12451273
should be delivered with a shorter timeout.
1274+
:param label: Assign a label to the request to filter logs with it later.
12461275
"""
12471276
return self.enqueue(
12481277
queue=queue,
@@ -1262,6 +1291,7 @@ def enqueue_json(
12621291
deduplication_id=deduplication_id,
12631292
content_based_deduplication=content_based_deduplication,
12641293
timeout=timeout,
1294+
label=label,
12651295
)
12661296

12671297
def batch(

0 commit comments

Comments
 (0)