diff --git a/pyproject.toml b/pyproject.toml index a8cad6c..f44d4c3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "qstash" -version = "3.2.0" +version = "3.3.0" description = "Python SDK for Upstash QStash" license = "MIT" authors = ["Upstash "] diff --git a/qstash/__init__.py b/qstash/__init__.py index 56e80a6..0b0e8dd 100644 --- a/qstash/__init__.py +++ b/qstash/__init__.py @@ -2,5 +2,5 @@ from qstash.client import QStash from qstash.receiver import Receiver -__version__ = "3.2.0" +__version__ = "3.3.0" __all__ = ["QStash", "AsyncQStash", "Receiver"] diff --git a/qstash/asyncio/flow_control.py b/qstash/asyncio/flow_control.py index 1cf9468..5c39595 100644 --- a/qstash/asyncio/flow_control.py +++ b/qstash/asyncio/flow_control.py @@ -1,7 +1,11 @@ +from typing import Dict + from qstash.asyncio.http import AsyncHttpClient from qstash.flow_control_api import ( FlowControlInfo, GlobalParallelismInfo, + PinFlowControlOptions, + UnpinFlowControlOptions, parse_flow_control_info, ) @@ -36,3 +40,95 @@ async def get_global_parallelism(self) -> GlobalParallelismInfo: parallelism_max=response.get("parallelismMax", 0), parallelism_count=response.get("parallelismCount", 0), ) + + async def pause(self, flow_control_key: str) -> None: + """ + Pause message delivery for a flow-control key. + + Messages already in the waitlist will remain there. + New incoming messages will be added directly to the waitlist. + + :param flow_control_key: The flow control key to pause. + """ + await self._http.request( + path=f"/v2/flowControl/{flow_control_key}/pause", + method="POST", + parse_response=False, + ) + + async def resume(self, flow_control_key: str) -> None: + """ + Resume message delivery for a flow-control key. + + :param flow_control_key: The flow control key to resume. + """ + await self._http.request( + path=f"/v2/flowControl/{flow_control_key}/resume", + method="POST", + parse_response=False, + ) + + async def pin(self, flow_control_key: str, options: PinFlowControlOptions) -> None: + """ + Pin a processing configuration for a flow-control key. + + While pinned, the system ignores configurations provided by incoming + messages and uses the pinned configuration instead. + + :param flow_control_key: The flow control key to pin. + :param options: The configuration to pin. + """ + params: Dict[str, str] = {} + if "parallelism" in options: + params["parallelism"] = str(options["parallelism"]) + if "rate" in options: + params["rate"] = str(options["rate"]) + if "period" in options: + params["period"] = str(options["period"]) + + await self._http.request( + path=f"/v2/flowControl/{flow_control_key}/pin", + method="POST", + params=params or None, + parse_response=False, + ) + + async def unpin( + self, flow_control_key: str, options: UnpinFlowControlOptions + ) -> None: + """ + Remove the pinned configuration for a flow-control key. + + After unpinning, the system resumes updating the configuration + based on incoming messages. + + :param flow_control_key: The flow control key to unpin. + :param options: Which configurations to unpin. + """ + params: Dict[str, str] = {} + if "parallelism" in options: + params["parallelism"] = str(options["parallelism"]).lower() + if "rate" in options: + params["rate"] = str(options["rate"]).lower() + + await self._http.request( + path=f"/v2/flowControl/{flow_control_key}/unpin", + method="POST", + params=params or None, + parse_response=False, + ) + + async def reset_rate(self, flow_control_key: str) -> None: + """ + Reset the rate configuration state for a flow-control key. + + Clears the current rate count and immediately ends the current period. + The current timestamp becomes the start of the new rate period. + + :param flow_control_key: The flow control key to reset rate for. + """ + await self._http.request( + path=f"/v2/flowControl/{flow_control_key}/resetRate", + method="POST", + parse_response=False, + ) diff --git a/qstash/flow_control_api.py b/qstash/flow_control_api.py index 83e0172..53967d4 100644 --- a/qstash/flow_control_api.py +++ b/qstash/flow_control_api.py @@ -1,5 +1,5 @@ import dataclasses -from typing import Any, Dict +from typing import Any, Dict, TypedDict from qstash.http import HttpClient @@ -32,6 +32,15 @@ class FlowControlInfo: rate_period_start: int """The start time of the current rate period as a unix timestamp.""" + is_paused: bool + """Whether message delivery is paused for this flow control key.""" + + is_pinned_parallelism: bool + """Whether the parallelism configuration is pinned.""" + + is_pinned_rate: bool + """Whether the rate configuration is pinned.""" + @dataclasses.dataclass class GlobalParallelismInfo: @@ -44,6 +53,29 @@ class GlobalParallelismInfo: """The current number of active requests globally.""" +class PinFlowControlOptions(TypedDict, total=False): + """Options for pinning a flow control key configuration.""" + + parallelism: int + """The parallelism value to apply to the flow-control key.""" + + rate: int + """The rate value to apply to the flow-control key.""" + + period: int + """The period value to apply to the flow-control key, in seconds.""" + + +class UnpinFlowControlOptions(TypedDict, total=False): + """Options for unpinning a flow control key configuration.""" + + parallelism: bool + """Whether to unpin the parallelism configuration.""" + + rate: bool + """Whether to unpin the rate configuration.""" + + def parse_flow_control_info(response: Dict[str, Any]) -> FlowControlInfo: return FlowControlInfo( key=response["flowControlKey"], @@ -54,6 +86,9 @@ def parse_flow_control_info(response: Dict[str, Any]) -> FlowControlInfo: rate_count=response.get("rateCount", 0), rate_period=response.get("ratePeriod", 0), rate_period_start=response.get("ratePeriodStart", 0), + is_paused=response.get("isPaused", False), + is_pinned_parallelism=response.get("isPinnedParallelism", False), + is_pinned_rate=response.get("isPinnedRate", False), ) @@ -87,3 +122,93 @@ def get_global_parallelism(self) -> GlobalParallelismInfo: parallelism_max=response.get("parallelismMax", 0), parallelism_count=response.get("parallelismCount", 0), ) + + def pause(self, flow_control_key: str) -> None: + """ + Pause message delivery for a flow-control key. + + Messages already in the waitlist will remain there. + New incoming messages will be added directly to the waitlist. + + :param flow_control_key: The flow control key to pause. + """ + self._http.request( + path=f"/v2/flowControl/{flow_control_key}/pause", + method="POST", + parse_response=False, + ) + + def resume(self, flow_control_key: str) -> None: + """ + Resume message delivery for a flow-control key. + + :param flow_control_key: The flow control key to resume. + """ + self._http.request( + path=f"/v2/flowControl/{flow_control_key}/resume", + method="POST", + parse_response=False, + ) + + def pin(self, flow_control_key: str, options: PinFlowControlOptions) -> None: + """ + Pin a processing configuration for a flow-control key. + + While pinned, the system ignores configurations provided by incoming + messages and uses the pinned configuration instead. + + :param flow_control_key: The flow control key to pin. + :param options: The configuration to pin. + """ + params: Dict[str, str] = {} + if "parallelism" in options: + params["parallelism"] = str(options["parallelism"]) + if "rate" in options: + params["rate"] = str(options["rate"]) + if "period" in options: + params["period"] = str(options["period"]) + + self._http.request( + path=f"/v2/flowControl/{flow_control_key}/pin", + method="POST", + params=params or None, + parse_response=False, + ) + + def unpin(self, flow_control_key: str, options: UnpinFlowControlOptions) -> None: + """ + Remove the pinned configuration for a flow-control key. + + After unpinning, the system resumes updating the configuration + based on incoming messages. + + :param flow_control_key: The flow control key to unpin. + :param options: Which configurations to unpin. + """ + params: Dict[str, str] = {} + if "parallelism" in options: + params["parallelism"] = str(options["parallelism"]).lower() + if "rate" in options: + params["rate"] = str(options["rate"]).lower() + + self._http.request( + path=f"/v2/flowControl/{flow_control_key}/unpin", + method="POST", + params=params or None, + parse_response=False, + ) + + def reset_rate(self, flow_control_key: str) -> None: + """ + Reset the rate configuration state for a flow-control key. + + Clears the current rate count and immediately ends the current period. + The current timestamp becomes the start of the new rate period. + + :param flow_control_key: The flow control key to reset rate for. + """ + self._http.request( + path=f"/v2/flowControl/{flow_control_key}/resetRate", + method="POST", + parse_response=False, + ) diff --git a/tests/asyncio/test_flow_control.py b/tests/asyncio/test_flow_control.py index ec0fb6f..c58fb05 100644 --- a/tests/asyncio/test_flow_control.py +++ b/tests/asyncio/test_flow_control.py @@ -1,4 +1,5 @@ import asyncio +import time import pytest @@ -7,17 +8,16 @@ from qstash.message import FlowControl, PublishResponse -FLOW_CONTROL_KEY = "test-flow-control-key-async" - - @pytest.mark.asyncio async def test_flow_control_get_async(async_client: AsyncQStash) -> None: + flow_control_key = f"fc-info-{int(time.time() * 1000)}" + # Publish a message with flow control to ensure the key exists result = await async_client.message.publish_json( body={"test": "value"}, - url="https://httpstat.us/200?sleep=30000", + url="https://mock.httpstatus.io/200?sleep=30000", flow_control=FlowControl( - key=FLOW_CONTROL_KEY, + key=flow_control_key, parallelism=5, rate=10, period="1m", @@ -30,8 +30,8 @@ async def test_flow_control_get_async(async_client: AsyncQStash) -> None: await asyncio.sleep(1) # Get a single flow control by key - single = await async_client.flow_control.get(FLOW_CONTROL_KEY) - assert single.key == FLOW_CONTROL_KEY + single = await async_client.flow_control.get(flow_control_key) + assert single.key == flow_control_key assert isinstance(single.wait_list_size, int) assert isinstance(single.parallelism_max, int) assert isinstance(single.parallelism_count, int) @@ -48,3 +48,152 @@ async def test_flow_control_get_global_parallelism_async( assert isinstance(info, GlobalParallelismInfo) assert isinstance(info.parallelism_max, int) assert isinstance(info.parallelism_count, int) + + +@pytest.mark.asyncio +async def test_flow_control_pause_resume_async(async_client: AsyncQStash) -> None: + flow_control_key = f"fc-pause-{int(time.time() * 1000)}" + + # Publish a message with flow control to ensure the key exists + result = await async_client.message.publish_json( + body={"test": "value"}, + url="https://mock.httpstatus.io/200?sleep=30000", + flow_control=FlowControl( + key=flow_control_key, + parallelism=5, + rate=10, + period="1m", + ), + ) + assert isinstance(result, PublishResponse) + assert result.message_id + + # Pause the flow control key + await async_client.flow_control.pause(flow_control_key) + + # Verify it's paused + paused = await async_client.flow_control.get(flow_control_key) + assert paused.is_paused is True + + # Resume the flow control key + await async_client.flow_control.resume(flow_control_key) + + # Verify it's resumed + resumed = await async_client.flow_control.get(flow_control_key) + assert resumed.is_paused is False + + # Clean up + await async_client.message.cancel(result.message_id) + + +@pytest.mark.asyncio +async def test_flow_control_pin_unpin_async(async_client: AsyncQStash) -> None: + flow_control_key = f"fc-pin-{int(time.time() * 1000)}" + + # Publish a message with flow control to ensure the key exists + result = await async_client.message.publish_json( + body={"test": "value"}, + url="https://mock.httpstatus.io/200?sleep=30000", + flow_control=FlowControl( + key=flow_control_key, + parallelism=5, + rate=10, + period="1m", + ), + ) + assert isinstance(result, PublishResponse) + assert result.message_id + + # Pin the configuration + await async_client.flow_control.pin( + flow_control_key, + {"parallelism": 3, "rate": 20, "period": 120}, + ) + + # Verify it's pinned + pinned = await async_client.flow_control.get(flow_control_key) + assert pinned.is_pinned_parallelism is True + assert pinned.is_pinned_rate is True + assert pinned.parallelism_max == 3 + assert pinned.rate_max == 20 + assert pinned.rate_period == 120 + + # Unpin the configuration + await async_client.flow_control.unpin( + flow_control_key, + {"parallelism": True, "rate": True}, + ) + + # Verify it's unpinned + unpinned = await async_client.flow_control.get(flow_control_key) + assert unpinned.is_pinned_parallelism is False + assert unpinned.is_pinned_rate is False + + # Clean up + await async_client.message.cancel(result.message_id) + + +@pytest.mark.asyncio +async def test_flow_control_unpin_single_async(async_client: AsyncQStash) -> None: + flow_control_key = f"fc-unpin-single-{int(time.time() * 1000)}" + + result = await async_client.message.publish_json( + body={"test": "value"}, + url="https://mock.httpstatus.io/200?sleep=30000", + flow_control=FlowControl( + key=flow_control_key, + parallelism=5, + rate=10, + period="1m", + ), + ) + assert isinstance(result, PublishResponse) + + # Pin both parallelism and rate + await async_client.flow_control.pin( + flow_control_key, + {"parallelism": 3, "rate": 20, "period": 120}, + ) + + pinned = await async_client.flow_control.get(flow_control_key) + assert pinned.is_pinned_parallelism is True + assert pinned.is_pinned_rate is True + + # Unpin only parallelism + await async_client.flow_control.unpin(flow_control_key, {"parallelism": True}) + + partial = await async_client.flow_control.get(flow_control_key) + assert partial.is_pinned_parallelism is False + assert partial.is_pinned_rate is True + + # Clean up + await async_client.message.cancel(result.message_id) + + +@pytest.mark.asyncio +async def test_flow_control_reset_rate_async(async_client: AsyncQStash) -> None: + flow_control_key = f"fc-reset-{int(time.time() * 1000)}" + + # Publish a message with flow control to ensure the key exists + result = await async_client.message.publish_json( + body={"test": "value"}, + url="https://mock.httpstatus.io/200?sleep=30000", + flow_control=FlowControl( + key=flow_control_key, + parallelism=5, + rate=10, + period="1m", + ), + ) + assert isinstance(result, PublishResponse) + assert result.message_id + + # Reset the rate + await async_client.flow_control.reset_rate(flow_control_key) + + # Verify rate was reset by checking the flow control info + info = await async_client.flow_control.get(flow_control_key) + assert info.rate_count == 0 + + # Clean up + await async_client.message.cancel(result.message_id) diff --git a/tests/asyncio/test_message.py b/tests/asyncio/test_message.py index 7de8787..2536726 100644 --- a/tests/asyncio/test_message.py +++ b/tests/asyncio/test_message.py @@ -318,14 +318,14 @@ async def test_timeout_async(async_client: AsyncQStash) -> None: @pytest.mark.asyncio async def test_cancel_many_async(async_client: AsyncQStash) -> None: res0 = await async_client.message.publish( - url="http://httpstat.us/404", + url="https://mock.httpstatus.io/404", retries=3, ) assert isinstance(res0, PublishResponse) res1 = await async_client.message.publish( - url="http://httpstat.us/404", + url="https://mock.httpstatus.io/404", retries=3, ) @@ -341,14 +341,14 @@ async def test_cancel_many_async(async_client: AsyncQStash) -> None: @pytest.mark.asyncio async def test_cancel_all_async(async_client: AsyncQStash) -> None: res0 = await async_client.message.publish( - url="http://httpstat.us/404", + url="https://mock.httpstatus.io/404", retries=3, ) assert isinstance(res0, PublishResponse) res1 = await async_client.message.publish( - url="http://httpstat.us/404", + url="https://mock.httpstatus.io/404", retries=3, ) diff --git a/tests/test_flow_control.py b/tests/test_flow_control.py index 2f79e30..5a6a59f 100644 --- a/tests/test_flow_control.py +++ b/tests/test_flow_control.py @@ -5,16 +5,15 @@ from qstash.message import FlowControl, PublishResponse -FLOW_CONTROL_KEY = "test-flow-control-key" - - def test_flow_control_get(client: QStash) -> None: + flow_control_key = f"fc-info-{int(time.time() * 1000)}" + # Publish a message with flow control to ensure the key exists result = client.message.publish_json( body={"test": "value"}, - url="https://httpstat.us/200?sleep=30000", + url="https://mock.httpstatus.io/200?sleep=30000", flow_control=FlowControl( - key=FLOW_CONTROL_KEY, + key=flow_control_key, parallelism=5, rate=10, period="1m", @@ -27,8 +26,8 @@ def test_flow_control_get(client: QStash) -> None: time.sleep(1) # Get a single flow control by key - single = client.flow_control.get(FLOW_CONTROL_KEY) - assert single.key == FLOW_CONTROL_KEY + single = client.flow_control.get(flow_control_key) + assert single.key == flow_control_key assert isinstance(single.wait_list_size, int) assert isinstance(single.parallelism_max, int) assert isinstance(single.parallelism_count, int) @@ -42,3 +41,148 @@ def test_flow_control_get_global_parallelism(client: QStash) -> None: assert isinstance(info, GlobalParallelismInfo) assert isinstance(info.parallelism_max, int) assert isinstance(info.parallelism_count, int) + + +def test_flow_control_pause_resume(client: QStash) -> None: + flow_control_key = f"fc-pause-{int(time.time() * 1000)}" + + # Publish a message with flow control to ensure the key exists + result = client.message.publish_json( + body={"test": "value"}, + url="https://mock.httpstatus.io/200?sleep=30000", + flow_control=FlowControl( + key=flow_control_key, + parallelism=5, + rate=10, + period="1m", + ), + ) + assert isinstance(result, PublishResponse) + assert result.message_id + + # Pause the flow control key + client.flow_control.pause(flow_control_key) + + # Verify it's paused + paused = client.flow_control.get(flow_control_key) + assert paused.is_paused is True + + # Resume the flow control key + client.flow_control.resume(flow_control_key) + + # Verify it's resumed + resumed = client.flow_control.get(flow_control_key) + assert resumed.is_paused is False + + # Clean up + client.message.cancel(result.message_id) + + +def test_flow_control_pin_unpin(client: QStash) -> None: + flow_control_key = f"fc-pin-{int(time.time() * 1000)}" + + # Publish a message with flow control to ensure the key exists + result = client.message.publish_json( + body={"test": "value"}, + url="https://mock.httpstatus.io/200?sleep=30000", + flow_control=FlowControl( + key=flow_control_key, + parallelism=5, + rate=10, + period="1m", + ), + ) + assert isinstance(result, PublishResponse) + assert result.message_id + + # Pin the configuration + client.flow_control.pin( + flow_control_key, + {"parallelism": 3, "rate": 20, "period": 120}, + ) + + # Verify it's pinned + pinned = client.flow_control.get(flow_control_key) + assert pinned.is_pinned_parallelism is True + assert pinned.is_pinned_rate is True + assert pinned.parallelism_max == 3 + assert pinned.rate_max == 20 + assert pinned.rate_period == 120 + + # Unpin the configuration + client.flow_control.unpin( + flow_control_key, + {"parallelism": True, "rate": True}, + ) + + # Verify it's unpinned + unpinned = client.flow_control.get(flow_control_key) + assert unpinned.is_pinned_parallelism is False + assert unpinned.is_pinned_rate is False + + # Clean up + client.message.cancel(result.message_id) + + +def test_flow_control_unpin_single(client: QStash) -> None: + flow_control_key = f"fc-unpin-single-{int(time.time() * 1000)}" + + result = client.message.publish_json( + body={"test": "value"}, + url="https://mock.httpstatus.io/200?sleep=30000", + flow_control=FlowControl( + key=flow_control_key, + parallelism=5, + rate=10, + period="1m", + ), + ) + assert isinstance(result, PublishResponse) + + # Pin both parallelism and rate + client.flow_control.pin( + flow_control_key, + {"parallelism": 3, "rate": 20, "period": 120}, + ) + + pinned = client.flow_control.get(flow_control_key) + assert pinned.is_pinned_parallelism is True + assert pinned.is_pinned_rate is True + + # Unpin only parallelism + client.flow_control.unpin(flow_control_key, {"parallelism": True}) + + partial = client.flow_control.get(flow_control_key) + assert partial.is_pinned_parallelism is False + assert partial.is_pinned_rate is True + + # Clean up + client.message.cancel(result.message_id) + + +def test_flow_control_reset_rate(client: QStash) -> None: + flow_control_key = f"fc-reset-{int(time.time() * 1000)}" + + # Publish a message with flow control to ensure the key exists + result = client.message.publish_json( + body={"test": "value"}, + url="https://mock.httpstatus.io/200?sleep=30000", + flow_control=FlowControl( + key=flow_control_key, + parallelism=5, + rate=10, + period="1m", + ), + ) + assert isinstance(result, PublishResponse) + assert result.message_id + + # Reset the rate + client.flow_control.reset_rate(flow_control_key) + + # Verify rate was reset by checking the flow control info + info = client.flow_control.get(flow_control_key) + assert info.rate_count == 0 + + # Clean up + client.message.cancel(result.message_id) diff --git a/tests/test_message.py b/tests/test_message.py index fc869be..2df34a8 100644 --- a/tests/test_message.py +++ b/tests/test_message.py @@ -317,14 +317,14 @@ def test_timeout(client: QStash) -> None: def test_cancel_many(client: QStash) -> None: res0 = client.message.publish( - url="http://httpstat.us/404", + url="https://mock.httpstatus.io/404", retries=3, ) assert isinstance(res0, PublishResponse) res1 = client.message.publish( - url="http://httpstat.us/404", + url="https://mock.httpstatus.io/404", retries=3, ) @@ -337,14 +337,14 @@ def test_cancel_many(client: QStash) -> None: def test_cancel_all(client: QStash) -> None: res0 = client.message.publish( - url="http://httpstat.us/404", + url="https://mock.httpstatus.io/404", retries=3, ) assert isinstance(res0, PublishResponse) res1 = client.message.publish( - url="http://httpstat.us/404", + url="https://mock.httpstatus.io/404", retries=3, )