Skip to content

Make InMemoryConnector Thread-Safe for Multi-Threaded FastAPI Use Cases #1320

@romainrey

Description

@romainrey

Description

When using procrastinate.testing.InMemoryConnector with FastAPI and synchronous routes, an issue arises due to thread-safety violations in asyncio operations. The in-memory connector, while useful for testing, assumes that all operations occur on the same event loop. However, when used in a FastAPI sync route, which executes in a separate worker thread, the connector’s notification system (_notify) tries to interact with an asyncio event loop running in a different thread, leading to a RuntimeError.

Context: Using Procrastinate with FastAPI Sync Routes

  • FastAPI sync routes are executed in a thread pool by default.
  • procrastinate.App and its worker are typically running on the main event loop.
  • When a sync route defers a task (task.defer()), procrastinate processes it asynchronously.
  • The InMemoryConnector calls _notify, which interacts with an asyncio.Event from the wrong thread.
  • Since uvloop (default event loop for FastAPI) is not thread-safe, this leads to a non-thread-safe operation error.

Error Message

When deferring a task from a synchronous route, the following error occurs:
RuntimeError: Non-thread-safe operation invoked on an event loop other than the current one

Full traceback:

File "/path/to/procrastinate/testing.py", line 195, in _notify
    await self.on_notification(
File "/path/to/procrastinate/worker.py", line 379, in _handle_notification
    self._new_job_event.set()
File "uvloop/loop.pyx", line 1279, in uvloop.loop.Loop.call_soon
File "uvloop/loop.pyx", line 715, in uvloop.loop.Loop._check_thread
RuntimeError: Non-thread-safe operation invoked on an event loop other than the current one

Cause

  1. The sync route runs in a separate thread from the main event loop.
  2. When task.defer() is called, it eventually triggers _notify in InMemoryConnector.
  3. _notify calls an async function from a different thread than the main event loop.
  4. uvloop (default FastAPI event loop) does not allow thread-unsafe operations.
  5. The RuntimeError occurs when _notify tries to await an event notification from the wrong thread.

Proposed Fix

I propose a thread-safe version of InMemoryConnector that:

  • Saves the event loop and thread ID at startup.
  • Ensures all notifications run on the correct event loop.
  • Uses asyncio.run_coroutine_threadsafe when necessary to safely execute _notify.
import asyncio
import json
import threading
import procrastinate
from procrastinate import testing


class InMemoryConnectorThreadSafe(testing.InMemoryConnector):
    async def open_async(
        self, pool: procrastinate.connector.Pool | None = None
    ) -> None:
        """
        Save the current event loop and its thread id so that later notifications
        can be scheduled on this loop.
        """
        self._loop = asyncio.get_running_loop()
        self._loop_thread_id = threading.get_ident()
        self.states.append("open_async")

    async def _notify(
        self, queue_name: str, notification: procrastinate.jobs.Notification
    ) -> None:
        """
        Instead of directly awaiting on_notification, we check the current thread.
        If we’re not on the same thread as the one where the loop was saved,
        we schedule the notification on the correct loop.
        """
        if not self.on_notification:
            return

        destination_channels = {
            "procrastinate_any_queue_v1",
            f"procrastinate_queue_v1#{queue_name}",
        }
        for channel in set(self.notify_channels).intersection(destination_channels):
            coro = self.on_notification(
                channel=channel, payload=json.dumps(notification)
            )
            if threading.get_ident() == self._loop_thread_id:
                # Already on the right thread: just await.
                await coro
            else:
                # Not on the correct thread: schedule the coroutine on the saved loop.
                future = asyncio.run_coroutine_threadsafe(coro, self._loop)
                # Wrap the concurrent.futures.Future so we can await it.
                await asyncio.wrap_future(future)

Proposed Integration

This could be merged into InMemoryConnector to improve its usability with multi-threaded FastAPI applications.
Suggested changes:
• Modify InMemoryConnector to include the _notify fix.
• Ensure proper testing for multi-threaded environments.

Why This Fix?

✅ Prevents non-thread-safe calls to the event loop.
✅ Makes InMemoryConnector compatible with FastAPI sync routes.
✅ Keeps the fix lightweight, only modifying the _notify behavior.
✅ Does not affect the existing usage of InMemoryConnector in single-threaded tests.

Final Thoughts

Thanks to the Procrastinate team for the awesome work on the library! This fix would make InMemoryConnector much more robust for testing with FastAPI, particularly for teams that mix sync and async routes.
Would love to hear your thoughts on integrating this fix! 🚀

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions