diff --git a/qstash/asyncio/client.py b/qstash/asyncio/client.py index b830d15..19adffe 100644 --- a/qstash/asyncio/client.py +++ b/qstash/asyncio/client.py @@ -2,6 +2,7 @@ from typing import Literal, Optional, Union from qstash.asyncio.dlq import AsyncDlqApi +from qstash.asyncio.flow_control import AsyncFlowControlApi from qstash.asyncio.log import AsyncLogApi from qstash.asyncio.http import AsyncHttpClient from qstash.asyncio.message import AsyncMessageApi @@ -49,3 +50,6 @@ def __init__( self.dlq = AsyncDlqApi(self.http) """Dlq (Dead Letter Queue) api.""" + + self.flow_control = AsyncFlowControlApi(self.http) + """Flow control api.""" diff --git a/qstash/asyncio/flow_control.py b/qstash/asyncio/flow_control.py new file mode 100644 index 0000000..1cf9468 --- /dev/null +++ b/qstash/asyncio/flow_control.py @@ -0,0 +1,38 @@ +from qstash.asyncio.http import AsyncHttpClient +from qstash.flow_control_api import ( + FlowControlInfo, + GlobalParallelismInfo, + parse_flow_control_info, +) + + +class AsyncFlowControlApi: + def __init__(self, http: AsyncHttpClient) -> None: + self._http = http + + async def get(self, flow_control_key: str) -> FlowControlInfo: + """ + Gets a single flow control by key. + + :param flow_control_key: The flow control key to get. + """ + response = await self._http.request( + path=f"/v2/flowControl/{flow_control_key}", + method="GET", + ) + + return parse_flow_control_info(response) + + async def get_global_parallelism(self) -> GlobalParallelismInfo: + """ + Gets the global parallelism info. + """ + response = await self._http.request( + path="/v2/globalParallelism", + method="GET", + ) + + return GlobalParallelismInfo( + parallelism_max=response.get("parallelismMax", 0), + parallelism_count=response.get("parallelismCount", 0), + ) diff --git a/qstash/client.py b/qstash/client.py index 72fe4e5..a6758b0 100644 --- a/qstash/client.py +++ b/qstash/client.py @@ -2,6 +2,7 @@ from typing import Optional, Union, Literal from qstash.dlq import DlqApi +from qstash.flow_control_api import FlowControlApi from qstash.log import LogApi from qstash.http import RetryConfig, HttpClient from qstash.message import MessageApi @@ -50,3 +51,6 @@ def __init__( self.dlq = DlqApi(self.http) """Dlq (Dead Letter Queue) api.""" + + self.flow_control = FlowControlApi(self.http) + """Flow control api.""" diff --git a/qstash/flow_control_api.py b/qstash/flow_control_api.py new file mode 100644 index 0000000..83e0172 --- /dev/null +++ b/qstash/flow_control_api.py @@ -0,0 +1,89 @@ +import dataclasses +from typing import Any, Dict + +from qstash.http import HttpClient + + +@dataclasses.dataclass +class FlowControlInfo: + """Information about a flow control key.""" + + key: str + """The flow control key.""" + + wait_list_size: int + """The number of messages waiting in the wait list.""" + + parallelism_max: int + """The maximum parallelism configured for this flow control key.""" + + parallelism_count: int + """The current number of active requests for this flow control key.""" + + rate_max: int + """The maximum rate configured for this flow control key.""" + + rate_count: int + """The current number of requests consumed in the current period.""" + + rate_period: int + """The rate period in seconds.""" + + rate_period_start: int + """The start time of the current rate period as a unix timestamp.""" + + +@dataclasses.dataclass +class GlobalParallelismInfo: + """Information about global parallelism.""" + + parallelism_max: int + """The maximum global parallelism.""" + + parallelism_count: int + """The current number of active requests globally.""" + + +def parse_flow_control_info(response: Dict[str, Any]) -> FlowControlInfo: + return FlowControlInfo( + key=response["flowControlKey"], + wait_list_size=response.get("waitListSize", 0), + parallelism_max=response.get("parallelismMax", 0), + parallelism_count=response.get("parallelismCount", 0), + rate_max=response.get("rateMax", 0), + rate_count=response.get("rateCount", 0), + rate_period=response.get("ratePeriod", 0), + rate_period_start=response.get("ratePeriodStart", 0), + ) + + +class FlowControlApi: + def __init__(self, http: HttpClient) -> None: + self._http = http + + def get(self, flow_control_key: str) -> FlowControlInfo: + """ + Gets a single flow control by key. + + :param flow_control_key: The flow control key to get. + """ + response = self._http.request( + path=f"/v2/flowControl/{flow_control_key}", + method="GET", + ) + + return parse_flow_control_info(response) + + def get_global_parallelism(self) -> GlobalParallelismInfo: + """ + Gets the global parallelism info. + """ + response = self._http.request( + path="/v2/globalParallelism", + method="GET", + ) + + return GlobalParallelismInfo( + parallelism_max=response.get("parallelismMax", 0), + parallelism_count=response.get("parallelismCount", 0), + ) diff --git a/qstash/log.py b/qstash/log.py index 5972831..dfd0b30 100644 --- a/qstash/log.py +++ b/qstash/log.py @@ -246,7 +246,7 @@ def parse_logs_response(response: List[Dict[str, Any]]) -> List[Log]: flow_control=flow_control, method=event.get("method"), max_retries=event.get("maxRetries"), - retry_delay_expression=event.get("retryDelayExpression"), + retry_delay_expression=event.get("retryDelayExpr"), label=event.get("label"), ) ) @@ -285,6 +285,8 @@ def list( params=params, ) + print(response) + logs = parse_logs_response(response["events"]) return ListLogsResponse( diff --git a/tests/asyncio/test_flow_control.py b/tests/asyncio/test_flow_control.py new file mode 100644 index 0000000..ec0fb6f --- /dev/null +++ b/tests/asyncio/test_flow_control.py @@ -0,0 +1,50 @@ +import asyncio + +import pytest + +from qstash import AsyncQStash +from qstash.flow_control_api import GlobalParallelismInfo +from qstash.message import FlowControl, PublishResponse + + +FLOW_CONTROL_KEY = "test-flow-control-key-async" + + +@pytest.mark.asyncio +async def test_flow_control_get_async(async_client: AsyncQStash) -> None: + # Publish a message with flow control to ensure the key exists + result = await async_client.message.publish_json( + body={"test": "value"}, + url="https://httpstat.us/200?sleep=30000", + flow_control=FlowControl( + key=FLOW_CONTROL_KEY, + parallelism=5, + rate=10, + period="1m", + ), + ) + assert isinstance(result, PublishResponse) + assert result.message_id + + # Small delay to let flow control state propagate + await asyncio.sleep(1) + + # Get a single flow control by key + single = await async_client.flow_control.get(FLOW_CONTROL_KEY) + assert single.key == FLOW_CONTROL_KEY + assert isinstance(single.wait_list_size, int) + assert isinstance(single.parallelism_max, int) + assert isinstance(single.parallelism_count, int) + + # Clean up message + await async_client.message.cancel(result.message_id) + + +@pytest.mark.asyncio +async def test_flow_control_get_global_parallelism_async( + async_client: AsyncQStash, +) -> None: + info = await async_client.flow_control.get_global_parallelism() + assert isinstance(info, GlobalParallelismInfo) + assert isinstance(info.parallelism_max, int) + assert isinstance(info.parallelism_count, int) diff --git a/tests/asyncio/test_message.py b/tests/asyncio/test_message.py index 8930178..7de8787 100644 --- a/tests/asyncio/test_message.py +++ b/tests/asyncio/test_message.py @@ -288,6 +288,7 @@ async def test_publish_to_url_group_async(async_client: AsyncQStash) -> None: ) res = await async_client.message.publish( + method="GET", body="test-body", url_group=name, ) @@ -385,10 +386,10 @@ async def test_publish_to_api_llm_custom_provider_async( @pytest.mark.asyncio async def test_enqueue_api_llm_custom_provider_async( async_client: AsyncQStash, - cleanup_queue: Callable[[AsyncQStash, str], None], + cleanup_queue_async: Callable[[AsyncQStash, str], None], ) -> None: name = "test_queue" - cleanup_queue(async_client, name) + cleanup_queue_async(async_client, name) res = await async_client.message.enqueue_json( queue=name, diff --git a/tests/test_flow_control.py b/tests/test_flow_control.py new file mode 100644 index 0000000..2f79e30 --- /dev/null +++ b/tests/test_flow_control.py @@ -0,0 +1,44 @@ +import time + +from qstash import QStash +from qstash.flow_control_api import GlobalParallelismInfo +from qstash.message import FlowControl, PublishResponse + + +FLOW_CONTROL_KEY = "test-flow-control-key" + + +def test_flow_control_get(client: QStash) -> None: + # Publish a message with flow control to ensure the key exists + result = client.message.publish_json( + body={"test": "value"}, + url="https://httpstat.us/200?sleep=30000", + flow_control=FlowControl( + key=FLOW_CONTROL_KEY, + parallelism=5, + rate=10, + period="1m", + ), + ) + assert isinstance(result, PublishResponse) + assert result.message_id + + # Small delay to let flow control state propagate + time.sleep(1) + + # Get a single flow control by key + single = client.flow_control.get(FLOW_CONTROL_KEY) + assert single.key == FLOW_CONTROL_KEY + assert isinstance(single.wait_list_size, int) + assert isinstance(single.parallelism_max, int) + assert isinstance(single.parallelism_count, int) + + # Clean up message + client.message.cancel(result.message_id) + + +def test_flow_control_get_global_parallelism(client: QStash) -> None: + info = client.flow_control.get_global_parallelism() + assert isinstance(info, GlobalParallelismInfo) + assert isinstance(info.parallelism_max, int) + assert isinstance(info.parallelism_count, int)