Skip to content

@cached does not coalesce concurrent calls when used with a Semaphore, which leads to a cache stampede in the first batch. #1041

@0x1618

Description

@0x1618

When using @cached with a concurrency limit via asyncio.Semaphore, every task in the first batch misses the cache and executes the underlying function in parallel, because nothing is stored in the cache until the first execution completes. This results in multiple uncached executions instead of a single shared in-flight call.

import asyncio
import random
from aiocache import cached, caches

# Register an in-memory cache configuration under the alias "default"
# (SimpleMemoryCache with ttl=300).
caches.set_config(dict(
    default=dict(
        cache="aiocache.SimpleMemoryCache",
        ttl=300,
    ),
))

async def with_semaphore(semaphore, coro):
    """Await *coro* while holding *semaphore* and return its result.

    The semaphore is released even if *coro* raises.
    """
    await semaphore.acquire()
    try:
        return await coro
    finally:
        semaphore.release()

@cached()
async def long_job():
    """Simulate a slow job: wait two seconds, then return a random int in [0, 1000]."""
    delay = 2
    await asyncio.sleep(delay)
    value = random.randint(0, 1000)
    return value

async def main():
    """Run eight long_job() calls through a 4-slot semaphore and print the results."""
    limiter = asyncio.Semaphore(4)
    pending = [with_semaphore(limiter, long_job()) for _ in range(8)]
    results = await asyncio.gather(*pending)
    print(results)

asyncio.run(main())

Output:

[177, 972, 52, 418, 418, 418, 418, 418]

The first caller (within the semaphore limit) should execute the function, and all other concurrent callers should await the same in-flight result, not trigger additional executions.

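Workaround (a `coalesce` decorator stacked on top of `@cached`, so concurrent callers share one in-flight call):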

from functools import wraps

def _default_key_builder(func, *args, **kwargs):
    return f"{func.__module__}:{func.__qualname__}:{args}:{tuple(sorted(kwargs.items()))}"


def coalesce(key_builder):
    """Share one in-flight execution among concurrent calls with the same key.

    While a call for a given key is running, further calls that produce the
    same key await that same task instead of starting a new execution. When
    the task finishes (with a result, an exception, or by cancellation) the
    key is forgotten, so the next call starts a fresh execution.

    Args:
        key_builder: callable invoked as ``key_builder(func, *args, **kwargs)``;
            its return value is the key used to detect duplicate calls.

    Returns:
        A decorator for ``async def`` functions.
    """
    # Tasks currently running, keyed by key_builder output. This dict is shared
    # by every function decorated with the same ``coalesce(...)`` result, so
    # keys should identify the decorated function as well as its arguments.
    inflight: dict[str, asyncio.Task] = {}

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            key: str = key_builder(func, *args, **kwargs)

            # There is no ``await`` between the lookup and the insert, so within
            # one event loop this check-then-set cannot interleave with another
            # caller. No asyncio.Lock is needed; a shared lazily created lock
            # could also end up bound to a different event loop.
            task = inflight.get(key)
            if task is None:
                task = asyncio.create_task(func(*args, **kwargs))
                inflight[key] = task

                def _forget(done):
                    # Drop the entry as soon as the task finishes, regardless of
                    # whether any waiter is still around to observe it.
                    if inflight.get(key) is done:
                        del inflight[key]

                task.add_done_callback(_forget)

            # shield(): cancelling one waiter must not cancel the shared task
            # that other concurrent callers are also awaiting.
            return await asyncio.shield(task)

        return wrapper

    return decorator


@coalesce(key_builder=_default_key_builder)
@cached(alias="default", key_builder=_default_key_builder)
async def long_job():
    """Simulate a slow job: wait two seconds, then return a random int in [0, 1000].

    ``coalesce`` is the outer decorator, so concurrent callers share one call
    into the ``cached`` wrapper.
    """
    delay = 2
    await asyncio.sleep(delay)
    value = random.randint(0, 1000)
    return value

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions