Variable rate metric, or limit by request payload size instead of number of requests #181
-
Hi, I've been using this library a lot to rate limit API calls made with `asyncio.gather`, and it works flawlessly. Now I have a use case where the rate limit is not the number of API calls over time, but rather the accumulated payload size of those calls over time. More specifically, GPT API calls, where the rate limit is in tokens per minute rather than requests per minute. I'd love to implement this here, or in a fork, but wanted to hear your thoughts first. Cheers, |
Beta Was this translation helpful? Give feedback.
Replies: 2 comments 2 replies
-
# SPDX-License-Identifier: MIT
# Copyright (c) 2019 Martijn Pieters
# Licensed under the MIT license as detailed in LICENSE.txt
import asyncio
from contextlib import asynccontextmanager
from types import TracebackType
from typing import AsyncIterator, Dict, Optional, Type
class AsyncLimiter:
    """A leaky bucket rate limiter.

    This is an :ref:`asynchronous context manager <async-context-managers>`;
    when used with :keyword:`async with`, entering the context acquires
    capacity::

        limiter = AsyncLimiter(10)
        for foo in bar:
            async with limiter:
                # process foo elements at 10 items per minute

    To consume a variable amount of capacity per operation (for example a
    payload size), await :meth:`acquire` with that amount, or use
    :meth:`limit`::

        async with limiter.limit(payload_size):
            # process a payload that counts for `payload_size` units

    :param max_rate: Allow up to `max_rate` / `time_period` acquisitions before
       blocking.
    :param time_period: duration, in seconds, of the time period in which to
       limit the rate. Note that up to `max_rate` acquisitions are allowed
       within this time period in a burst.

    """

    __slots__ = (
        "max_rate",
        "time_period",
        "_rate_per_sec",
        "_level",
        "_last_check",
        "_waiters",
    )

    max_rate: float  #: The configured `max_rate` value for this limiter.
    time_period: float  #: The configured `time_period` value for this limiter.

    def __init__(self, max_rate: float, time_period: float = 60) -> None:
        self.max_rate = max_rate
        self.time_period = time_period
        # capacity that drips out of the bucket per second
        self._rate_per_sec = max_rate / time_period
        # current fill level of the bucket, and the loop time it was last
        # updated at
        self._level = 0.0
        self._last_check = 0.0
        # queue of waiting futures to signal capacity to
        self._waiters: Dict[asyncio.Task, asyncio.Future] = {}

    def _leak(self) -> None:
        """Drip out capacity from the bucket."""
        loop = asyncio.get_running_loop()
        if self._level:
            # drip out enough level for the elapsed time since
            # we last checked
            elapsed = loop.time() - self._last_check
            decrement = elapsed * self._rate_per_sec
            self._level = max(self._level - decrement, 0)
        self._last_check = loop.time()

    def has_capacity(self, amount: float = 1) -> bool:
        """Check if there is enough capacity remaining in the limiter

        Must be called while an event loop is running, as the bucket level
        is first updated using the loop clock.

        :param amount: How much capacity you need to be available.
        :return: `True` if `amount` fits in the bucket right now.

        """
        self._leak()
        requested = self._level + amount
        # if there are tasks waiting for capacity, signal to the first
        # that there may be some now (they won't wake up until this task
        # yields with an await)
        if requested < self.max_rate:
            for fut in self._waiters.values():
                if not fut.done():
                    fut.set_result(True)
                    break
        return requested <= self.max_rate

    async def acquire(self, amount: float = 1) -> None:
        """Acquire capacity in the limiter.

        If the limit has been reached, blocks until enough capacity has been
        freed before returning.

        :param amount: How much capacity you need to be available.
        :exception: Raises :exc:`ValueError` if `amount` is greater than
           :attr:`max_rate`.

        """
        if amount > self.max_rate:
            raise ValueError("Can't acquire more than the maximum capacity")

        loop = asyncio.get_running_loop()
        task = asyncio.current_task(loop)
        assert task is not None
        try:
            while not self.has_capacity(amount):
                # wait for the next drip to have left the bucket
                # add a future to the _waiters map to be notified
                # 'early' if capacity has come up
                fut = loop.create_future()
                self._waiters[task] = fut
                try:
                    await asyncio.wait_for(
                        asyncio.shield(fut), 1 / self._rate_per_sec * amount
                    )
                except asyncio.TimeoutError:
                    pass
                finally:
                    # the future is shielded from the timeout, so retire it
                    # explicitly; this also runs when the caller is cancelled
                    fut.cancel()
        finally:
            # always drop our queue entry, including when this task is
            # cancelled while waiting, so no stale waiter is left behind
            self._waiters.pop(task, None)

        self._level += amount

        return None

    async def __aenter__(self) -> None:
        """Acquire one unit of capacity on entering ``async with limiter:``."""
        await self.acquire()
        return None

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc: Optional[BaseException],
        tb: Optional[TracebackType],
    ) -> None:
        """Leave the context; nothing is released and exceptions propagate."""
        return None

    @asynccontextmanager
    async def limit(self, amount: float = 1) -> AsyncIterator[None]:
        """
        An asynchronous context manager that acquires the given amount of capacity.

        Capacity is acquired on entry only; nothing is handed back on exit.

        :param amount: The amount of capacity to acquire.
        """
        await self.acquire(amount)
        yield
async def task(id: int, limiter: AsyncLimiter, payload_size: float):
    """Run one simulated job once the limiter grants `payload_size` capacity.

    :param id: Number used to label this job in the printed message.
    :param limiter: The limiter the capacity is acquired from.
    :param payload_size: Amount of capacity this job consumes.
    """
    rate_guard = limiter.limit(payload_size)
    async with rate_guard:
        print(f"Task {id} with payload {payload_size} is running.")
        # Stand-in for an I/O-bound operation.
        await asyncio.sleep(1)
async def main():
    """Launch ten simulated jobs that share one payload-based limiter.

    The limiter allows 10 units per 10 seconds; job ``i`` asks for
    ``(i + 1) * 0.5`` units. All jobs are awaited together.
    """
    limiter = AsyncLimiter(max_rate=10, time_period=10)
    pending = []
    for index in range(10):
        pending.append(task(index, limiter, payload_size=(index + 1) * 0.5))
    await asyncio.gather(*pending)
# Run the demo only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())
Beta Was this translation helpful? Give feedback.
-
You can rate limit any amount per minute; just acquire a different amount by awaiting on `limiter.acquire(<amount>)`:
async def task(id: int, limiter: AsyncLimiter, payload_size: float):
"""Simulate a task that requires rate limiting with variable payload."""
await limiter.acquire(payload_size)
print(f"Task {id} with payload {payload_size} is running.")
await asyncio.sleep(1) # Simulate an I/O-bound task.
The `async with limiter:` syntax is just syntactic sugar for `await limiter.acquire(1)` here. If you really must have a context manager for your use-case, just create a simple utility context manager; it doesn't have to be a method on the class at all:
import typing as t
from contextlib import asynccontextmanager
@asynccontextmanager
async def limit(limiter:AsyncLimiter, amount: float) -> t.AsyncIterator[AsyncLimiter]:
await limiter.acquire(amount)
yield limiter
and use it as:
async with limit(limiter, payload_size):
print(f"Task {id} with payload {payload_size} is running.")
await asyncio.sleep(1) # Simulate an I/O-bound task.
Finally, take into account that there are two rate limits when accessing the OpenAI API: the token count and a requests-per-minute limit. You'll have to account for both. |
Beta Was this translation helpful? Give feedback.
You can rate limit any amount per minute, just acquire a different amount by awaiting on
limiter.acquire(<amount>)
The
async with limiter:
syntax is just syntactic sugar for await limiter.acquire(1)
here. If you really must have a context manager for your use-case, just create a simple utility context manager; it doesn't have to be a method on the class at all: