Skip to content

Feature/event listeners manager #12

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Mar 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
40062a9
Rename PluginManager._registry attribute to more explicit ._action_re…
strohganoff Feb 26, 2025
4013e91
Create EventListenerManager class and EventListener base class, along…
strohganoff Mar 18, 2025
dd9e210
Update event models to validate events field and provide constant lis…
strohganoff Mar 18, 2025
6afeff2
Update WebSocketClient class to inherit from EventListener
strohganoff Mar 18, 2025
7ff7504
Clean up tests, including consolidating Action & GlobalAction test mo…
strohganoff Mar 18, 2025
351aa7c
Modify EventNameStr TypeAlias to alias just a str type to be more inc…
strohganoff Mar 18, 2025
f3eb5d9
Create EventAdapter class to wrap pydantic.TypeAdapter, and allows dy…
strohganoff Mar 18, 2025
9033992
Push code for pulling messages from WebSocketClient in private method…
strohganoff Mar 18, 2025
28f1806
Integrate EventListenerManager & EventAdapter with PluginManager. Upd…
strohganoff Mar 18, 2025
6e14c05
Clean up configs Pydantic model code
strohganoff Mar 18, 2025
49c0a11
Update configs Pydantic models to load EventListeners and Event model…
strohganoff Mar 18, 2025
6d81b36
Integrate loading developer-defined EventListeners and Event models i…
strohganoff Mar 18, 2025
c71c604
Remove command_sender.send_action_registration arg redundancy
strohganoff Mar 19, 2025
363c6e5
Fix up EventListenerManager to reduce processing load of method by u…
strohganoff Mar 19, 2025
c17bd6c
Fix to ensure all event listeners are stopped when disconnected from …
strohganoff Mar 20, 2025
412b4cd
Update README.md to document creating custom EventListeners
strohganoff Mar 20, 2025
522b0ae
Fix some tests
strohganoff Mar 20, 2025
aa2096d
Upgrade pypa/gh-action-pypi-publish GitHub action version
strohganoff Mar 20, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@ jobs:
python -m pip install build
python -m build
- name: Publish release distributions to PyPI
uses: pypa/gh-action-pypi-publish@v1.11.0
uses: pypa/gh-action-pypi-publish@v1.12.4
63 changes: 63 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,69 @@ And a command like the following is called by the Stream Deck software:
streamdeck -port 28196 -pluginUUID 63831042F4048F072B096732E0385245 -registerEvent registerPlugin -info '{"application": {...}, "plugin": {"uuid": "my-plugin-name", "version": "1.1.3"}, ...}'
```

## Custom Event Listeners

The SDK allows you to create custom event listeners and events by extending the `EventListener` and `EventBase` classes. This is useful when you need to monitor data from external applications and perform specific actions in response to changes or alerts.

### Creating a Custom Event Listener

To create a custom event listener:

1. Create new event model that inherits from `EventBase`.
2. Create a new class that inherits from `EventListener`.
a. Implement the required `listen` and `stop` methods. The `listen` method should yield results as a json string that matches the new event model.
b. List the new event classes in the `event_models` class variable of the new `EventListener` class.
3. Configure your plugin in its `pyproject.toml` file to use your custom listener.

```python
# custom_listener.py
from collections.abc import Generator
from typing import ClassVar, Literal

from streamdeck.event_listener import EventListener
from streamdeck.models.events import EventBase


class MyCustomEvent(EventBase):
event: Literal["somethingHappened"]
... # Define additional data attributes here

class MyCustomEventListener(EventListener):
def listen(self) -> Generator[str | bytes, None, None]:
...
# Listen/poll for something here in a loop, and yield the result.
# This will be ran in a background thread.
# Ex:
# while self._running is True:
# result = module.check_status()
# if result is not None:
# yield json.dumps({"event": "somethingHappend", "result": result})
# time.sleep(1)

def stop(self) -> None:
...
# Stop the loop or blocking call in the listen method.
# Ex:
# self._running = False
```

### Configuring Your Custom Listener

To use your custom event listener, add it to your `pyproject.toml` file:

```toml
[tools.streamdeck]
action_scripts = [
"main.py",
]
event_listener_modules = [
"myplugin.custom_listener",
]
```

The `event_listeners` list should contain strings in module format for each module you want to use.


## Creating and Packaging Plugins

To create a new plugin with all of the necessary files to start from and package it for use on your Stream Deck, use [the Python SDK CLI tool](https://github.com/strohganoff/python-streamdeck-plugin-sdk-cli).
Expand Down
10 changes: 9 additions & 1 deletion streamdeck/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,18 @@ def main(
info=info_data,
)

# Event listeners and their Event models are registered before actions in order to validate the actions' registered events' names.
for event_listener in pyproject.event_listeners:
manager.register_event_listener(event_listener())

for action in actions:
manager.register_action(action)

manager.run()
try:
manager.run()
except Exception as e:
logger.exception("Error in plugin manager")
raise


# Also run the plugin if this script is ran as a console script.
Expand Down
27 changes: 17 additions & 10 deletions streamdeck/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from logging import getLogger
from typing import TYPE_CHECKING, cast

from streamdeck.types import BaseEventHandlerFunc, available_event_names
from streamdeck.types import BaseEventHandlerFunc


if TYPE_CHECKING:
Expand All @@ -22,7 +22,7 @@
class ActionBase(ABC):
"""Base class for all actions."""

def __init__(self):
def __init__(self) -> None:
"""Initialize an Action instance.

Args:
Expand All @@ -42,13 +42,13 @@ def on(self, event_name: EventNameStr, /) -> Callable[[EventHandlerFunc[TEvent_c
Raises:
KeyError: If the provided event name is not available.
"""
if event_name not in available_event_names:
msg = f"Provided event name for action handler does not exist: {event_name}"
raise KeyError(msg)
# if event_name not in DEFAULT_EVENT_NAMES:
# msg = f"Provided event name for action handler does not exist: {event_name}"
# raise KeyError(msg)

def _wrapper(func: EventHandlerFunc[TEvent_contra]) -> EventHandlerFunc[TEvent_contra]:
# Cast to BaseEventHandlerFunc so that the storage type is consistent.
self._events[event_name].add(cast(BaseEventHandlerFunc, func))
self._events[event_name].add(cast("BaseEventHandlerFunc", func))

return func

Expand All @@ -66,15 +66,22 @@ def get_event_handlers(self, event_name: EventNameStr, /) -> Generator[EventHand
Raises:
KeyError: If the provided event name is not available.
"""
if event_name not in available_event_names:
msg = f"Provided event name for pulling handlers from action does not exist: {event_name}"
raise KeyError(msg)
# if event_name not in DEFAULT_EVENT_NAMES:
# msg = f"Provided event name for pulling handlers from action does not exist: {event_name}"
# raise KeyError(msg)

if event_name not in self._events:
return

yield from self._events[event_name]

def get_registered_event_names(self) -> list[str]:
"""Get all event names for which event handlers are registered.

Returns:
list[str]: The list of event names for which event handlers are registered.
"""
return list(self._events.keys())

class GlobalAction(ActionBase):
"""Represents an action that is performed at the plugin level, meaning it isn't associated with a specific device or action."""
Expand All @@ -83,7 +90,7 @@ class GlobalAction(ActionBase):
class Action(ActionBase):
"""Represents an action that can be performed for a specific action, with event handlers for specific event types."""

def __init__(self, uuid: str):
def __init__(self, uuid: str) -> None:
"""Initialize an Action instance.

Args:
Expand Down
5 changes: 1 addition & 4 deletions streamdeck/command_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,6 @@ def send_to_plugin(
def send_action_registration(
self,
register_event: str,
plugin_registration_uuid: str,
) -> None:
"""Registers a plugin with the Stream Deck software very shortly after the plugin is started.

Expand All @@ -270,10 +269,8 @@ def send_action_registration(
Args:
register_event (str): The registration event type, passed in by the Stream Deck software as -registerEvent option.
It's value will almost definitely will be "registerPlugin".
plugin_registration_uuid (str): Randomly-generated unique ID passed in by StreamDeck as -pluginUUID option,
used to send back in the registerPlugin event. Note that this is NOT the manifest.json -configured plugin UUID value.
"""
self._send_event(
event=register_event,
uuid=plugin_registration_uuid,
uuid=self._plugin_registration_uuid,
)
137 changes: 137 additions & 0 deletions streamdeck/event_listener.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
from __future__ import annotations

import threading
from abc import ABC, abstractmethod
from logging import getLogger
from queue import Queue
from typing import TYPE_CHECKING


if TYPE_CHECKING:
from collections.abc import Generator
from typing import Any, ClassVar

from typing_extensions import TypeIs

from streamdeck.models.events import EventBase



logger = getLogger("streamdeck.event_listener")


class _SENTINAL:
"""A sentinel object used to signal the end of the event stream.

Not meant to be instantiated, but rather used as a singleton (e.g. `_SENTINAL`).
"""
@classmethod
def is_sentinal(cls, event: str | bytes | type[_SENTINAL]) -> TypeIs[type[_SENTINAL]]:
"""Check if an event is the sentinal object. Provided to enable better type-checking."""
return event is cls


class StopStreaming(Exception): # noqa: N818
"""Raised by an EventListener implementation to signal that the entire EventManagerListener should stop streaming events."""


class EventListenerManager:
"""Manages event listeners and provides a shared event queue for them to push events into.

With this class, a single event stream can be created from multiple listeners.
This allows for us to listen for not only Stream Deck events, but also other events plugin-developer -defined events.
"""
def __init__(self) -> None:
self.event_queue: Queue[str | bytes | type[_SENTINAL]] = Queue()
self.listeners_lookup_by_thread: dict[threading.Thread, EventListener] = {}
self._running = False

def add_listener(self, listener: EventListener) -> None:
"""Registers a listener function that yields events.

Args:
listener: A function that yields events.
"""
# Create a thread for the listener
thread = threading.Thread(
target=self._listener_wrapper,
args=(listener,),
daemon=True,
)
self.listeners_lookup_by_thread[thread] = listener

def _listener_wrapper(self, listener: EventListener) -> None:
"""Wraps the listener function: for each event yielded, push it into the shared queue."""
try:
for event in listener.listen():
self.event_queue.put(event)

if not self.running:
break

except StopStreaming:
logger.debug("Event listener requested to stop streaming.")
self.event_queue.put(_SENTINAL)

except Exception:
logger.exception("Unexpected error in wrapped listener %s. Stopping just this listener.", listener)

def stop(self) -> None:
"""Stops the event generation loop and waits for all threads to finish.

Listeners will check the running flag if implemented to stop listening.
"""
# Set the running flag to False to stop the listeners running in separate threads.
self.running = False
# Push the sentinel to immediately unblock the queue.get() in event_stream.
self.event_queue.put(_SENTINAL)

for thread in self.listeners_lookup_by_thread:
logger.debug("Stopping listener %s.")
self.listeners_lookup_by_thread[thread].stop()
if thread.is_alive():
thread.join()

logger.info("All listeners have been stopped.")

def event_stream(self) -> Generator[str | bytes, None, None]:
"""Starts all registered listeners, sets the running flag to True, and yields events from the shared queue."""
logger.info("Starting event stream.")
# Set the running flag to True and start the listeners in their separate threads.
self.running = True
for thread in self.listeners_lookup_by_thread:
thread.start()

try:
while True:
event = self.event_queue.get()
if _SENTINAL.is_sentinal(event):
logger.debug("Sentinal received, stopping event stream.")
break # Exit loop immediately if the sentinal is received
yield event
finally:
self.stop()


class EventListener(ABC):
"""Base class for event listeners.

Event listeners are classes that listen for events and simply yield them as they come.
The EventListenerManager will handle the threading and pushing the events yielded into a shared queue.
"""
event_models: ClassVar[list[type[EventBase]]]
"""A list of event models that the listener can yield. Read in by the PluginManager to model the incoming event data off of.

The plugin-developer must define this list in their subclass.
"""

@abstractmethod
def listen(self) -> Generator[str | bytes, Any, None]:
"""Start listening for events and yield them as they come.

This is the method that run in a separate thread.
"""

@abstractmethod
def stop(self) -> None:
"""Stop the listener. This could set an internal flag, close a connection, etc."""
Loading
Loading