Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions qstash/asyncio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import Literal, Optional, Union

from qstash.asyncio.dlq import AsyncDlqApi
from qstash.asyncio.flow_control import AsyncFlowControlApi
from qstash.asyncio.log import AsyncLogApi
from qstash.asyncio.http import AsyncHttpClient
from qstash.asyncio.message import AsyncMessageApi
Expand Down Expand Up @@ -49,3 +50,6 @@ def __init__(

self.dlq = AsyncDlqApi(self.http)
"""Dlq (Dead Letter Queue) api."""

self.flow_control = AsyncFlowControlApi(self.http)
"""Flow control api."""
38 changes: 38 additions & 0 deletions qstash/asyncio/flow_control.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from qstash.asyncio.http import AsyncHttpClient
from qstash.flow_control_api import (
FlowControlInfo,
GlobalParallelismInfo,
parse_flow_control_info,
)


class AsyncFlowControlApi:
def __init__(self, http: AsyncHttpClient) -> None:
self._http = http

async def get(self, flow_control_key: str) -> FlowControlInfo:
"""
Gets a single flow control by key.

:param flow_control_key: The flow control key to get.
"""
response = await self._http.request(
path=f"/v2/flowControl/{flow_control_key}",
method="GET",
)

return parse_flow_control_info(response)

async def get_global_parallelism(self) -> GlobalParallelismInfo:
"""
Gets the global parallelism info.
"""
response = await self._http.request(
path="/v2/globalParallelism",
method="GET",
)

return GlobalParallelismInfo(
parallelism_max=response.get("parallelismMax", 0),
parallelism_count=response.get("parallelismCount", 0),
)
4 changes: 4 additions & 0 deletions qstash/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import Optional, Union, Literal

from qstash.dlq import DlqApi
from qstash.flow_control_api import FlowControlApi
from qstash.log import LogApi
from qstash.http import RetryConfig, HttpClient
from qstash.message import MessageApi
Expand Down Expand Up @@ -50,3 +51,6 @@ def __init__(

self.dlq = DlqApi(self.http)
"""Dlq (Dead Letter Queue) api."""

self.flow_control = FlowControlApi(self.http)
"""Flow control api."""
89 changes: 89 additions & 0 deletions qstash/flow_control_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import dataclasses
from typing import Any, Dict

from qstash.http import HttpClient


@dataclasses.dataclass
class FlowControlInfo:
"""Information about a flow control key."""

key: str
"""The flow control key."""

wait_list_size: int
"""The number of messages waiting in the wait list."""

parallelism_max: int
"""The maximum parallelism configured for this flow control key."""

parallelism_count: int
"""The current number of active requests for this flow control key."""

rate_max: int
"""The maximum rate configured for this flow control key."""

rate_count: int
"""The current number of requests consumed in the current period."""

rate_period: int
"""The rate period in seconds."""

rate_period_start: int
"""The start time of the current rate period as a unix timestamp."""


@dataclasses.dataclass
class GlobalParallelismInfo:
"""Information about global parallelism."""

parallelism_max: int
"""The maximum global parallelism."""

parallelism_count: int
"""The current number of active requests globally."""


def parse_flow_control_info(response: Dict[str, Any]) -> FlowControlInfo:
return FlowControlInfo(
key=response["flowControlKey"],
wait_list_size=response.get("waitListSize", 0),
parallelism_max=response.get("parallelismMax", 0),
parallelism_count=response.get("parallelismCount", 0),
rate_max=response.get("rateMax", 0),
rate_count=response.get("rateCount", 0),
rate_period=response.get("ratePeriod", 0),
rate_period_start=response.get("ratePeriodStart", 0),
)


class FlowControlApi:
def __init__(self, http: HttpClient) -> None:
self._http = http

def get(self, flow_control_key: str) -> FlowControlInfo:
"""
Gets a single flow control by key.

:param flow_control_key: The flow control key to get.
"""
response = self._http.request(
path=f"/v2/flowControl/{flow_control_key}",
method="GET",
)

return parse_flow_control_info(response)

def get_global_parallelism(self) -> GlobalParallelismInfo:
"""
Gets the global parallelism info.
"""
response = self._http.request(
path="/v2/globalParallelism",
method="GET",
)

return GlobalParallelismInfo(
parallelism_max=response.get("parallelismMax", 0),
parallelism_count=response.get("parallelismCount", 0),
)
4 changes: 3 additions & 1 deletion qstash/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ def parse_logs_response(response: List[Dict[str, Any]]) -> List[Log]:
flow_control=flow_control,
method=event.get("method"),
max_retries=event.get("maxRetries"),
retry_delay_expression=event.get("retryDelayExpression"),
retry_delay_expression=event.get("retryDelayExpr"),
label=event.get("label"),
)
)
Expand Down Expand Up @@ -285,6 +285,8 @@ def list(
params=params,
)

print(response)

logs = parse_logs_response(response["events"])

return ListLogsResponse(
Expand Down
50 changes: 50 additions & 0 deletions tests/asyncio/test_flow_control.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import asyncio

import pytest

from qstash import AsyncQStash
from qstash.flow_control_api import GlobalParallelismInfo
from qstash.message import FlowControl, PublishResponse


FLOW_CONTROL_KEY = "test-flow-control-key-async"


@pytest.mark.asyncio
async def test_flow_control_get_async(async_client: AsyncQStash) -> None:
# Publish a message with flow control to ensure the key exists
result = await async_client.message.publish_json(
body={"test": "value"},
url="https://httpstat.us/200?sleep=30000",
flow_control=FlowControl(
key=FLOW_CONTROL_KEY,
parallelism=5,
rate=10,
period="1m",
),
)
assert isinstance(result, PublishResponse)
assert result.message_id

# Small delay to let flow control state propagate
await asyncio.sleep(1)

# Get a single flow control by key
single = await async_client.flow_control.get(FLOW_CONTROL_KEY)
assert single.key == FLOW_CONTROL_KEY
assert isinstance(single.wait_list_size, int)
assert isinstance(single.parallelism_max, int)
assert isinstance(single.parallelism_count, int)

# Clean up message
await async_client.message.cancel(result.message_id)


@pytest.mark.asyncio
async def test_flow_control_get_global_parallelism_async(
async_client: AsyncQStash,
) -> None:
info = await async_client.flow_control.get_global_parallelism()
assert isinstance(info, GlobalParallelismInfo)
assert isinstance(info.parallelism_max, int)
assert isinstance(info.parallelism_count, int)
5 changes: 3 additions & 2 deletions tests/asyncio/test_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ async def test_publish_to_url_group_async(async_client: AsyncQStash) -> None:
)

res = await async_client.message.publish(
method="GET",
body="test-body",
url_group=name,
)
Expand Down Expand Up @@ -385,10 +386,10 @@ async def test_publish_to_api_llm_custom_provider_async(
@pytest.mark.asyncio
async def test_enqueue_api_llm_custom_provider_async(
async_client: AsyncQStash,
cleanup_queue: Callable[[AsyncQStash, str], None],
cleanup_queue_async: Callable[[AsyncQStash, str], None],
) -> None:
name = "test_queue"
cleanup_queue(async_client, name)
cleanup_queue_async(async_client, name)

res = await async_client.message.enqueue_json(
queue=name,
Expand Down
44 changes: 44 additions & 0 deletions tests/test_flow_control.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import time

from qstash import QStash
from qstash.flow_control_api import GlobalParallelismInfo
from qstash.message import FlowControl, PublishResponse


FLOW_CONTROL_KEY = "test-flow-control-key"


def test_flow_control_get(client: QStash) -> None:
# Publish a message with flow control to ensure the key exists
result = client.message.publish_json(
body={"test": "value"},
url="https://httpstat.us/200?sleep=30000",
flow_control=FlowControl(
key=FLOW_CONTROL_KEY,
parallelism=5,
rate=10,
period="1m",
),
)
assert isinstance(result, PublishResponse)
assert result.message_id

# Small delay to let flow control state propagate
time.sleep(1)

# Get a single flow control by key
single = client.flow_control.get(FLOW_CONTROL_KEY)
assert single.key == FLOW_CONTROL_KEY
assert isinstance(single.wait_list_size, int)
assert isinstance(single.parallelism_max, int)
assert isinstance(single.parallelism_count, int)

# Clean up message
client.message.cancel(result.message_id)


def test_flow_control_get_global_parallelism(client: QStash) -> None:
info = client.flow_control.get_global_parallelism()
assert isinstance(info, GlobalParallelismInfo)
assert isinstance(info.parallelism_max, int)
assert isinstance(info.parallelism_count, int)