From 4868892873ba11dce13c6e3d0855a69e330fe470 Mon Sep 17 00:00:00 2001 From: CahidArda Date: Mon, 16 Feb 2026 13:54:03 +0300 Subject: [PATCH 1/7] feat: add flow control API Add client.flow_control namespace with list(), get(), and reset() methods (sync + async). Add FlowControlInfo dataclass. DX-2401 Co-Authored-By: Claude Opus 4.6 --- qstash/asyncio/client.py | 4 ++ qstash/asyncio/flow_control.py | 59 ++++++++++++++++++++ qstash/client.py | 4 ++ qstash/flow_control_api.py | 98 ++++++++++++++++++++++++++++++++++ 4 files changed, 165 insertions(+) create mode 100644 qstash/asyncio/flow_control.py create mode 100644 qstash/flow_control_api.py diff --git a/qstash/asyncio/client.py b/qstash/asyncio/client.py index b830d15..19adffe 100644 --- a/qstash/asyncio/client.py +++ b/qstash/asyncio/client.py @@ -2,6 +2,7 @@ from typing import Literal, Optional, Union from qstash.asyncio.dlq import AsyncDlqApi +from qstash.asyncio.flow_control import AsyncFlowControlApi from qstash.asyncio.log import AsyncLogApi from qstash.asyncio.http import AsyncHttpClient from qstash.asyncio.message import AsyncMessageApi @@ -49,3 +50,6 @@ def __init__( self.dlq = AsyncDlqApi(self.http) """Dlq (Dead Letter Queue) api.""" + + self.flow_control = AsyncFlowControlApi(self.http) + """Flow control api.""" diff --git a/qstash/asyncio/flow_control.py b/qstash/asyncio/flow_control.py new file mode 100644 index 0000000..1c40fd5 --- /dev/null +++ b/qstash/asyncio/flow_control.py @@ -0,0 +1,59 @@ +from typing import Dict, List, Optional + +from qstash.asyncio.http import AsyncHttpClient +from qstash.flow_control_api import ( + FlowControlInfo, + parse_flow_control_info, +) + + +class AsyncFlowControlApi: + def __init__(self, http: AsyncHttpClient) -> None: + self._http = http + + async def list( + self, + *, + search: Optional[str] = None, + ) -> List[FlowControlInfo]: + """ + Lists all flow controls. + + :param search: Optional search string to filter flow control keys. + """ + params: Dict[str, str] = {} + if search is not None: + params["search"] = search + + response = await self._http.request( + path="/v2/flowControl", + method="GET", + params=params, + ) + + return [parse_flow_control_info(r) for r in response] + + async def get(self, flow_control_key: str) -> FlowControlInfo: + """ + Gets a single flow control by key. + + :param flow_control_key: The flow control key to get. + """ + response = await self._http.request( + path=f"/v2/flowControl/{flow_control_key}", + method="GET", + ) + + return parse_flow_control_info(response) + + async def reset(self, flow_control_key: str) -> None: + """ + Resets the counters of a flow control key. + + :param flow_control_key: The flow control key to reset. + """ + await self._http.request( + path=f"/v2/flowControl/{flow_control_key}/reset", + method="POST", + parse_response=False, + ) diff --git a/qstash/client.py b/qstash/client.py index 72fe4e5..a6758b0 100644 --- a/qstash/client.py +++ b/qstash/client.py @@ -2,6 +2,7 @@ from typing import Optional, Union, Literal from qstash.dlq import DlqApi +from qstash.flow_control_api import FlowControlApi from qstash.log import LogApi from qstash.http import RetryConfig, HttpClient from qstash.message import MessageApi @@ -50,3 +51,6 @@ def __init__( self.dlq = DlqApi(self.http) """Dlq (Dead Letter Queue) api.""" + + self.flow_control = FlowControlApi(self.http) + """Flow control api.""" diff --git a/qstash/flow_control_api.py b/qstash/flow_control_api.py new file mode 100644 index 0000000..ea5e5f3 --- /dev/null +++ b/qstash/flow_control_api.py @@ -0,0 +1,98 @@ +import dataclasses +from typing import Any, Dict, List, Optional + +from qstash.http import HttpClient + + +@dataclasses.dataclass +class FlowControlInfo: + """Information about a flow control key.""" + + key: str + """The flow control key.""" + + wait_list_size: int + """The number of messages waiting in the wait list.""" + + parallelism_max: int + """The maximum parallelism configured for this flow control key.""" + + parallelism_count: int + """The current number of active requests for this flow control key.""" + + rate_max: int + """The maximum rate configured for this flow control key.""" + + rate_count: int + """The current number of requests consumed in the current period.""" + + rate_period: int + """The rate period in seconds.""" + + rate_period_start: int + """The start time of the current rate period as a unix timestamp.""" + + +def parse_flow_control_info(response: Dict[str, Any]) -> FlowControlInfo: + return FlowControlInfo( + key=response["flowControlKey"], + wait_list_size=response.get("waitListSize", 0), + parallelism_max=response.get("parallelismMax", 0), + parallelism_count=response.get("parallelismCount", 0), + rate_max=response.get("rateMax", 0), + rate_count=response.get("rateCount", 0), + rate_period=response.get("ratePeriod", 0), + rate_period_start=response.get("ratePeriodStart", 0), + ) + + +class FlowControlApi: + def __init__(self, http: HttpClient) -> None: + self._http = http + + def list( + self, + *, + search: Optional[str] = None, + ) -> List[FlowControlInfo]: + """ + Lists all flow controls. + + :param search: Optional search string to filter flow control keys. + """ + params: Dict[str, str] = {} + if search is not None: + params["search"] = search + + response = self._http.request( + path="/v2/flowControl", + method="GET", + params=params, + ) + + return [parse_flow_control_info(r) for r in response] + + def get(self, flow_control_key: str) -> FlowControlInfo: + """ + Gets a single flow control by key. + + :param flow_control_key: The flow control key to get. + """ + response = self._http.request( + path=f"/v2/flowControl/{flow_control_key}", + method="GET", + ) + + return parse_flow_control_info(response) + + def reset(self, flow_control_key: str) -> None: + """ + Resets the counters of a flow control key. + + :param flow_control_key: The flow control key to reset. + """ + self._http.request( + path=f"/v2/flowControl/{flow_control_key}/reset", + method="POST", + parse_response=False, + ) From 14dabaf294e5ad5b8aa11814560a83608227ad53 Mon Sep 17 00:00:00 2001 From: CahidArda Date: Mon, 16 Feb 2026 14:21:09 +0300 Subject: [PATCH 2/7] test: add flow control API tests Test list, get, reset, and search for sync and async clients. DX-2401 Co-Authored-By: Claude Opus 4.6 --- tests/asyncio/test_flow_control.py | 76 ++++++++++++++++++++++++++++++ tests/test_flow_control.py | 70 +++++++++++++++++++++++++++ 2 files changed, 146 insertions(+) create mode 100644 tests/asyncio/test_flow_control.py create mode 100644 tests/test_flow_control.py diff --git a/tests/asyncio/test_flow_control.py b/tests/asyncio/test_flow_control.py new file mode 100644 index 0000000..e0b1135 --- /dev/null +++ b/tests/asyncio/test_flow_control.py @@ -0,0 +1,76 @@ +import asyncio + +import pytest + +from qstash import AsyncQStash +from qstash.message import FlowControl + + +FLOW_CONTROL_KEY = "test-flow-control-key-async" + + +@pytest.mark.asyncio +async def test_flow_control_lifecycle_async(async_client: AsyncQStash) -> None: + # Publish a message with flow control to ensure the key exists + result = await async_client.message.publish_json( + body={"test": "value"}, + url="https://httpstat.us/200?sleep=30000", + flow_control=FlowControl( + key=FLOW_CONTROL_KEY, + parallelism=5, + rate=10, + period="1m", + ), + ) + assert result["messageId"] + + # Small delay to let flow control state propagate + await asyncio.sleep(1) + + # List all flow controls + flow_controls = await async_client.flow_control.list() + assert isinstance(flow_controls, list) + + found = [fc for fc in flow_controls if fc.key == FLOW_CONTROL_KEY] + assert len(found) > 0 + + fc = found[0] + assert fc.key == FLOW_CONTROL_KEY + assert isinstance(fc.wait_list_size, int) + assert isinstance(fc.parallelism_max, int) + assert isinstance(fc.parallelism_count, int) + assert isinstance(fc.rate_max, int) + assert isinstance(fc.rate_count, int) + assert isinstance(fc.rate_period, int) + assert isinstance(fc.rate_period_start, int) + + # Get a single flow control by key + single = await async_client.flow_control.get(FLOW_CONTROL_KEY) + assert single.key == FLOW_CONTROL_KEY + assert isinstance(single.wait_list_size, int) + + # Clean up message + await async_client.message.cancel(result["messageId"]) + + +@pytest.mark.asyncio +async def test_flow_control_list_with_search_async( + async_client: AsyncQStash, +) -> None: + flow_controls = await async_client.flow_control.list(search=FLOW_CONTROL_KEY) + assert isinstance(flow_controls, list) + + for fc in flow_controls: + assert FLOW_CONTROL_KEY in fc.key + + +@pytest.mark.asyncio +async def test_flow_control_reset_async(async_client: AsyncQStash) -> None: + # Reset should not raise + await async_client.flow_control.reset(FLOW_CONTROL_KEY) + + # After reset, get should still work (counters are zeroed) + fc = await async_client.flow_control.get(FLOW_CONTROL_KEY) + assert fc.key == FLOW_CONTROL_KEY + assert fc.parallelism_count == 0 + assert fc.rate_count == 0 diff --git a/tests/test_flow_control.py b/tests/test_flow_control.py new file mode 100644 index 0000000..b951951 --- /dev/null +++ b/tests/test_flow_control.py @@ -0,0 +1,70 @@ +import time + +from qstash import QStash +from qstash.message import FlowControl + + +FLOW_CONTROL_KEY = "test-flow-control-key" + + +def test_flow_control_lifecycle(client: QStash) -> None: + # Publish a message with flow control to ensure the key exists + result = client.message.publish_json( + body={"test": "value"}, + url="https://httpstat.us/200?sleep=30000", + flow_control=FlowControl( + key=FLOW_CONTROL_KEY, + parallelism=5, + rate=10, + period="1m", + ), + ) + assert result["messageId"] + + # Small delay to let flow control state propagate + time.sleep(1) + + # List all flow controls + flow_controls = client.flow_control.list() + assert isinstance(flow_controls, list) + + found = [fc for fc in flow_controls if fc.key == FLOW_CONTROL_KEY] + assert len(found) > 0 + + fc = found[0] + assert fc.key == FLOW_CONTROL_KEY + assert isinstance(fc.wait_list_size, int) + assert isinstance(fc.parallelism_max, int) + assert isinstance(fc.parallelism_count, int) + assert isinstance(fc.rate_max, int) + assert isinstance(fc.rate_count, int) + assert isinstance(fc.rate_period, int) + assert isinstance(fc.rate_period_start, int) + + # Get a single flow control by key + single = client.flow_control.get(FLOW_CONTROL_KEY) + assert single.key == FLOW_CONTROL_KEY + assert isinstance(single.wait_list_size, int) + assert isinstance(single.parallelism_max, int) + + # Clean up message + client.message.cancel(result["messageId"]) + + +def test_flow_control_list_with_search(client: QStash) -> None: + flow_controls = client.flow_control.list(search=FLOW_CONTROL_KEY) + assert isinstance(flow_controls, list) + + for fc in flow_controls: + assert FLOW_CONTROL_KEY in fc.key + + +def test_flow_control_reset(client: QStash) -> None: + # Reset should not raise + client.flow_control.reset(FLOW_CONTROL_KEY) + + # After reset, get should still work (counters are zeroed) + fc = client.flow_control.get(FLOW_CONTROL_KEY) + assert fc.key == FLOW_CONTROL_KEY + assert fc.parallelism_count == 0 + assert fc.rate_count == 0 From e80a124d3eed8983b2bbbeb4c140719493df4579 Mon Sep 17 00:00:00 2001 From: CahidArda Date: Mon, 16 Feb 2026 14:32:30 +0300 Subject: [PATCH 3/7] fix: fmt --- tests/asyncio/test_flow_control.py | 7 ++++--- tests/test_flow_control.py | 7 ++++--- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/tests/asyncio/test_flow_control.py b/tests/asyncio/test_flow_control.py index e0b1135..53e56e1 100644 --- a/tests/asyncio/test_flow_control.py +++ b/tests/asyncio/test_flow_control.py @@ -3,7 +3,7 @@ import pytest from qstash import AsyncQStash -from qstash.message import FlowControl +from qstash.message import FlowControl, PublishResponse FLOW_CONTROL_KEY = "test-flow-control-key-async" @@ -22,7 +22,8 @@ async def test_flow_control_lifecycle_async(async_client: AsyncQStash) -> None: period="1m", ), ) - assert result["messageId"] + assert isinstance(result, PublishResponse) + assert result.message_id # Small delay to let flow control state propagate await asyncio.sleep(1) @@ -50,7 +51,7 @@ async def test_flow_control_lifecycle_async(async_client: AsyncQStash) -> None: assert isinstance(single.wait_list_size, int) # Clean up message - await async_client.message.cancel(result["messageId"]) + await async_client.message.cancel(result.message_id) @pytest.mark.asyncio diff --git a/tests/test_flow_control.py b/tests/test_flow_control.py index b951951..75c304c 100644 --- a/tests/test_flow_control.py +++ b/tests/test_flow_control.py @@ -1,7 +1,7 @@ import time from qstash import QStash -from qstash.message import FlowControl +from qstash.message import FlowControl, PublishResponse FLOW_CONTROL_KEY = "test-flow-control-key" @@ -19,7 +19,8 @@ def test_flow_control_lifecycle(client: QStash) -> None: period="1m", ), ) - assert result["messageId"] + assert isinstance(result, PublishResponse) + assert result.message_id # Small delay to let flow control state propagate time.sleep(1) @@ -48,7 +49,7 @@ def test_flow_control_lifecycle(client: QStash) -> None: assert isinstance(single.parallelism_max, int) # Clean up message - client.message.cancel(result["messageId"]) + client.message.cancel(result.message_id) def test_flow_control_list_with_search(client: QStash) -> None: From 35f077892da68d92245a51b1c2e713190798a3c4 Mon Sep 17 00:00:00 2001 From: CahidArda Date: Wed, 18 Feb 2026 12:53:43 +0300 Subject: [PATCH 4/7] DX-2401: Add get_global_parallelism, remove list/reset - Add GlobalParallelismInfo dataclass and get_global_parallelism() method (sync + async) - Remove flow_control.list() and flow_control.reset() methods - Update tests accordingly Co-Authored-By: Claude Opus 4.6 --- qstash/asyncio/flow_control.py | 46 ++++++++---------------- qstash/flow_control_api.py | 56 +++++++++++++----------------- tests/asyncio/test_flow_control.py | 45 +++++------------------- tests/test_flow_control.py | 43 +++++------------------ 4 files changed, 55 insertions(+), 135 deletions(-) diff --git a/qstash/asyncio/flow_control.py b/qstash/asyncio/flow_control.py index 1c40fd5..1da0244 100644 --- a/qstash/asyncio/flow_control.py +++ b/qstash/asyncio/flow_control.py @@ -1,8 +1,9 @@ -from typing import Dict, List, Optional +from typing import Dict from qstash.asyncio.http import AsyncHttpClient from qstash.flow_control_api import ( FlowControlInfo, + GlobalParallelismInfo, parse_flow_control_info, ) @@ -11,29 +12,7 @@ class AsyncFlowControlApi: def __init__(self, http: AsyncHttpClient) -> None: self._http = http - async def list( - self, - *, - search: Optional[str] = None, - ) -> List[FlowControlInfo]: - """ - Lists all flow controls. - - :param search: Optional search string to filter flow control keys. - """ - params: Dict[str, str] = {} - if search is not None: - params["search"] = search - - response = await self._http.request( - path="/v2/flowControl", - method="GET", - params=params, - ) - - return [parse_flow_control_info(r) for r in response] - - async def get(self, flow_control_key: str) -> FlowControlInfo: +async def get(self, flow_control_key: str) -> FlowControlInfo: """ Gets a single flow control by key. @@ -46,14 +25,17 @@ async def get(self, flow_control_key: str) -> FlowControlInfo: return parse_flow_control_info(response) - async def reset(self, flow_control_key: str) -> None: + async def get_global_parallelism(self) -> GlobalParallelismInfo: """ - Resets the counters of a flow control key. - - :param flow_control_key: The flow control key to reset. + Gets the global parallelism info. """ - await self._http.request( - path=f"/v2/flowControl/{flow_control_key}/reset", - method="POST", - parse_response=False, + response = await self._http.request( + path="/v2/globalParallelism", + method="GET", ) + + return GlobalParallelismInfo( + parallelism_max=response.get("parallelismMax", 0), + parallelism_count=response.get("parallelismCount", 0), + ) + diff --git a/qstash/flow_control_api.py b/qstash/flow_control_api.py index ea5e5f3..6bcbd73 100644 --- a/qstash/flow_control_api.py +++ b/qstash/flow_control_api.py @@ -1,5 +1,5 @@ import dataclasses -from typing import Any, Dict, List, Optional +from typing import Any, Dict from qstash.http import HttpClient @@ -33,6 +33,17 @@ class FlowControlInfo: """The start time of the current rate period as a unix timestamp.""" +@dataclasses.dataclass +class GlobalParallelismInfo: + """Information about global parallelism.""" + + parallelism_max: int + """The maximum global parallelism.""" + + parallelism_count: int + """The current number of active requests globally.""" + + def parse_flow_control_info(response: Dict[str, Any]) -> FlowControlInfo: return FlowControlInfo( key=response["flowControlKey"], @@ -50,29 +61,7 @@ class FlowControlApi: def __init__(self, http: HttpClient) -> None: self._http = http - def list( - self, - *, - search: Optional[str] = None, - ) -> List[FlowControlInfo]: - """ - Lists all flow controls. - - :param search: Optional search string to filter flow control keys. - """ - params: Dict[str, str] = {} - if search is not None: - params["search"] = search - - response = self._http.request( - path="/v2/flowControl", - method="GET", - params=params, - ) - - return [parse_flow_control_info(r) for r in response] - - def get(self, flow_control_key: str) -> FlowControlInfo: +def get(self, flow_control_key: str) -> FlowControlInfo: """ Gets a single flow control by key. @@ -85,14 +74,17 @@ def get(self, flow_control_key: str) -> FlowControlInfo: return parse_flow_control_info(response) - def reset(self, flow_control_key: str) -> None: + def get_global_parallelism(self) -> GlobalParallelismInfo: """ - Resets the counters of a flow control key. - - :param flow_control_key: The flow control key to reset. + Gets the global parallelism info. """ - self._http.request( - path=f"/v2/flowControl/{flow_control_key}/reset", - method="POST", - parse_response=False, + response = self._http.request( + path="/v2/globalParallelism", + method="GET", ) + + return GlobalParallelismInfo( + parallelism_max=response.get("parallelismMax", 0), + parallelism_count=response.get("parallelismCount", 0), + ) + diff --git a/tests/asyncio/test_flow_control.py b/tests/asyncio/test_flow_control.py index 53e56e1..ec0fb6f 100644 --- a/tests/asyncio/test_flow_control.py +++ b/tests/asyncio/test_flow_control.py @@ -3,6 +3,7 @@ import pytest from qstash import AsyncQStash +from qstash.flow_control_api import GlobalParallelismInfo from qstash.message import FlowControl, PublishResponse @@ -10,7 +11,7 @@ @pytest.mark.asyncio -async def test_flow_control_lifecycle_async(async_client: AsyncQStash) -> None: +async def test_flow_control_get_async(async_client: AsyncQStash) -> None: # Publish a message with flow control to ensure the key exists result = await async_client.message.publish_json( body={"test": "value"}, @@ -28,50 +29,22 @@ async def test_flow_control_lifecycle_async(async_client: AsyncQStash) -> None: # Small delay to let flow control state propagate await asyncio.sleep(1) - # List all flow controls - flow_controls = await async_client.flow_control.list() - assert isinstance(flow_controls, list) - - found = [fc for fc in flow_controls if fc.key == FLOW_CONTROL_KEY] - assert len(found) > 0 - - fc = found[0] - assert fc.key == FLOW_CONTROL_KEY - assert isinstance(fc.wait_list_size, int) - assert isinstance(fc.parallelism_max, int) - assert isinstance(fc.parallelism_count, int) - assert isinstance(fc.rate_max, int) - assert isinstance(fc.rate_count, int) - assert isinstance(fc.rate_period, int) - assert isinstance(fc.rate_period_start, int) - # Get a single flow control by key single = await async_client.flow_control.get(FLOW_CONTROL_KEY) assert single.key == FLOW_CONTROL_KEY assert isinstance(single.wait_list_size, int) + assert isinstance(single.parallelism_max, int) + assert isinstance(single.parallelism_count, int) # Clean up message await async_client.message.cancel(result.message_id) @pytest.mark.asyncio -async def test_flow_control_list_with_search_async( +async def test_flow_control_get_global_parallelism_async( async_client: AsyncQStash, ) -> None: - flow_controls = await async_client.flow_control.list(search=FLOW_CONTROL_KEY) - assert isinstance(flow_controls, list) - - for fc in flow_controls: - assert FLOW_CONTROL_KEY in fc.key - - -@pytest.mark.asyncio -async def test_flow_control_reset_async(async_client: AsyncQStash) -> None: - # Reset should not raise - await async_client.flow_control.reset(FLOW_CONTROL_KEY) - - # After reset, get should still work (counters are zeroed) - fc = await async_client.flow_control.get(FLOW_CONTROL_KEY) - assert fc.key == FLOW_CONTROL_KEY - assert fc.parallelism_count == 0 - assert fc.rate_count == 0 + info = await async_client.flow_control.get_global_parallelism() + assert isinstance(info, GlobalParallelismInfo) + assert isinstance(info.parallelism_max, int) + assert isinstance(info.parallelism_count, int) diff --git a/tests/test_flow_control.py b/tests/test_flow_control.py index 75c304c..2f79e30 100644 --- a/tests/test_flow_control.py +++ b/tests/test_flow_control.py @@ -1,13 +1,14 @@ import time from qstash import QStash +from qstash.flow_control_api import GlobalParallelismInfo from qstash.message import FlowControl, PublishResponse FLOW_CONTROL_KEY = "test-flow-control-key" -def test_flow_control_lifecycle(client: QStash) -> None: +def test_flow_control_get(client: QStash) -> None: # Publish a message with flow control to ensure the key exists result = client.message.publish_json( body={"test": "value"}, @@ -25,47 +26,19 @@ def test_flow_control_lifecycle(client: QStash) -> None: # Small delay to let flow control state propagate time.sleep(1) - # List all flow controls - flow_controls = client.flow_control.list() - assert isinstance(flow_controls, list) - - found = [fc for fc in flow_controls if fc.key == FLOW_CONTROL_KEY] - assert len(found) > 0 - - fc = found[0] - assert fc.key == FLOW_CONTROL_KEY - assert isinstance(fc.wait_list_size, int) - assert isinstance(fc.parallelism_max, int) - assert isinstance(fc.parallelism_count, int) - assert isinstance(fc.rate_max, int) - assert isinstance(fc.rate_count, int) - assert isinstance(fc.rate_period, int) - assert isinstance(fc.rate_period_start, int) - # Get a single flow control by key single = client.flow_control.get(FLOW_CONTROL_KEY) assert single.key == FLOW_CONTROL_KEY assert isinstance(single.wait_list_size, int) assert isinstance(single.parallelism_max, int) + assert isinstance(single.parallelism_count, int) # Clean up message client.message.cancel(result.message_id) -def test_flow_control_list_with_search(client: QStash) -> None: - flow_controls = client.flow_control.list(search=FLOW_CONTROL_KEY) - assert isinstance(flow_controls, list) - - for fc in flow_controls: - assert FLOW_CONTROL_KEY in fc.key - - -def test_flow_control_reset(client: QStash) -> None: - # Reset should not raise - client.flow_control.reset(FLOW_CONTROL_KEY) - - # After reset, get should still work (counters are zeroed) - fc = client.flow_control.get(FLOW_CONTROL_KEY) - assert fc.key == FLOW_CONTROL_KEY - assert fc.parallelism_count == 0 - assert fc.rate_count == 0 +def test_flow_control_get_global_parallelism(client: QStash) -> None: + info = client.flow_control.get_global_parallelism() + assert isinstance(info, GlobalParallelismInfo) + assert isinstance(info.parallelism_max, int) + assert isinstance(info.parallelism_count, int) From 73a759638794ff5f6053f3a5e0cba1f9b2cdfbf0 Mon Sep 17 00:00:00 2001 From: CahidArda Date: Wed, 18 Feb 2026 13:27:58 +0300 Subject: [PATCH 5/7] fix: fmt --- qstash/asyncio/flow_control.py | 5 +---- qstash/flow_control_api.py | 3 +-- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/qstash/asyncio/flow_control.py b/qstash/asyncio/flow_control.py index 1da0244..1cf9468 100644 --- a/qstash/asyncio/flow_control.py +++ b/qstash/asyncio/flow_control.py @@ -1,5 +1,3 @@ -from typing import Dict - from qstash.asyncio.http import AsyncHttpClient from qstash.flow_control_api import ( FlowControlInfo, @@ -12,7 +10,7 @@ class AsyncFlowControlApi: def __init__(self, http: AsyncHttpClient) -> None: self._http = http -async def get(self, flow_control_key: str) -> FlowControlInfo: + async def get(self, flow_control_key: str) -> FlowControlInfo: """ Gets a single flow control by key. @@ -38,4 +36,3 @@ async def get_global_parallelism(self) -> GlobalParallelismInfo: parallelism_max=response.get("parallelismMax", 0), parallelism_count=response.get("parallelismCount", 0), ) - diff --git a/qstash/flow_control_api.py b/qstash/flow_control_api.py index 6bcbd73..83e0172 100644 --- a/qstash/flow_control_api.py +++ b/qstash/flow_control_api.py @@ -61,7 +61,7 @@ class FlowControlApi: def __init__(self, http: HttpClient) -> None: self._http = http -def get(self, flow_control_key: str) -> FlowControlInfo: + def get(self, flow_control_key: str) -> FlowControlInfo: """ Gets a single flow control by key. @@ -87,4 +87,3 @@ def get_global_parallelism(self) -> GlobalParallelismInfo: parallelism_max=response.get("parallelismMax", 0), parallelism_count=response.get("parallelismCount", 0), ) - From 10cc7ec4cffdc467031f04dbe00b6f007bb0e8fd Mon Sep 17 00:00:00 2001 From: CahidArda Date: Fri, 20 Feb 2026 12:58:02 +0300 Subject: [PATCH 6/7] fix: retry delay expression parsing --- qstash/log.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/qstash/log.py b/qstash/log.py index 5972831..dfd0b30 100644 --- a/qstash/log.py +++ b/qstash/log.py @@ -246,7 +246,7 @@ def parse_logs_response(response: List[Dict[str, Any]]) -> List[Log]: flow_control=flow_control, method=event.get("method"), max_retries=event.get("maxRetries"), - retry_delay_expression=event.get("retryDelayExpression"), + retry_delay_expression=event.get("retryDelayExpr"), label=event.get("label"), ) ) @@ -285,6 +285,8 @@ def list( params=params, ) + print(response) + logs = parse_logs_response(response["events"]) return ListLogsResponse( From b1e0016b019d04c2e3d5d5bf324f691de4d6e0c1 Mon Sep 17 00:00:00 2001 From: CahidArda Date: Fri, 20 Feb 2026 13:35:31 +0300 Subject: [PATCH 7/7] fix: use correct async fixture and method in async message tests --- tests/asyncio/test_message.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/asyncio/test_message.py b/tests/asyncio/test_message.py index 8930178..7de8787 100644 --- a/tests/asyncio/test_message.py +++ b/tests/asyncio/test_message.py @@ -288,6 +288,7 @@ async def test_publish_to_url_group_async(async_client: AsyncQStash) -> None: ) res = await async_client.message.publish( + method="GET", body="test-body", url_group=name, ) @@ -385,10 +386,10 @@ async def test_publish_to_api_llm_custom_provider_async( @pytest.mark.asyncio async def test_enqueue_api_llm_custom_provider_async( async_client: AsyncQStash, - cleanup_queue: Callable[[AsyncQStash, str], None], + cleanup_queue_async: Callable[[AsyncQStash, str], None], ) -> None: name = "test_queue" - cleanup_queue(async_client, name) + cleanup_queue_async(async_client, name) res = await async_client.message.enqueue_json( queue=name,