Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# QStash Python SDK — Agent Instructions

## SDK Design Instructions

See `.github/instructions/qstash-sdk-design.instructions.md` for the canonical list of locations that must be updated when adding a new parameter.

## Checklist: Adding a New Parameter

When adding a new parameter to the SDK, follow this checklist **in full** before considering the work done.

### 1. Update all required locations

Refer to `.github/instructions/qstash-sdk-design.instructions.md` for the full list. The key rule: **every location listed there must be evaluated**. Not every parameter applies to every location (e.g. a request-only parameter won't appear in response dataclasses), but you must consciously check each one.

Common locations that get missed:

- **`qstash/asyncio/`** — The async variants (`asyncio/message.py`, `asyncio/schedule.py`) must mirror the sync API exactly. If you add a parameter to `MessageApi.publish`, you must also add it to `AsyncMessageApi.publish`, and so on for `publish_json`, `enqueue`, `enqueue_json`, `create`, `create_json`.
- **`convert_to_batch_messages`** in `qstash/message.py` — This function converts `BatchJsonRequest` to `BatchRequest`. If the parameter is in both TypedDicts, this function must copy it.
- **Response parsing** (`parse_schedule_response`, `parse_message_response`, `parse_logs_response`, `parse_dlq_message_response`) — If the API returns the parameter in responses, it must be added to the response dataclass and parse function.
- **Filter TypedDicts** (`LogFilter`, `DlqFilter`) and their param builders (`prepare_list_logs_request_params`, `prepare_list_dlq_messages_params`) — If the parameter is filterable.

### 2. Run all checks before submitting

Always run the full check suite **before** considering work complete:

```sh
poetry run pytest
poetry run ruff format .
poetry run ruff check .
poetry run mypy --show-error-codes .
```

mypy will catch missing arguments (like `[call-arg]` errors for missing named arguments). Running it would have caught the asyncio omissions immediately.

### 3. Verify asyncio parity

After making changes to any file in `qstash/`, diff the sync and async versions to confirm parity:

- `qstash/message.py` ↔ `qstash/asyncio/message.py`
- `qstash/schedule.py` ↔ `qstash/asyncio/schedule.py`
9 changes: 9 additions & 0 deletions qstash/asyncio/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from qstash.message import (
ApiT,
FlowControl,
Redact,
BatchJsonRequest,
BatchRequest,
BatchResponse,
Expand Down Expand Up @@ -53,6 +54,7 @@ async def publish(
timeout: Optional[Union[str, int]] = None,
flow_control: Optional[FlowControl] = None,
label: Optional[str] = None,
redact: Optional[Redact] = None,
) -> Union[PublishResponse, List[PublishUrlGroupResponse]]:
"""
Publishes a message to QStash.
Expand Down Expand Up @@ -150,6 +152,7 @@ async def publish(
timeout=timeout,
flow_control=flow_control,
label=label,
redact=redact,
)

response = await self._http.request(
Expand Down Expand Up @@ -183,6 +186,7 @@ async def publish_json(
timeout: Optional[Union[str, int]] = None,
flow_control: Optional[FlowControl] = None,
label: Optional[str] = None,
redact: Optional[Redact] = None,
) -> Union[PublishResponse, List[PublishUrlGroupResponse]]:
"""
Publish a message to QStash, automatically serializing the
Expand Down Expand Up @@ -277,6 +281,7 @@ async def publish_json(
timeout=timeout,
flow_control=flow_control,
label=label,
redact=redact,
)

async def enqueue(
Expand All @@ -300,6 +305,7 @@ async def enqueue(
content_based_deduplication: Optional[bool] = None,
timeout: Optional[Union[str, int]] = None,
label: Optional[str] = None,
redact: Optional[Redact] = None,
) -> Union[EnqueueResponse, List[EnqueueUrlGroupResponse]]:
"""
Enqueues a message, after creating the queue if it does
Expand Down Expand Up @@ -390,6 +396,7 @@ async def enqueue(
timeout=timeout,
flow_control=None,
label=label,
redact=redact,
)

response = await self._http.request(
Expand Down Expand Up @@ -421,6 +428,7 @@ async def enqueue_json(
content_based_deduplication: Optional[bool] = None,
timeout: Optional[Union[str, int]] = None,
label: Optional[str] = None,
redact: Optional[Redact] = None,
) -> Union[EnqueueResponse, List[EnqueueUrlGroupResponse]]:
"""
Enqueues a message, after creating the queue if it does
Expand Down Expand Up @@ -506,6 +514,7 @@ async def enqueue_json(
content_based_deduplication=content_based_deduplication,
timeout=timeout,
label=label,
redact=redact,
)

async def batch(
Expand Down
6 changes: 5 additions & 1 deletion qstash/asyncio/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from qstash.asyncio.http import AsyncHttpClient
from qstash.http import HttpMethod
from qstash.message import FlowControl
from qstash.message import FlowControl, Redact
from qstash.schedule import (
Schedule,
parse_schedule_response,
Expand Down Expand Up @@ -36,6 +36,7 @@ async def create(
queue: Optional[str] = None,
flow_control: Optional[FlowControl] = None,
label: Optional[str] = None,
redact: Optional[Redact] = None,
) -> str:
"""
Creates a schedule to send messages periodically.
Expand Down Expand Up @@ -117,6 +118,7 @@ async def create(
queue=queue,
flow_control=flow_control,
label=label,
redact=redact,
)

response = await self._http.request(
Expand Down Expand Up @@ -148,6 +150,7 @@ async def create_json(
queue: Optional[str] = None,
flow_control: Optional[FlowControl] = None,
label: Optional[str] = None,
redact: Optional[Redact] = None,
) -> str:
"""
Creates a schedule to send messages periodically, automatically serializing the
Expand Down Expand Up @@ -232,6 +235,7 @@ async def create_json(
queue=queue,
flow_control=flow_control,
label=label,
redact=redact,
)

async def get(self, schedule_id: str) -> Schedule:
Expand Down
82 changes: 78 additions & 4 deletions qstash/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,27 @@ class FlowControl(TypedDict, total=False):

period: Union[str, int]
"""
Unit duration of the rate.
Unit duration of the rate.
When given as an integer, it is in seconds. Otherwise, it
can be specified as a duration string like '10s'(10 seconds),
'2m'(2 minutes), '1h'(1 hour),
or '3d5h12m'(3 days and 5 hours and 12 minutes).
can be specified as a duration string like '10s'(10 seconds),
'2m'(2 minutes), '1h'(1 hour),
or '3d5h12m'(3 days and 5 hours and 12 minutes).
Can be at most a week and defaults to 1 second.
"""


class Redact(TypedDict, total=False):
body: Literal[True]
"""Redact the request body in logs."""

header: Union[Literal[True], List[str]]
"""
Redact headers in logs.
- `True` to redact all headers
- List of header names to redact specific headers (e.g., ["Authorization"])
"""


@dataclasses.dataclass
class FlowControlProperties:
key: str
Expand Down Expand Up @@ -249,6 +261,14 @@ class BatchRequest(TypedDict, total=False):
label: str
"""Assign a label to the request to filter logs with it later."""

redact: Redact
"""
Configure which fields should be redacted in logs.
- `{"body": True}` to redact the request body
- `{"header": True}` to redact all headers
- `{"header": ["Authorization"]}` to redact specific headers
"""


class BatchJsonRequest(TypedDict, total=False):
url: str
Expand Down Expand Up @@ -374,6 +394,14 @@ class BatchJsonRequest(TypedDict, total=False):
label: str
"""Assign a label to the request to filter logs with it later."""

redact: Redact
"""
Configure which fields should be redacted in logs.
- `{"body": True}` to redact the request body
- `{"header": True}` to redact all headers
- `{"header": ["Authorization"]}` to redact specific headers
"""


@dataclasses.dataclass
class Message:
Expand Down Expand Up @@ -498,6 +526,7 @@ def prepare_headers(
timeout: Optional[Union[str, int]],
flow_control: Optional[FlowControl],
label: Optional[str],
redact: Optional[Redact],
) -> Dict[str, str]:
h = {}

Expand Down Expand Up @@ -583,6 +612,23 @@ def prepare_headers(
if label is not None:
h["Upstash-Label"] = label

if redact is not None:
redact_parts = []

if redact.get("body"):
redact_parts.append("body")

header_redact = redact.get("header")
if header_redact is not None:
if header_redact is True:
redact_parts.append("header")
elif isinstance(header_redact, list) and len(header_redact) > 0:
for header_name in header_redact:
redact_parts.append(f"header[{header_name}]")

if redact_parts:
h["Upstash-Redact-Fields"] = ",".join(redact_parts)

return h


Expand Down Expand Up @@ -659,6 +705,7 @@ def prepare_batch_message_body(messages: List[BatchRequest]) -> str:
timeout=msg.get("timeout"),
flow_control=msg.get("flow_control"),
label=msg.get("label"),
redact=msg.get("redact"),
)

batch_messages.append(
Expand Down Expand Up @@ -768,6 +815,9 @@ def convert_to_batch_messages(
if "label" in msg:
batch_msg["label"] = msg["label"]

if "redact" in msg:
batch_msg["redact"] = msg["redact"]

batch_messages.append(batch_msg)

return batch_messages
Expand Down Expand Up @@ -839,6 +889,7 @@ def publish(
timeout: Optional[Union[str, int]] = None,
flow_control: Optional[FlowControl] = None,
label: Optional[str] = None,
redact: Optional[Redact] = None,
) -> Union[PublishResponse, List[PublishUrlGroupResponse]]:
"""
Publishes a message to QStash.
Expand Down Expand Up @@ -910,6 +961,10 @@ def publish(
:param flow_control: Settings for controlling the number of active requests,
as well as the rate of requests with the same flow control key.
:param label: Assign a label to the request to filter logs with it later.
:param redact: Configure which fields should be redacted in logs.
- `{"body": True}` to redact the request body
- `{"header": True}` to redact all headers
- `{"header": ["Authorization"]}` to redact specific headers
"""
headers = headers or {}
destination = get_destination(
Expand All @@ -936,6 +991,7 @@ def publish(
timeout=timeout,
flow_control=flow_control,
label=label,
redact=redact,
)

response = self._http.request(
Expand Down Expand Up @@ -969,6 +1025,7 @@ def publish_json(
timeout: Optional[Union[str, int]] = None,
flow_control: Optional[FlowControl] = None,
label: Optional[str] = None,
redact: Optional[Redact] = None,
) -> Union[PublishResponse, List[PublishUrlGroupResponse]]:
"""
Publish a message to QStash, automatically serializing the
Expand Down Expand Up @@ -1041,6 +1098,10 @@ def publish_json(
:param flow_control: Settings for controlling the number of active requests,
as well as the rate of requests with the same flow control key.
:param label: Assign a label to the request to filter logs with it later.
:param redact: Configure which fields should be redacted in logs.
- `{"body": True}` to redact the request body
- `{"header": True}` to redact all headers
- `{"header": ["Authorization"]}` to redact specific headers
"""
return self.publish(
url=url,
Expand All @@ -1063,6 +1124,7 @@ def publish_json(
timeout=timeout,
flow_control=flow_control,
label=label,
redact=redact,
)

def enqueue(
Expand All @@ -1086,6 +1148,7 @@ def enqueue(
content_based_deduplication: Optional[bool] = None,
timeout: Optional[Union[str, int]] = None,
label: Optional[str] = None,
redact: Optional[Redact] = None,
) -> Union[EnqueueResponse, List[EnqueueUrlGroupResponse]]:
"""
Enqueues a message, after creating the queue if it does
Expand Down Expand Up @@ -1150,6 +1213,10 @@ def enqueue(
value permitted by the QStash plan. It is useful in scenarios, where a message
should be delivered with a shorter timeout.
:param label: Assign a label to the request to filter logs with it later.
:param redact: Configure which fields should be redacted in logs.
- `{"body": True}` to redact the request body
- `{"header": True}` to redact all headers
- `{"header": ["Authorization"]}` to redact specific headers
"""
headers = headers or {}
destination = get_destination(
Expand All @@ -1176,6 +1243,7 @@ def enqueue(
timeout=timeout,
flow_control=None,
label=label,
redact=redact,
)

response = self._http.request(
Expand Down Expand Up @@ -1207,6 +1275,7 @@ def enqueue_json(
content_based_deduplication: Optional[bool] = None,
timeout: Optional[Union[str, int]] = None,
label: Optional[str] = None,
redact: Optional[Redact] = None,
) -> Union[EnqueueResponse, List[EnqueueUrlGroupResponse]]:
"""
Enqueues a message, after creating the queue if it does
Expand Down Expand Up @@ -1272,6 +1341,10 @@ def enqueue_json(
value permitted by the QStash plan. It is useful in scenarios, where a message
should be delivered with a shorter timeout.
:param label: Assign a label to the request to filter logs with it later.
:param redact: Configure which fields should be redacted in logs.
- `{"body": True}` to redact the request body
- `{"header": True}` to redact all headers
- `{"header": ["Authorization"]}` to redact specific headers
"""
return self.enqueue(
queue=queue,
Expand All @@ -1292,6 +1365,7 @@ def enqueue_json(
content_based_deduplication=content_based_deduplication,
timeout=timeout,
label=label,
redact=redact,
)

def batch(
Expand Down
Loading
Loading