Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "qstash"
version = "3.0.0"
version = "3.1.0"
description = "Python SDK for Upstash QStash"
license = "MIT"
authors = ["Upstash <support@upstash.com>"]
Expand Down
2 changes: 1 addition & 1 deletion qstash/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
from qstash.client import QStash
from qstash.receiver import Receiver

__version__ = "3.0.0"
__version__ = "3.1.0"
__all__ = ["QStash", "AsyncQStash", "Receiver"]
120 changes: 120 additions & 0 deletions qstash/asyncio/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ async def publish(
callback_headers: Optional[Dict[str, str]] = None,
failure_callback_headers: Optional[Dict[str, str]] = None,
retries: Optional[int] = None,
retry_delay: Optional[str] = None,
callback: Optional[str] = None,
failure_callback: Optional[str] = None,
delay: Optional[Union[str, int]] = None,
Expand Down Expand Up @@ -74,6 +75,34 @@ async def publish(
callback message.
:param retries: How often should this message be retried in case the destination
API is not available.
:param retry_delay: Delay between retries.

By default, the `retryDelay` is exponential backoff.
More details can be found in: https://upstash.com/docs/qstash/features/retry.

The `retryDelay` option allows you to customize the delay (in milliseconds) between retry attempts when message delivery fails.

You can use mathematical expressions and the following built-in functions to calculate the delay dynamically.
The special variable `retried` represents the current retry attempt count (starting from 0).

Supported functions:
- `pow`
- `sqrt`
- `abs`
- `exp`
- `floor`
- `ceil`
- `round`
- `min`
- `max`

Examples of valid `retryDelay` values:
```py
1000 # 1 second
1000 * (1 + retried) # 1 second multiplied by the current retry attempt
pow(2, retried) # 2 to the power of the current retry attempt
max(10, pow(2, retried)) # The greater of 10 or 2^retried
```
:param callback: A callback url that will be called after each attempt.
:param failure_callback: A failure callback url that will be called when a delivery
is failed, that is when all the defined retries are exhausted.
Expand Down Expand Up @@ -109,6 +138,7 @@ async def publish(
callback_headers=callback_headers,
failure_callback_headers=failure_callback_headers,
retries=retries,
retry_delay=retry_delay,
callback=callback,
failure_callback=failure_callback,
delay=delay,
Expand Down Expand Up @@ -140,6 +170,7 @@ async def publish_json(
callback_headers: Optional[Dict[str, str]] = None,
failure_callback_headers: Optional[Dict[str, str]] = None,
retries: Optional[int] = None,
retry_delay: Optional[str] = None,
callback: Optional[str] = None,
failure_callback: Optional[str] = None,
delay: Optional[Union[str, int]] = None,
Expand Down Expand Up @@ -172,6 +203,34 @@ async def publish_json(
callback message.
:param retries: How often should this message be retried in case the destination
API is not available.
:param retry_delay: Delay between retries.

By default, the `retryDelay` is exponential backoff.
More details can be found in: https://upstash.com/docs/qstash/features/retry.

The `retryDelay` option allows you to customize the delay (in milliseconds) between retry attempts when message delivery fails.

You can use mathematical expressions and the following built-in functions to calculate the delay dynamically.
The special variable `retried` represents the current retry attempt count (starting from 0).

Supported functions:
- `pow`
- `sqrt`
- `abs`
- `exp`
- `floor`
- `ceil`
- `round`
- `min`
- `max`

Examples of valid `retryDelay` values:
```py
1000 # 1 second
1000 * (1 + retried) # 1 second multiplied by the current retry attempt
pow(2, retried) # 2 to the power of the current retry attempt
max(10, pow(2, retried)) # The greater of 10 or 2^retried
```
:param callback: A callback url that will be called after each attempt.
:param failure_callback: A failure callback url that will be called when a delivery
is failed, that is when all the defined retries are exhausted.
Expand Down Expand Up @@ -203,6 +262,7 @@ async def publish_json(
callback_headers=callback_headers,
failure_callback_headers=failure_callback_headers,
retries=retries,
retry_delay=retry_delay,
callback=callback,
failure_callback=failure_callback,
delay=delay,
Expand All @@ -227,6 +287,7 @@ async def enqueue(
callback_headers: Optional[Dict[str, str]] = None,
failure_callback_headers: Optional[Dict[str, str]] = None,
retries: Optional[int] = None,
retry_delay: Optional[str] = None,
callback: Optional[str] = None,
failure_callback: Optional[str] = None,
deduplication_id: Optional[str] = None,
Expand Down Expand Up @@ -257,6 +318,34 @@ async def enqueue(
callback message.
:param retries: How often should this message be retried in case the destination
API is not available.
:param retry_delay: Delay between retries.

By default, the `retryDelay` is exponential backoff.
More details can be found in: https://upstash.com/docs/qstash/features/retry.

The `retryDelay` option allows you to customize the delay (in milliseconds) between retry attempts when message delivery fails.

You can use mathematical expressions and the following built-in functions to calculate the delay dynamically.
The special variable `retried` represents the current retry attempt count (starting from 0).

Supported functions:
- `pow`
- `sqrt`
- `abs`
- `exp`
- `floor`
- `ceil`
- `round`
- `min`
- `max`

Examples of valid `retryDelay` values:
```py
1000 # 1 second
1000 * (1 + retried) # 1 second multiplied by the current retry attempt
pow(2, retried) # 2 to the power of the current retry attempt
max(10, pow(2, retried)) # The greater of 10 or 2^retried
```
:param callback: A callback url that will be called after each attempt.
:param failure_callback: A failure callback url that will be called when a delivery
is failed, that is when all the defined retries are exhausted.
Expand All @@ -283,6 +372,7 @@ async def enqueue(
callback_headers=callback_headers,
failure_callback_headers=failure_callback_headers,
retries=retries,
retry_delay=retry_delay,
callback=callback,
failure_callback=failure_callback,
delay=None,
Expand Down Expand Up @@ -315,6 +405,7 @@ async def enqueue_json(
callback_headers: Optional[Dict[str, str]] = None,
failure_callback_headers: Optional[Dict[str, str]] = None,
retries: Optional[int] = None,
retry_delay: Optional[str] = None,
callback: Optional[str] = None,
failure_callback: Optional[str] = None,
deduplication_id: Optional[str] = None,
Expand Down Expand Up @@ -346,6 +437,34 @@ async def enqueue_json(
callback message.
:param retries: How often should this message be retried in case the destination
API is not available.
:param retry_delay: Delay between retries.

By default, the `retryDelay` is exponential backoff.
More details can be found in: https://upstash.com/docs/qstash/features/retry.

The `retryDelay` option allows you to customize the delay (in milliseconds) between retry attempts when message delivery fails.

You can use mathematical expressions and the following built-in functions to calculate the delay dynamically.
The special variable `retried` represents the current retry attempt count (starting from 0).

Supported functions:
- `pow`
- `sqrt`
- `abs`
- `exp`
- `floor`
- `ceil`
- `round`
- `min`
- `max`

Examples of valid `retryDelay` values:
```py
1000 # 1 second
1000 * (1 + retried) # 1 second multiplied by the current retry attempt
pow(2, retried) # 2 to the power of the current retry attempt
max(10, pow(2, retried)) # The greater of 10 or 2^retried
```
:param callback: A callback url that will be called after each attempt.
:param failure_callback: A failure callback url that will be called when a delivery
is failed, that is when all the defined retries are exhausted.
Expand All @@ -369,6 +488,7 @@ async def enqueue_json(
callback_headers=callback_headers,
failure_callback_headers=failure_callback_headers,
retries=retries,
retry_delay=retry_delay,
callback=callback,
failure_callback=failure_callback,
deduplication_id=deduplication_id,
Expand Down
60 changes: 60 additions & 0 deletions qstash/asyncio/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ async def create(
callback_headers: Optional[Dict[str, str]] = None,
failure_callback_headers: Optional[Dict[str, str]] = None,
retries: Optional[int] = None,
retry_delay: Optional[str] = None,
callback: Optional[str] = None,
failure_callback: Optional[str] = None,
delay: Optional[Union[str, int]] = None,
Expand All @@ -51,6 +52,34 @@ async def create(
callback message.
:param retries: How often should this message be retried in case the destination
API is not available.
:param retry_delay: Delay between retries.

By default, the `retryDelay` is exponential backoff.
More details can be found in: https://upstash.com/docs/qstash/features/retry.

The `retryDelay` option allows you to customize the delay (in milliseconds) between retry attempts when message delivery fails.

You can use mathematical expressions and the following built-in functions to calculate the delay dynamically.
The special variable `retried` represents the current retry attempt count (starting from 0).

Supported functions:
- `pow`
- `sqrt`
- `abs`
- `exp`
- `floor`
- `ceil`
- `round`
- `min`
- `max`

Examples of valid `retryDelay` values:
```py
1000 # 1 second
1000 * (1 + retried) # 1 second multiplied by the current retry attempt
pow(2, retried) # 2 to the power of the current retry attempt
max(10, pow(2, retried)) # The greater of 10 or 2^retried
```
:param callback: A callback url that will be called after each attempt.
:param failure_callback: A failure callback url that will be called when a delivery
is failed, that is when all the defined retries are exhausted.
Expand All @@ -77,6 +106,7 @@ async def create(
callback_headers=callback_headers,
failure_callback_headers=failure_callback_headers,
retries=retries,
retry_delay=retry_delay,
callback=callback,
failure_callback=failure_callback,
delay=delay,
Expand Down Expand Up @@ -106,6 +136,7 @@ async def create_json(
callback_headers: Optional[Dict[str, str]] = None,
failure_callback_headers: Optional[Dict[str, str]] = None,
retries: Optional[int] = None,
retry_delay: Optional[str] = None,
callback: Optional[str] = None,
failure_callback: Optional[str] = None,
delay: Optional[Union[str, int]] = None,
Expand All @@ -131,6 +162,34 @@ async def create_json(
callback message.
:param retries: How often should this message be retried in case the destination
API is not available.
:param retry_delay: Delay between retries.

By default, the `retryDelay` is exponential backoff.
More details can be found in: https://upstash.com/docs/qstash/features/retry.

The `retryDelay` option allows you to customize the delay (in milliseconds) between retry attempts when message delivery fails.

You can use mathematical expressions and the following built-in functions to calculate the delay dynamically.
The special variable `retried` represents the current retry attempt count (starting from 0).

Supported functions:
- `pow`
- `sqrt`
- `abs`
- `exp`
- `floor`
- `ceil`
- `round`
- `min`
- `max`

Examples of valid `retryDelay` values:
```py
1000 # 1 second
1000 * (1 + retried) # 1 second multiplied by the current retry attempt
pow(2, retried) # 2 to the power of the current retry attempt
max(10, pow(2, retried)) # The greater of 10 or 2^retried
```
:param callback: A callback url that will be called after each attempt.
:param failure_callback: A failure callback url that will be called when a delivery
is failed, that is when all the defined retries are exhausted.
Expand Down Expand Up @@ -159,6 +218,7 @@ async def create_json(
callback_headers=callback_headers,
failure_callback_headers=failure_callback_headers,
retries=retries,
retry_delay=retry_delay,
callback=callback,
failure_callback=failure_callback,
delay=delay,
Expand Down
7 changes: 7 additions & 0 deletions qstash/dlq.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ class DlqMessage(Message):
if the response body contains non-UTF-8 characters.
"""

retry_delay_expression: Optional[str]
"""
The retry delay expression for this DLQ message,
if retry_delay was set when publishing the message.
"""


class DlqFilter(TypedDict, total=False):
message_id: str
Expand Down Expand Up @@ -101,6 +107,7 @@ def parse_dlq_message_response(
response_body=response.get("responseBody"),
response_body_base64=response.get("responseBodyBase64"),
flow_control=flow_control,
retry_delay_expression=response.get("retryDelayExpression"),
)


Expand Down
7 changes: 7 additions & 0 deletions qstash/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@ class Log:
max_retries: Optional[int]
"""Number of retries that should be attempted in case of delivery failure."""

retry_delay_expression: Optional[str]
"""
The retry delay expression for this DLQ message,
if retry_delay was set when publishing the message.
"""

flow_control: Optional[FlowControlProperties]
"""Flow control properties"""

Expand Down Expand Up @@ -231,6 +237,7 @@ def parse_logs_response(response: List[Dict[str, Any]]) -> List[Log]:
flow_control=flow_control,
method=event.get("method"),
max_retries=event.get("maxRetries"),
retry_delay_expression=event.get("retryDelayExpression"),
)
)

Expand Down
Loading
Loading