Skip to content

Commit 7a3ed62

Browse files
author
Pablo Panero
committed
poc: message bus implementation
1 parent 76274aa commit 7a3ed62

File tree

7 files changed

+164
-0
lines changed

7 files changed

+164
-0
lines changed

invenio_records_resources/config.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,5 @@
1616
SITE_UI_URL = "https://127.0.0.1:5000"
1717

1818
SITE_API_URL = "https://127.0.0.1:5000/api"
19+
20+
RECORDS_RESOURCES_EVENT_HANDLERS = {}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# Copyright (C) 2022 CERN.
4+
#
5+
# Invenio-Records-Resources is free software; you can redistribute it and/or
6+
# modify it under the terms of the MIT License; see LICENSE file for more
7+
# details.
8+
9+
"""Module for event driven actions support."""
10+
11+
from .bus import EventBus
12+
from .events import Event
13+
from .handlers import EventHandler
14+
15+
__all__ = (
16+
"Event",
17+
"EventHandler",
18+
"EventBus"
19+
)
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# Copyright (C) 2022 CERN.
4+
#
5+
# Invenio-Records-Resources is free software; you can redistribute it and/or
6+
# modify it under the terms of the MIT License; see LICENSE file for more
7+
# details.
8+
9+
"""Events bus module."""
10+
11+
from flask import current_app
12+
13+
14+
class EventBus:
15+
"""Event bus."""
16+
17+
def __init__(self, handlers, queue):
18+
"""Constructor."""
19+
self._handlers = handlers
20+
self._queue = queue
21+
22+
def publish(self, event):
23+
"""Publish an event to the bus."""
24+
return self._queue.publish(event)
25+
26+
def handle_events(self, uow):
27+
"""Handle a list of events."""
28+
for event in self._queue.consume():
29+
try:
30+
handlers = self._handlers[event]
31+
for handler in handlers:
32+
handler.handle(event, uow)
33+
except KeyError:
34+
current_app.logger.error(f"No handler for event {event}")
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# Copyright (C) 2022 CERN.
4+
#
5+
# Invenio-Records-Resources is free software; you can redistribute it and/or
6+
# modify it under the terms of the MIT License; see LICENSE file for more
7+
# details.
8+
9+
"""Events module."""
10+
11+
from dataclasses import dataclass
12+
from datetime import datetime
13+
14+
15+
@dataclass
16+
class Event:
17+
"""Base event."""
18+
19+
created: datetime
20+
21+
22+
@dataclass
23+
class RecordEvent(Event):
24+
"""Record related events."""
25+
26+
recid: str
27+
# FIXME: should be an enum (created, deleted, published)
28+
# or splitted in many events
29+
action: str
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# Copyright (C) 2022 CERN.
4+
#
5+
# Invenio-Records-Resources is free software; you can redistribute it and/or
6+
# modify it under the terms of the MIT License; see LICENSE file for more
7+
# details.
8+
9+
"""Event handlers module."""
10+
11+
from abc import ABC, abstractmethod
12+
13+
14+
class EventHandler(ABC):
15+
"""Abstract event handler class."""
16+
17+
@abstractmethod
18+
def handle(self, event, uow):
19+
"""Handle an event."""
20+
pass
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# Copyright (C) 2022 CERN.
4+
#
5+
# Invenio-Records-Resources is free software; you can redistribute it and/or
6+
# modify it under the terms of the MIT License; see LICENSE file for more
7+
# details.
8+
9+
"""Event queue module."""
10+
11+
from abc import ABC, abstractmethod
12+
from queue import Empty, SimpleQueue
13+
14+
15+
class Queue(ABC):
16+
"""Base queue."""
17+
18+
@abstractmethod
19+
def publish(self, event):
20+
"""Publish and event to the queue."""
21+
22+
@abstractmethod
23+
def consume(self):
24+
"""Consume an event from the queue."""
25+
26+
27+
class MemoryQueue(Queue):
28+
"""In memory queue."""
29+
30+
def __init__(self):
31+
"""Constructor."""
32+
self._events = SimpleQueue()
33+
34+
def publish(self, event):
35+
"""Publish and event to the queue."""
36+
return self._events.put(event)
37+
38+
def consume(self):
39+
"""Consume an event from the queue."""
40+
try:
41+
yield self._events.get(block=False)
42+
except Empty:
43+
yield None

invenio_records_resources/services/uow.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,12 @@ def on_commit(self, uow):
104104

105105
from functools import wraps
106106

107+
from flask import current_app
107108
from invenio_db import db
108109

110+
from .events import EventBus
111+
from .events.queue import MemoryQueue
112+
109113

110114
#
111115
# Unit of work operations
@@ -215,6 +219,10 @@ def __init__(self, session=None):
215219
"""Initialize unit of work context."""
216220
self._session = session or db.session
217221
self._operations = []
222+
self._event_bus = EventBus(
223+
handlers=current_app.config["RECORDS_RESOURCES_EVENT_HANDLERS"],
224+
queue=MemoryQueue()
225+
)
218226
self._dirty = False
219227

220228
def __enter__(self):
@@ -267,6 +275,14 @@ def register(self, op):
267275
# Append to list of operations.
268276
self._operations.append(op)
269277

278+
def add_event(self, event):
279+
"""Adds an event."""
280+
self._event_bus.publish(event)
281+
282+
def handle_events(self):
283+
"""Triggers the handling of all stored events."""
284+
self._event_bus.handle_events(uow=self)
285+
270286

271287
def unit_of_work(**kwargs):
272288
"""Decorator to auto-inject a unit of work if not provided.
@@ -291,6 +307,7 @@ def inner(self, *args, **kwargs):
291307
kwargs['uow'] = uow
292308
res = f(self, *args, **kwargs)
293309
uow.commit()
310+
uow.handle_events()
294311
return res
295312
else:
296313
return f(self, *args, **kwargs)

0 commit comments

Comments
 (0)