Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions src/mopidy_mpd/actor.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import asyncio
import logging
import threading
from typing import Any

import pykka
Expand Down Expand Up @@ -37,7 +39,11 @@ def __init__(self, config: types.Config, core: CoreProxy) -> None:
self.zeroconf_service = None

self.uri_map = uri_mapper.MpdUriMapper(core)
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is setting the loop globally then it is a no go, we can't have random extensions fighting over who controls this, in that case it would need to be mopidy itself that owns and starts the loop for all extensions.

Copy link
Author

@blacklight blacklight Mar 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adamcik I've opened mopidy/mopidy#2197 on mopidy to initialize the loop when the application starts.

Btw there was already a new_event_loop/set_event_loop call in the http module (needed for Tornado I guess).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment in the http case explains it's per thread. Presumably that's right and what you had was ok but I've not checked their docs.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be the case indeed. Just done a quick test:

import asyncio
import threading


async def my_coroutine():
    print("Coroutine:", id(threading.current_thread()))
    await asyncio.sleep(1)


def my_thread():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(my_coroutine())
    print(f"Closing loop {id(loop)} from {id(threading.current_thread())}")
    loop.close()


def main():
    print("Main:", id(threading.current_thread()))
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    thread = threading.Thread(target=my_thread)
    thread.start()
    loop.run_until_complete(my_coroutine())
    thread.join()

    print(f"Closing loop {id(loop)} from {id(threading.current_thread())}")
    loop.close()


if __name__ == "__main__":
    main()

Output:

Main: 135176582243936
Coroutine: 135176582243936
Coroutine: 135176571626432
Closing loop 135176571884624 from 135176571626432
Closing loop 135176571625424 from 135176582243936

So calls to asyncio.new_event_loop performed in different threads result in the creation of different loops, and as long as set_event_loop is called at most once in each thread things should work.

So as long as we expect new_event_loop+set_event_loop to be called at most once per pykka.ThreadedActor is it fine to let extensions manage their own loops?

If so I can close the PR on mopidy.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I honestly didn't remember if this is process or thread wide, so the comment was meant as a question not a statement of fact.

So if this actually thread wide just following the same pattern as the HTTP extension and creating our own thread for the loop could make sense. Then each exentsion can have thread dedicated to running it's own asyncio loop and dispatching to/from pykka? The downside is of course having a thread per loop, but given how many threads we already have floating around for pykka I doubt this makes a large difference in practive.

The other alternative is to have a sentral thread in mopidy that starts when an exentsion (or something else in mopidy) requests for something to run in an event loop, or requests the loop. The pro of this would be having a single loop and fever threads, and extensions have less things to manage and get right. While the negative would be that if anyone accidentaly does blocking work in something they run on the loop it blocks everyone. If each extension has it's own loop we have more isolation.

I have not idea which one would actually be nicer or better for Mopidy's needs, but those are the two variants and some of the tradeoffs that came to mind off the top of my head.


self.server = self._setup_server(config, core)
self.server_thread = threading.Thread(target=self._run_server)

def _setup_server(self, config: types.Config, core: CoreProxy) -> network.Server:
try:
Expand All @@ -54,11 +60,16 @@ def _setup_server(self, config: types.Config, core: CoreProxy) -> network.Server
except OSError as exc:
raise exceptions.FrontendError(f"MPD server startup failed: {exc}") from exc

logger.info(f"MPD server running at {network.format_address(server.address)}")
logger.info("MPD server running at %s", network.format_address(server.address))

return server

def _run_server(self) -> None:
self.loop.run_until_complete(self.server.run())

def on_start(self) -> None:
self.server_thread.start()

if self.zeroconf_name and not network.is_unix_socket(self.server.server_socket):
self.zeroconf_service = zeroconf.Zeroconf(
name=self.zeroconf_name, stype="_mpd._tcp", port=self.port
Expand All @@ -73,7 +84,13 @@ def on_stop(self) -> None:
for session_actor in session_actors:
session_actor.stop()

self.server.stop()
if not self.server_thread.is_alive():
logger.warning("MPD server already stopped")
return

self.loop.call_soon_threadsafe(self.server.stop)
logger.debug("Waiting for MPD server thread to terminate")
self.server_thread.join()

def on_event(self, event: str, **kwargs: Any) -> None:
if event not in _CORE_EVENTS_TO_IDLE_SUBSYSTEMS:
Expand Down
6 changes: 5 additions & 1 deletion src/mopidy_mpd/dispatcher.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import asyncio
import logging
from collections.abc import Callable
from typing import (
Expand Down Expand Up @@ -100,7 +101,10 @@ def handle_idle(self, subsystem: str) -> None:
response = [*[f"changed: {s}" for s in subsystems], "OK"]
self.subsystem_events = set()
self.subsystem_subscriptions = set()
self.session.send_lines(response)
asyncio.run_coroutine_threadsafe(
self.session.send_lines(response),
self.session.loop,
)

def _call_next_filter(
self, request: Request, response: Response, filter_chain: list[Filter]
Expand Down
Loading