From 52e5a474f3868834a38107b543004deddfe6c527 Mon Sep 17 00:00:00 2001 From: Upstash Box Date: Wed, 18 Mar 2026 09:26:01 +0000 Subject: [PATCH 1/4] UPS-9: update qstash-py --- qstash/message.py | 79 +++++++++++++++++++++++++++++++++++++++++++--- qstash/schedule.py | 32 ++++++++++++++++++- 2 files changed, 106 insertions(+), 5 deletions(-) diff --git a/qstash/message.py b/qstash/message.py index 4ba9a01..18fe1ec 100644 --- a/qstash/message.py +++ b/qstash/message.py @@ -38,15 +38,27 @@ class FlowControl(TypedDict, total=False): period: Union[str, int] """ - Unit duration of the rate. + Unit duration of the rate. When given as an integer, it is in seconds. Otherwise, it - can be specified as a duration string like '10s'(10 seconds), - '2m'(2 minutes), '1h'(1 hour), - or '3d5h12m'(3 days and 5 hours and 12 minutes). + can be specified as a duration string like '10s'(10 seconds), + '2m'(2 minutes), '1h'(1 hour), + or '3d5h12m'(3 days and 5 hours and 12 minutes). Can be at most a week and defaults to 1 second. """ +class Redact(TypedDict, total=False): + body: Literal[True] + """Redact the request body in logs.""" + + header: Union[Literal[True], List[str]] + """ + Redact headers in logs. + - `True` to redact all headers + - List of header names to redact specific headers (e.g., ["Authorization"]) + """ + + @dataclasses.dataclass class FlowControlProperties: key: str @@ -249,6 +261,14 @@ class BatchRequest(TypedDict, total=False): label: str """Assign a label to the request to filter logs with it later.""" + redact: Redact + """ + Configure which fields should be redacted in logs. + - `{"body": True}` to redact the request body + - `{"header": True}` to redact all headers + - `{"header": ["Authorization"]}` to redact specific headers + """ + class BatchJsonRequest(TypedDict, total=False): url: str @@ -374,6 +394,14 @@ class BatchJsonRequest(TypedDict, total=False): label: str """Assign a label to the request to filter logs with it later.""" + redact: Redact + """ + Configure which fields should be redacted in logs. + - `{"body": True}` to redact the request body + - `{"header": True}` to redact all headers + - `{"header": ["Authorization"]}` to redact specific headers + """ + @dataclasses.dataclass class Message: @@ -498,6 +526,7 @@ def prepare_headers( timeout: Optional[Union[str, int]], flow_control: Optional[FlowControl], label: Optional[str], + redact: Optional[Redact], ) -> Dict[str, str]: h = {} @@ -583,6 +612,23 @@ def prepare_headers( if label is not None: h["Upstash-Label"] = label + if redact is not None: + redact_parts = [] + + if redact.get("body"): + redact_parts.append("body") + + header_redact = redact.get("header") + if header_redact is not None: + if header_redact is True: + redact_parts.append("header") + elif isinstance(header_redact, list) and len(header_redact) > 0: + for header_name in header_redact: + redact_parts.append(f"header[{header_name}]") + + if redact_parts: + h["Upstash-Redact-Fields"] = ",".join(redact_parts) + return h @@ -659,6 +705,7 @@ def prepare_batch_message_body(messages: List[BatchRequest]) -> str: timeout=msg.get("timeout"), flow_control=msg.get("flow_control"), label=msg.get("label"), + redact=msg.get("redact"), ) batch_messages.append( @@ -839,6 +886,7 @@ def publish( timeout: Optional[Union[str, int]] = None, flow_control: Optional[FlowControl] = None, label: Optional[str] = None, + redact: Optional[Redact] = None, ) -> Union[PublishResponse, List[PublishUrlGroupResponse]]: """ Publishes a message to QStash. @@ -910,6 +958,10 @@ def publish( :param flow_control: Settings for controlling the number of active requests, as well as the rate of requests with the same flow control key. :param label: Assign a label to the request to filter logs with it later. + :param redact: Configure which fields should be redacted in logs. + - `{"body": True}` to redact the request body + - `{"header": True}` to redact all headers + - `{"header": ["Authorization"]}` to redact specific headers """ headers = headers or {} destination = get_destination( @@ -936,6 +988,7 @@ def publish( timeout=timeout, flow_control=flow_control, label=label, + redact=redact, ) response = self._http.request( @@ -969,6 +1022,7 @@ def publish_json( timeout: Optional[Union[str, int]] = None, flow_control: Optional[FlowControl] = None, label: Optional[str] = None, + redact: Optional[Redact] = None, ) -> Union[PublishResponse, List[PublishUrlGroupResponse]]: """ Publish a message to QStash, automatically serializing the @@ -1041,6 +1095,10 @@ def publish_json( :param flow_control: Settings for controlling the number of active requests, as well as the rate of requests with the same flow control key. :param label: Assign a label to the request to filter logs with it later. + :param redact: Configure which fields should be redacted in logs. + - `{"body": True}` to redact the request body + - `{"header": True}` to redact all headers + - `{"header": ["Authorization"]}` to redact specific headers """ return self.publish( url=url, @@ -1063,6 +1121,7 @@ def publish_json( timeout=timeout, flow_control=flow_control, label=label, + redact=redact, ) def enqueue( @@ -1086,6 +1145,7 @@ def enqueue( content_based_deduplication: Optional[bool] = None, timeout: Optional[Union[str, int]] = None, label: Optional[str] = None, + redact: Optional[Redact] = None, ) -> Union[EnqueueResponse, List[EnqueueUrlGroupResponse]]: """ Enqueues a message, after creating the queue if it does @@ -1150,6 +1210,10 @@ def enqueue( value permitted by the QStash plan. It is useful in scenarios, where a message should be delivered with a shorter timeout. :param label: Assign a label to the request to filter logs with it later. + :param redact: Configure which fields should be redacted in logs. + - `{"body": True}` to redact the request body + - `{"header": True}` to redact all headers + - `{"header": ["Authorization"]}` to redact specific headers """ headers = headers or {} destination = get_destination( @@ -1176,6 +1240,7 @@ def enqueue( timeout=timeout, flow_control=None, label=label, + redact=redact, ) response = self._http.request( @@ -1207,6 +1272,7 @@ def enqueue_json( content_based_deduplication: Optional[bool] = None, timeout: Optional[Union[str, int]] = None, label: Optional[str] = None, + redact: Optional[Redact] = None, ) -> Union[EnqueueResponse, List[EnqueueUrlGroupResponse]]: """ Enqueues a message, after creating the queue if it does @@ -1272,6 +1338,10 @@ def enqueue_json( value permitted by the QStash plan. It is useful in scenarios, where a message should be delivered with a shorter timeout. :param label: Assign a label to the request to filter logs with it later. + :param redact: Configure which fields should be redacted in logs. + - `{"body": True}` to redact the request body + - `{"header": True}` to redact all headers + - `{"header": ["Authorization"]}` to redact specific headers """ return self.enqueue( queue=queue, @@ -1292,6 +1362,7 @@ def enqueue_json( content_based_deduplication=content_based_deduplication, timeout=timeout, label=label, + redact=redact, ) def batch( diff --git a/qstash/schedule.py b/qstash/schedule.py index 3c1de96..0aea97c 100644 --- a/qstash/schedule.py +++ b/qstash/schedule.py @@ -4,7 +4,7 @@ from typing import Any, Dict, List, Optional, Union from qstash.http import HttpClient, HttpMethod -from qstash.message import FlowControl, parse_flow_control, FlowControlProperties +from qstash.message import FlowControl, parse_flow_control, FlowControlProperties, Redact class ScheduleState(enum.Enum): @@ -118,6 +118,7 @@ def prepare_schedule_headers( queue: Optional[str], flow_control: Optional[FlowControl], label: Optional[str], + redact: Optional[Redact], ) -> Dict[str, str]: h = { "Upstash-Cron": cron, @@ -202,6 +203,23 @@ def prepare_schedule_headers( if label is not None: h["Upstash-Label"] = label + if redact is not None: + redact_parts = [] + + if redact.get("body"): + redact_parts.append("body") + + header_redact = redact.get("header") + if header_redact is not None: + if header_redact is True: + redact_parts.append("header") + elif isinstance(header_redact, list) and len(header_redact) > 0: + for header_name in header_redact: + redact_parts.append(f"header[{header_name}]") + + if redact_parts: + h["Upstash-Redact-Fields"] = ",".join(redact_parts) + return h @@ -269,6 +287,7 @@ def create( queue: Optional[str] = None, flow_control: Optional[FlowControl] = None, label: Optional[str] = None, + redact: Optional[Redact] = None, ) -> str: """ Creates a schedule to send messages periodically. @@ -332,6 +351,10 @@ def create( :param flow_control: Settings for controlling the number of active requests, as well as the rate of requests with the same flow control key. :param label: Assign a label to the request to filter logs with it later. + :param redact: Configure which fields should be redacted in logs. + - `{"body": True}` to redact the request body + - `{"header": True}` to redact all headers + - `{"header": ["Authorization"]}` to redact specific headers """ req_headers = prepare_schedule_headers( cron=cron, @@ -350,6 +373,7 @@ def create( queue=queue, flow_control=flow_control, label=label, + redact=redact, ) response = self._http.request( @@ -381,6 +405,7 @@ def create_json( queue: Optional[str] = None, flow_control: Optional[FlowControl] = None, label: Optional[str] = None, + redact: Optional[Redact] = None, ) -> str: """ Creates a schedule to send messages periodically, automatically serializing the @@ -445,6 +470,10 @@ def create_json( :param flow_control: Settings for controlling the number of active requests, as well as the rate of requests with the same flow control key. :param label: Assign a label to the request to filter logs with it later. + :param redact: Configure which fields should be redacted in logs. + - `{"body": True}` to redact the request body + - `{"header": True}` to redact all headers + - `{"header": ["Authorization"]}` to redact specific headers """ return self.create( destination=destination, @@ -465,6 +494,7 @@ def create_json( queue=queue, flow_control=flow_control, label=label, + redact=redact, ) def get(self, schedule_id: str) -> Schedule: From 763e3dcea65fbb8cbd0ef91582c2d1b0d37a95c4 Mon Sep 17 00:00:00 2001 From: CahidArda Date: Wed, 18 Mar 2026 15:02:43 +0300 Subject: [PATCH 2/4] UPS-9: add missing redact parameter to async APIs and batch conversion Co-Authored-By: Claude Opus 4.6 (1M context) --- AGENTS.md | 40 ++++++++++++++++++++++++++++++++++++++ qstash/asyncio/message.py | 9 +++++++++ qstash/asyncio/schedule.py | 6 +++++- qstash/message.py | 3 +++ qstash/schedule.py | 7 ++++++- 5 files changed, 63 insertions(+), 2 deletions(-) create mode 100644 AGENTS.md diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..969922f --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,40 @@ +# QStash Python SDK — Agent Instructions + +## SDK Design Instructions + +See `.github/instructions/qstash-sdk-design.instructions.md` for the canonical list of locations that must be updated when adding a new parameter. + +## Checklist: Adding a New Parameter + +When adding a new parameter to the SDK, follow this checklist **in full** before considering the work done. + +### 1. Update all required locations + +Refer to `.github/instructions/qstash-sdk-design.instructions.md` for the full list. The key rule: **every location listed there must be evaluated**. Not every parameter applies to every location (e.g. a request-only parameter won't appear in response dataclasses), but you must consciously check each one. + +Common locations that get missed: + +- **`qstash/asyncio/`** — The async variants (`asyncio/message.py`, `asyncio/schedule.py`) must mirror the sync API exactly. If you add a parameter to `MessageApi.publish`, you must also add it to `AsyncMessageApi.publish`, and so on for `publish_json`, `enqueue`, `enqueue_json`, `create`, `create_json`. +- **`convert_to_batch_messages`** in `qstash/message.py` — This function converts `BatchJsonRequest` to `BatchRequest`. If the parameter is in both TypedDicts, this function must copy it. +- **Response parsing** (`parse_schedule_response`, `parse_message_response`, `parse_logs_response`, `parse_dlq_message_response`) — If the API returns the parameter in responses, it must be added to the response dataclass and parse function. +- **Filter TypedDicts** (`LogFilter`, `DlqFilter`) and their param builders (`prepare_list_logs_request_params`, `prepare_list_dlq_messages_params`) — If the parameter is filterable. + +### 2. Run all checks before submitting + +Always run the full check suite **before** considering work complete: + +```sh +poetry run pytest +poetry run ruff format . +poetry run ruff check . +poetry run mypy --show-error-codes . +``` + +mypy will catch missing arguments (like `[call-arg]` errors for missing named arguments). Running it would have caught the asyncio omissions immediately. + +### 3. Verify asyncio parity + +After making changes to any file in `qstash/`, diff the sync and async versions to confirm parity: + +- `qstash/message.py` ↔ `qstash/asyncio/message.py` +- `qstash/schedule.py` ↔ `qstash/asyncio/schedule.py` diff --git a/qstash/asyncio/message.py b/qstash/asyncio/message.py index c836f0b..ed575db 100644 --- a/qstash/asyncio/message.py +++ b/qstash/asyncio/message.py @@ -6,6 +6,7 @@ from qstash.message import ( ApiT, FlowControl, + Redact, BatchJsonRequest, BatchRequest, BatchResponse, @@ -53,6 +54,7 @@ async def publish( timeout: Optional[Union[str, int]] = None, flow_control: Optional[FlowControl] = None, label: Optional[str] = None, + redact: Optional[Redact] = None, ) -> Union[PublishResponse, List[PublishUrlGroupResponse]]: """ Publishes a message to QStash. @@ -150,6 +152,7 @@ async def publish( timeout=timeout, flow_control=flow_control, label=label, + redact=redact, ) response = await self._http.request( @@ -183,6 +186,7 @@ async def publish_json( timeout: Optional[Union[str, int]] = None, flow_control: Optional[FlowControl] = None, label: Optional[str] = None, + redact: Optional[Redact] = None, ) -> Union[PublishResponse, List[PublishUrlGroupResponse]]: """ Publish a message to QStash, automatically serializing the @@ -277,6 +281,7 @@ async def publish_json( timeout=timeout, flow_control=flow_control, label=label, + redact=redact, ) async def enqueue( @@ -300,6 +305,7 @@ async def enqueue( content_based_deduplication: Optional[bool] = None, timeout: Optional[Union[str, int]] = None, label: Optional[str] = None, + redact: Optional[Redact] = None, ) -> Union[EnqueueResponse, List[EnqueueUrlGroupResponse]]: """ Enqueues a message, after creating the queue if it does @@ -390,6 +396,7 @@ async def enqueue( timeout=timeout, flow_control=None, label=label, + redact=redact, ) response = await self._http.request( @@ -421,6 +428,7 @@ async def enqueue_json( content_based_deduplication: Optional[bool] = None, timeout: Optional[Union[str, int]] = None, label: Optional[str] = None, + redact: Optional[Redact] = None, ) -> Union[EnqueueResponse, List[EnqueueUrlGroupResponse]]: """ Enqueues a message, after creating the queue if it does @@ -506,6 +514,7 @@ async def enqueue_json( content_based_deduplication=content_based_deduplication, timeout=timeout, label=label, + redact=redact, ) async def batch( diff --git a/qstash/asyncio/schedule.py b/qstash/asyncio/schedule.py index e034a7a..cf116c0 100644 --- a/qstash/asyncio/schedule.py +++ b/qstash/asyncio/schedule.py @@ -3,7 +3,7 @@ from qstash.asyncio.http import AsyncHttpClient from qstash.http import HttpMethod -from qstash.message import FlowControl +from qstash.message import FlowControl, Redact from qstash.schedule import ( Schedule, parse_schedule_response, @@ -36,6 +36,7 @@ async def create( queue: Optional[str] = None, flow_control: Optional[FlowControl] = None, label: Optional[str] = None, + redact: Optional[Redact] = None, ) -> str: """ Creates a schedule to send messages periodically. @@ -117,6 +118,7 @@ async def create( queue=queue, flow_control=flow_control, label=label, + redact=redact, ) response = await self._http.request( @@ -148,6 +150,7 @@ async def create_json( queue: Optional[str] = None, flow_control: Optional[FlowControl] = None, label: Optional[str] = None, + redact: Optional[Redact] = None, ) -> str: """ Creates a schedule to send messages periodically, automatically serializing the @@ -232,6 +235,7 @@ async def create_json( queue=queue, flow_control=flow_control, label=label, + redact=redact, ) async def get(self, schedule_id: str) -> Schedule: diff --git a/qstash/message.py b/qstash/message.py index 18fe1ec..9a44496 100644 --- a/qstash/message.py +++ b/qstash/message.py @@ -815,6 +815,9 @@ def convert_to_batch_messages( if "label" in msg: batch_msg["label"] = msg["label"] + if "redact" in msg: + batch_msg["redact"] = msg["redact"] + batch_messages.append(batch_msg) return batch_messages diff --git a/qstash/schedule.py b/qstash/schedule.py index 0aea97c..26f557f 100644 --- a/qstash/schedule.py +++ b/qstash/schedule.py @@ -4,7 +4,12 @@ from typing import Any, Dict, List, Optional, Union from qstash.http import HttpClient, HttpMethod -from qstash.message import FlowControl, parse_flow_control, FlowControlProperties, Redact +from qstash.message import ( + FlowControl, + parse_flow_control, + FlowControlProperties, + Redact, +) class ScheduleState(enum.Enum): From 1641f959422d584e084551f4cd94026a26f8fbe4 Mon Sep 17 00:00:00 2001 From: CahidArda Date: Wed, 18 Mar 2026 15:13:27 +0300 Subject: [PATCH 3/4] test: add sleep before verifying rate reset in flow control tests --- tests/asyncio/test_flow_control.py | 3 +++ tests/test_flow_control.py | 3 +++ 2 files changed, 6 insertions(+) diff --git a/tests/asyncio/test_flow_control.py b/tests/asyncio/test_flow_control.py index c58fb05..f9b0044 100644 --- a/tests/asyncio/test_flow_control.py +++ b/tests/asyncio/test_flow_control.py @@ -191,6 +191,9 @@ async def test_flow_control_reset_rate_async(async_client: AsyncQStash) -> None: # Reset the rate await async_client.flow_control.reset_rate(flow_control_key) + # sleep 1 second + await asyncio.sleep(1) + # Verify rate was reset by checking the flow control info info = await async_client.flow_control.get(flow_control_key) assert info.rate_count == 0 diff --git a/tests/test_flow_control.py b/tests/test_flow_control.py index 5a6a59f..c2d730c 100644 --- a/tests/test_flow_control.py +++ b/tests/test_flow_control.py @@ -180,6 +180,9 @@ def test_flow_control_reset_rate(client: QStash) -> None: # Reset the rate client.flow_control.reset_rate(flow_control_key) + # sleep 1 second + time.sleep(1) + # Verify rate was reset by checking the flow control info info = client.flow_control.get(flow_control_key) assert info.rate_count == 0 From e11ff459579b6767abfa4fed3134d22d6dfa0936 Mon Sep 17 00:00:00 2001 From: CahidArda Date: Wed, 18 Mar 2026 16:12:55 +0300 Subject: [PATCH 4/4] test: replace sleep with assert_eventually for rate reset verification --- tests/asyncio/test_flow_control.py | 12 +++++++----- tests/test_flow_control.py | 12 +++++++----- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/tests/asyncio/test_flow_control.py b/tests/asyncio/test_flow_control.py index f9b0044..e5612f7 100644 --- a/tests/asyncio/test_flow_control.py +++ b/tests/asyncio/test_flow_control.py @@ -6,6 +6,7 @@ from qstash import AsyncQStash from qstash.flow_control_api import GlobalParallelismInfo from qstash.message import FlowControl, PublishResponse +from tests import assert_eventually_async @pytest.mark.asyncio @@ -191,12 +192,13 @@ async def test_flow_control_reset_rate_async(async_client: AsyncQStash) -> None: # Reset the rate await async_client.flow_control.reset_rate(flow_control_key) - # sleep 1 second - await asyncio.sleep(1) + # We use assert_eventually because reset_rate doesn't take effect immediately. + # Not ideal but it's what we have. + async def check_rate_reset() -> None: + info = await async_client.flow_control.get(flow_control_key) + assert info.rate_count == 0 - # Verify rate was reset by checking the flow control info - info = await async_client.flow_control.get(flow_control_key) - assert info.rate_count == 0 + await assert_eventually_async(check_rate_reset) # Clean up await async_client.message.cancel(result.message_id) diff --git a/tests/test_flow_control.py b/tests/test_flow_control.py index c2d730c..ebb0ddf 100644 --- a/tests/test_flow_control.py +++ b/tests/test_flow_control.py @@ -3,6 +3,7 @@ from qstash import QStash from qstash.flow_control_api import GlobalParallelismInfo from qstash.message import FlowControl, PublishResponse +from tests import assert_eventually def test_flow_control_get(client: QStash) -> None: @@ -180,12 +181,13 @@ def test_flow_control_reset_rate(client: QStash) -> None: # Reset the rate client.flow_control.reset_rate(flow_control_key) - # sleep 1 second - time.sleep(1) + # We use assert_eventually because reset_rate doesn't take effect immediately. + # Not ideal but it's what we have. + def check_rate_reset() -> None: + info = client.flow_control.get(flow_control_key) + assert info.rate_count == 0 - # Verify rate was reset by checking the flow control info - info = client.flow_control.get(flow_control_key) - assert info.rate_count == 0 + assert_eventually(check_rate_reset) # Clean up client.message.cancel(result.message_id)