Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "qstash"
version = "3.2.0"
version = "3.3.0"
description = "Python SDK for Upstash QStash"
license = "MIT"
authors = ["Upstash <support@upstash.com>"]
Expand Down
2 changes: 1 addition & 1 deletion qstash/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
from qstash.client import QStash
from qstash.receiver import Receiver

__version__ = "3.2.0"
__version__ = "3.3.0"
__all__ = ["QStash", "AsyncQStash", "Receiver"]
96 changes: 96 additions & 0 deletions qstash/asyncio/flow_control.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
from typing import Dict

from qstash.asyncio.http import AsyncHttpClient
from qstash.flow_control_api import (
FlowControlInfo,
GlobalParallelismInfo,
PinFlowControlOptions,
UnpinFlowControlOptions,
parse_flow_control_info,
)

Expand Down Expand Up @@ -36,3 +40,95 @@ async def get_global_parallelism(self) -> GlobalParallelismInfo:
parallelism_max=response.get("parallelismMax", 0),
parallelism_count=response.get("parallelismCount", 0),
)

async def pause(self, flow_control_key: str) -> None:
"""
Pause message delivery for a flow-control key.

Messages already in the waitlist will remain there.
New incoming messages will be added directly to the waitlist.

:param flow_control_key: The flow control key to pause.
"""
await self._http.request(
path=f"/v2/flowControl/{flow_control_key}/pause",
method="POST",
parse_response=False,
)

async def resume(self, flow_control_key: str) -> None:
"""
Resume message delivery for a flow-control key.

:param flow_control_key: The flow control key to resume.
"""
await self._http.request(
path=f"/v2/flowControl/{flow_control_key}/resume",
method="POST",
parse_response=False,
)

async def pin(self, flow_control_key: str, options: PinFlowControlOptions) -> None:
"""
Pin a processing configuration for a flow-control key.

While pinned, the system ignores configurations provided by incoming
messages and uses the pinned configuration instead.

:param flow_control_key: The flow control key to pin.
:param options: The configuration to pin.
"""
params: Dict[str, str] = {}
if "parallelism" in options:
params["parallelism"] = str(options["parallelism"])
if "rate" in options:
params["rate"] = str(options["rate"])
if "period" in options:
params["period"] = str(options["period"])

await self._http.request(
path=f"/v2/flowControl/{flow_control_key}/pin",
method="POST",
params=params or None,
parse_response=False,
)

async def unpin(
self, flow_control_key: str, options: UnpinFlowControlOptions
) -> None:
"""
Remove the pinned configuration for a flow-control key.

After unpinning, the system resumes updating the configuration
based on incoming messages.

:param flow_control_key: The flow control key to unpin.
:param options: Which configurations to unpin.
"""
params: Dict[str, str] = {}
if "parallelism" in options:
params["parallelism"] = str(options["parallelism"]).lower()
if "rate" in options:
params["rate"] = str(options["rate"]).lower()

await self._http.request(
path=f"/v2/flowControl/{flow_control_key}/unpin",
method="POST",
params=params or None,
parse_response=False,
)

async def reset_rate(self, flow_control_key: str) -> None:
"""
Reset the rate configuration state for a flow-control key.

Clears the current rate count and immediately ends the current period.
The current timestamp becomes the start of the new rate period.

:param flow_control_key: The flow control key to reset rate for.
"""
await self._http.request(
path=f"/v2/flowControl/{flow_control_key}/resetRate",
method="POST",
parse_response=False,
)
127 changes: 126 additions & 1 deletion qstash/flow_control_api.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import dataclasses
from typing import Any, Dict
from typing import Any, Dict, TypedDict

from qstash.http import HttpClient

Expand Down Expand Up @@ -32,6 +32,15 @@ class FlowControlInfo:
rate_period_start: int
"""The start time of the current rate period as a unix timestamp."""

is_paused: bool
"""Whether message delivery is paused for this flow control key."""

is_pinned_parallelism: bool
"""Whether the parallelism configuration is pinned."""

is_pinned_rate: bool
"""Whether the rate configuration is pinned."""


@dataclasses.dataclass
class GlobalParallelismInfo:
Expand All @@ -44,6 +53,29 @@ class GlobalParallelismInfo:
"""The current number of active requests globally."""


class PinFlowControlOptions(TypedDict, total=False):
"""Options for pinning a flow control key configuration."""

parallelism: int
"""The parallelism value to apply to the flow-control key."""

rate: int
"""The rate value to apply to the flow-control key."""

period: int
"""The period value to apply to the flow-control key, in seconds."""


class UnpinFlowControlOptions(TypedDict, total=False):
"""Options for unpinning a flow control key configuration."""

parallelism: bool
"""Whether to unpin the parallelism configuration."""

rate: bool
"""Whether to unpin the rate configuration."""


def parse_flow_control_info(response: Dict[str, Any]) -> FlowControlInfo:
return FlowControlInfo(
key=response["flowControlKey"],
Expand All @@ -54,6 +86,9 @@ def parse_flow_control_info(response: Dict[str, Any]) -> FlowControlInfo:
rate_count=response.get("rateCount", 0),
rate_period=response.get("ratePeriod", 0),
rate_period_start=response.get("ratePeriodStart", 0),
is_paused=response.get("isPaused", False),
is_pinned_parallelism=response.get("isPinnedParallelism", False),
is_pinned_rate=response.get("isPinnedRate", False),
)


Expand Down Expand Up @@ -87,3 +122,93 @@ def get_global_parallelism(self) -> GlobalParallelismInfo:
parallelism_max=response.get("parallelismMax", 0),
parallelism_count=response.get("parallelismCount", 0),
)

def pause(self, flow_control_key: str) -> None:
"""
Pause message delivery for a flow-control key.

Messages already in the waitlist will remain there.
New incoming messages will be added directly to the waitlist.

:param flow_control_key: The flow control key to pause.
"""
self._http.request(
path=f"/v2/flowControl/{flow_control_key}/pause",
method="POST",
parse_response=False,
)

def resume(self, flow_control_key: str) -> None:
"""
Resume message delivery for a flow-control key.

:param flow_control_key: The flow control key to resume.
"""
self._http.request(
path=f"/v2/flowControl/{flow_control_key}/resume",
method="POST",
parse_response=False,
)

def pin(self, flow_control_key: str, options: PinFlowControlOptions) -> None:
"""
Pin a processing configuration for a flow-control key.

While pinned, the system ignores configurations provided by incoming
messages and uses the pinned configuration instead.

:param flow_control_key: The flow control key to pin.
:param options: The configuration to pin.
"""
params: Dict[str, str] = {}
if "parallelism" in options:
params["parallelism"] = str(options["parallelism"])
if "rate" in options:
params["rate"] = str(options["rate"])
if "period" in options:
params["period"] = str(options["period"])

self._http.request(
path=f"/v2/flowControl/{flow_control_key}/pin",
method="POST",
params=params or None,
parse_response=False,
)

def unpin(self, flow_control_key: str, options: UnpinFlowControlOptions) -> None:
"""
Remove the pinned configuration for a flow-control key.

After unpinning, the system resumes updating the configuration
based on incoming messages.

:param flow_control_key: The flow control key to unpin.
:param options: Which configurations to unpin.
"""
params: Dict[str, str] = {}
if "parallelism" in options:
params["parallelism"] = str(options["parallelism"]).lower()
if "rate" in options:
params["rate"] = str(options["rate"]).lower()

self._http.request(
path=f"/v2/flowControl/{flow_control_key}/unpin",
method="POST",
params=params or None,
parse_response=False,
)

def reset_rate(self, flow_control_key: str) -> None:
"""
Reset the rate configuration state for a flow-control key.

Clears the current rate count and immediately ends the current period.
The current timestamp becomes the start of the new rate period.

:param flow_control_key: The flow control key to reset rate for.
"""
self._http.request(
path=f"/v2/flowControl/{flow_control_key}/resetRate",
method="POST",
parse_response=False,
)
Loading
Loading