Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions qstash/asyncio/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ async def publish(
content_based_deduplication: Optional[bool] = None,
timeout: Optional[Union[str, int]] = None,
flow_control: Optional[FlowControl] = None,
label: Optional[str] = None,
) -> Union[PublishResponse, List[PublishUrlGroupResponse]]:
"""
Publishes a message to QStash.
Expand Down Expand Up @@ -122,6 +123,7 @@ async def publish(
should be delivered with a shorter timeout.
:param flow_control: Settings for controlling the number of active requests,
as well as the rate of requests with the same flow control key.
:param label: Assign a label to the request to filter logs with it later.
"""
headers = headers or {}
destination = get_destination(
Expand All @@ -147,6 +149,7 @@ async def publish(
content_based_deduplication=content_based_deduplication,
timeout=timeout,
flow_control=flow_control,
label=label,
)

response = await self._http.request(
Expand Down Expand Up @@ -179,6 +182,7 @@ async def publish_json(
content_based_deduplication: Optional[bool] = None,
timeout: Optional[Union[str, int]] = None,
flow_control: Optional[FlowControl] = None,
label: Optional[str] = None,
) -> Union[PublishResponse, List[PublishUrlGroupResponse]]:
"""
Publish a message to QStash, automatically serializing the
Expand Down Expand Up @@ -250,6 +254,7 @@ async def publish_json(
should be delivered with a shorter timeout.
:param flow_control: Settings for controlling the number of active requests,
as well as the rate of requests with the same flow control key.
:param label: Assign a label to the request to filter logs with it later.
"""
return await self.publish(
url=url,
Expand All @@ -271,6 +276,7 @@ async def publish_json(
content_based_deduplication=content_based_deduplication,
timeout=timeout,
flow_control=flow_control,
label=label,
)

async def enqueue(
Expand All @@ -293,6 +299,7 @@ async def enqueue(
deduplication_id: Optional[str] = None,
content_based_deduplication: Optional[bool] = None,
timeout: Optional[Union[str, int]] = None,
label: Optional[str] = None,
) -> Union[EnqueueResponse, List[EnqueueUrlGroupResponse]]:
"""
Enqueues a message, after creating the queue if it does
Expand Down Expand Up @@ -356,6 +363,7 @@ async def enqueue(
When a timeout is specified, it will be used instead of the maximum timeout
value permitted by the QStash plan. It is useful in scenarios, where a message
should be delivered with a shorter timeout.
:param label: Assign a label to the request to filter logs with it later.
"""
headers = headers or {}
destination = get_destination(
Expand All @@ -381,6 +389,7 @@ async def enqueue(
content_based_deduplication=content_based_deduplication,
timeout=timeout,
flow_control=None,
label=label,
)

response = await self._http.request(
Expand Down Expand Up @@ -411,6 +420,7 @@ async def enqueue_json(
deduplication_id: Optional[str] = None,
content_based_deduplication: Optional[bool] = None,
timeout: Optional[Union[str, int]] = None,
label: Optional[str] = None,
) -> Union[EnqueueResponse, List[EnqueueUrlGroupResponse]]:
"""
Enqueues a message, after creating the queue if it does
Expand Down Expand Up @@ -475,6 +485,7 @@ async def enqueue_json(
When a timeout is specified, it will be used instead of the maximum timeout
value permitted by the QStash plan. It is useful in scenarios, where a message
should be delivered with a shorter timeout.
:param label: Assign a label to the request to filter logs with it later.
"""
return await self.enqueue(
queue=queue,
Expand All @@ -494,6 +505,7 @@ async def enqueue_json(
deduplication_id=deduplication_id,
content_based_deduplication=content_based_deduplication,
timeout=timeout,
label=label,
)

async def batch(
Expand Down
6 changes: 6 additions & 0 deletions qstash/asyncio/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ async def create(
schedule_id: Optional[str] = None,
queue: Optional[str] = None,
flow_control: Optional[FlowControl] = None,
label: Optional[str] = None,
) -> str:
"""
Creates a schedule to send messages periodically.
Expand Down Expand Up @@ -97,6 +98,7 @@ async def create(
:param queue: Name of the queue which the scheduled messages will be enqueued.
:param flow_control: Settings for controlling the number of active requests,
as well as the rate of requests with the same flow control key.
:param label: Assign a label to the request to filter logs with it later.
"""
req_headers = prepare_schedule_headers(
cron=cron,
Expand All @@ -114,6 +116,7 @@ async def create(
schedule_id=schedule_id,
queue=queue,
flow_control=flow_control,
label=label,
)

response = await self._http.request(
Expand Down Expand Up @@ -144,6 +147,7 @@ async def create_json(
schedule_id: Optional[str] = None,
queue: Optional[str] = None,
flow_control: Optional[FlowControl] = None,
label: Optional[str] = None,
) -> str:
"""
Creates a schedule to send messages periodically, automatically serializing the
Expand Down Expand Up @@ -207,6 +211,7 @@ async def create_json(
:param queue: Name of the queue which the scheduled messages will be enqueued.
:param flow_control: Settings for controlling the number of active requests,
as well as the rate of requests with the same flow control key.
:param label: Assign a label to the request to filter logs with it later.
"""
return await self.create(
destination=destination,
Expand All @@ -226,6 +231,7 @@ async def create_json(
schedule_id=schedule_id,
queue=queue,
flow_control=flow_control,
label=label,
)

async def get(self, schedule_id: str) -> Schedule:
Expand Down
7 changes: 7 additions & 0 deletions qstash/dlq.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ class DlqFilter(TypedDict, total=False):
caller_ip: str
"""Filter DLQ entries by IP address of the publisher of the message"""

label: str
"""Filter DLQ entries by label."""


@dataclasses.dataclass
class ListDlqMessagesResponse:
Expand Down Expand Up @@ -108,6 +111,7 @@ def parse_dlq_message_response(
response_body_base64=response.get("responseBodyBase64"),
flow_control=flow_control,
retry_delay_expression=response.get("retryDelayExpression"),
label=response.get("label"),
)


Expand Down Expand Up @@ -153,6 +157,9 @@ def prepare_list_dlq_messages_params(
if "caller_ip" in filter:
params["callerIp"] = filter["caller_ip"]

if "label" in filter:
params["label"] = filter["label"]

return params


Expand Down
10 changes: 10 additions & 0 deletions qstash/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ class Log:
flow_control: Optional[FlowControlProperties]
"""Flow control properties"""

label: Optional[str]
"""Label assigned to the request for filtering logs."""


class LogFilter(TypedDict, total=False):
message_id: str
Expand Down Expand Up @@ -149,6 +152,9 @@ class LogFilter(TypedDict, total=False):
to_time: int
"""Filter logs by ending Unix time, in milliseconds"""

label: str
"""Filter logs by label."""


@dataclasses.dataclass
class ListLogsResponse:
Expand Down Expand Up @@ -204,6 +210,9 @@ def prepare_list_logs_request_params(
if "to_time" in filter:
params["toDate"] = str(filter["to_time"])

if "label" in filter:
params["label"] = filter["label"]

return params


Expand Down Expand Up @@ -238,6 +247,7 @@ def parse_logs_response(response: List[Dict[str, Any]]) -> List[Log]:
method=event.get("method"),
max_retries=event.get("maxRetries"),
retry_delay_expression=event.get("retryDelayExpression"),
label=event.get("label"),
)
)

Expand Down
30 changes: 30 additions & 0 deletions qstash/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,9 @@ class BatchRequest(TypedDict, total=False):
the rate of requests with the same flow control key.
"""

label: str
"""Assign a label to the request to filter logs with it later."""


class BatchJsonRequest(TypedDict, total=False):
url: str
Expand Down Expand Up @@ -368,6 +371,9 @@ class BatchJsonRequest(TypedDict, total=False):
the rate of requests with the same flow control key.
"""

label: str
"""Assign a label to the request to filter logs with it later."""


@dataclasses.dataclass
class Message:
Expand Down Expand Up @@ -437,6 +443,9 @@ class Message:
flow_control: Optional[FlowControlProperties]
"""Flow control properties."""

label: Optional[str]
"""Label assigned to the request for filtering logs."""


def get_destination(
*,
Expand Down Expand Up @@ -488,6 +497,7 @@ def prepare_headers(
content_based_deduplication: Optional[bool],
timeout: Optional[Union[str, int]],
flow_control: Optional[FlowControl],
label: Optional[str],
) -> Dict[str, str]:
h = {}

Expand Down Expand Up @@ -570,6 +580,9 @@ def prepare_headers(
h["Upstash-Flow-Control-Key"] = flow_control["key"]
h["Upstash-Flow-Control-Value"] = ", ".join(control_values)

if label is not None:
h["Upstash-Label"] = label

return h


Expand Down Expand Up @@ -645,6 +658,7 @@ def prepare_batch_message_body(messages: List[BatchRequest]) -> str:
content_based_deduplication=msg.get("content_based_deduplication"),
timeout=msg.get("timeout"),
flow_control=msg.get("flow_control"),
label=msg.get("label"),
)

batch_messages.append(
Expand Down Expand Up @@ -751,6 +765,9 @@ def convert_to_batch_messages(
if "flow_control" in msg:
batch_msg["flow_control"] = msg["flow_control"]

if "label" in msg:
batch_msg["label"] = msg["label"]

batch_messages.append(batch_msg)

return batch_messages
Expand Down Expand Up @@ -791,6 +808,7 @@ def parse_message_response(response: Dict[str, Any]) -> Message:
caller_ip=response.get("callerIP"),
flow_control=flow_control,
retry_delay_expression=response.get("retryDelayExpression"),
label=response.get("label"),
)


Expand Down Expand Up @@ -820,6 +838,7 @@ def publish(
content_based_deduplication: Optional[bool] = None,
timeout: Optional[Union[str, int]] = None,
flow_control: Optional[FlowControl] = None,
label: Optional[str] = None,
) -> Union[PublishResponse, List[PublishUrlGroupResponse]]:
"""
Publishes a message to QStash.
Expand Down Expand Up @@ -890,6 +909,7 @@ def publish(
should be delivered with a shorter timeout.
:param flow_control: Settings for controlling the number of active requests,
as well as the rate of requests with the same flow control key.
:param label: Assign a label to the request to filter logs with it later.
"""
headers = headers or {}
destination = get_destination(
Expand All @@ -915,6 +935,7 @@ def publish(
content_based_deduplication=content_based_deduplication,
timeout=timeout,
flow_control=flow_control,
label=label,
)

response = self._http.request(
Expand Down Expand Up @@ -947,6 +968,7 @@ def publish_json(
content_based_deduplication: Optional[bool] = None,
timeout: Optional[Union[str, int]] = None,
flow_control: Optional[FlowControl] = None,
label: Optional[str] = None,
) -> Union[PublishResponse, List[PublishUrlGroupResponse]]:
"""
Publish a message to QStash, automatically serializing the
Expand Down Expand Up @@ -1018,6 +1040,7 @@ def publish_json(
should be delivered with a shorter timeout.
:param flow_control: Settings for controlling the number of active requests,
as well as the rate of requests with the same flow control key.
:param label: Assign a label to the request to filter logs with it later.
"""
return self.publish(
url=url,
Expand All @@ -1039,6 +1062,7 @@ def publish_json(
content_based_deduplication=content_based_deduplication,
timeout=timeout,
flow_control=flow_control,
label=label,
)

def enqueue(
Expand All @@ -1061,6 +1085,7 @@ def enqueue(
deduplication_id: Optional[str] = None,
content_based_deduplication: Optional[bool] = None,
timeout: Optional[Union[str, int]] = None,
label: Optional[str] = None,
) -> Union[EnqueueResponse, List[EnqueueUrlGroupResponse]]:
"""
Enqueues a message, after creating the queue if it does
Expand Down Expand Up @@ -1124,6 +1149,7 @@ def enqueue(
When a timeout is specified, it will be used instead of the maximum timeout
value permitted by the QStash plan. It is useful in scenarios, where a message
should be delivered with a shorter timeout.
:param label: Assign a label to the request to filter logs with it later.
"""
headers = headers or {}
destination = get_destination(
Expand All @@ -1149,6 +1175,7 @@ def enqueue(
content_based_deduplication=content_based_deduplication,
timeout=timeout,
flow_control=None,
label=label,
)

response = self._http.request(
Expand Down Expand Up @@ -1179,6 +1206,7 @@ def enqueue_json(
deduplication_id: Optional[str] = None,
content_based_deduplication: Optional[bool] = None,
timeout: Optional[Union[str, int]] = None,
label: Optional[str] = None,
) -> Union[EnqueueResponse, List[EnqueueUrlGroupResponse]]:
"""
Enqueues a message, after creating the queue if it does
Expand Down Expand Up @@ -1243,6 +1271,7 @@ def enqueue_json(
When a timeout is specified, it will be used instead of the maximum timeout
value permitted by the QStash plan. It is useful in scenarios, where a message
should be delivered with a shorter timeout.
:param label: Assign a label to the request to filter logs with it later.
"""
return self.enqueue(
queue=queue,
Expand All @@ -1262,6 +1291,7 @@ def enqueue_json(
deduplication_id=deduplication_id,
content_based_deduplication=content_based_deduplication,
timeout=timeout,
label=label,
)

def batch(
Expand Down
Loading
Loading