Skip to content

Commit 0dba51b

Browse files
author
Pablo Panero
committed
event bus: initial implementation
1 parent 76274aa commit 0dba51b

File tree

10 files changed

+201
-2
lines changed

10 files changed

+201
-2
lines changed

invenio_records_resources/config.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# -*- coding: utf-8 -*-
22
#
3-
# Copyright (C) 2020 CERN.
3+
# Copyright (C) 2020-2022 CERN.
44
# Copyright (C) 2020 Northwestern University.
55
#
66
# Invenio-Records-Resources is free software; you can redistribute it and/or
@@ -16,3 +16,7 @@
1616
SITE_UI_URL = "https://127.0.0.1:5000"
1717

1818
SITE_API_URL = "https://127.0.0.1:5000/api"
19+
20+
RECORDS_RESOURCES_EVENTS_HANDLERS = {}
21+
22+
RECORDS_RESOURCES_EVENTS_QUEUE = "events"
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# Copyright (C) 2022 CERN.
4+
#
5+
# Invenio-Records-Resources is free software; you can redistribute it and/or
6+
# modify it under the terms of the MIT License; see LICENSE file for more
7+
# details.
8+
9+
"""Module for event driven actions support."""
10+
11+
from .bus import EventBus
12+
from .events import Event
13+
from .handlers import EventHandler
14+
15+
__all__ = (
16+
"Event",
17+
"EventHandler",
18+
"EventBus"
19+
)
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# Copyright (C) 2022 CERN.
4+
#
5+
# Invenio-Records-Resources is free software; you can redistribute it and/or
6+
# modify it under the terms of the MIT License; see LICENSE file for more
7+
# details.
8+
9+
"""Events bus module."""
10+
11+
from pickle import dumps, loads
12+
13+
from flask import current_app
14+
from invenio_queues.proxies import current_queues
15+
16+
17+
class EventBus:
18+
"""Event bus."""
19+
20+
def __init__(self, queue_name=None):
21+
"""Constructor."""
22+
self._queue_name = queue_name or \
23+
current_app.config["RECORDS_RESOURCES_EVENTS_QUEUE"]
24+
self._queue = None
25+
26+
for name, queue in current_queues.queues.items():
27+
if name == self._queue_name:
28+
self._queue = queue
29+
break
30+
31+
def publish(self, event):
32+
"""Publish an event to the bus queue."""
33+
return self._queue.publish([dumps(event)])
34+
35+
def consume(self):
36+
"""Consume an event from the bus queue."""
37+
for event in self._queue.consume(): # consume() returns a generator
38+
yield loads(event)
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# Copyright (C) 2022 CERN.
4+
#
5+
# Invenio-Records-Resources is free software; you can redistribute it and/or
6+
# modify it under the terms of the MIT License; see LICENSE file for more
7+
# details.
8+
9+
"""Events module."""
10+
11+
from dataclasses import dataclass
12+
from datetime import datetime
13+
from typing import ClassVar
14+
15+
16+
@dataclass
17+
class Event:
18+
"""Base event."""
19+
20+
created: datetime
21+
type: str
22+
action: str
23+
24+
25+
@dataclass
26+
class RecordEvent(Event):
27+
"""Record related events."""
28+
29+
recid: str
30+
type: ClassVar[str] = "RECORD"
31+
32+
33+
@dataclass
34+
class RecordCreatedEvent(RecordEvent):
35+
"""Record related events."""
36+
37+
action: ClassVar[str] = "PUBLISHED"
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# Copyright (C) 2022 CERN.
4+
#
5+
# Invenio-Records-Resources is free software; you can redistribute it and/or
6+
# modify it under the terms of the MIT License; see LICENSE file for more
7+
# details.
8+
9+
"""Event handlers module."""
10+
11+
from abc import ABC, abstractmethod
12+
13+
14+
class EventHandler(ABC):
15+
"""Abstract event handler class."""
16+
17+
@abstractmethod
18+
def handle(self, event):
19+
"""Handle an event."""
20+
pass

invenio_records_resources/services/uow.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,8 @@ def on_commit(self, uow):
106106

107107
from invenio_db import db
108108

109+
from .events import EventBus
110+
109111

110112
#
111113
# Unit of work operations
@@ -199,6 +201,21 @@ def on_post_commit(self, uow):
199201
self._celery_task.delay(*self._args, **self._kwargs)
200202

201203

204+
class EventOp(Operation):
205+
"""A task to send an event.
206+
207+
All events will be sent after the commit phase.
208+
"""
209+
210+
def __init__(self, event, *args, **kwargs):
211+
"""Constructor."""
212+
self._event = event
213+
214+
def on_post_commit(self, uow):
215+
"""Publish the event to the bus."""
216+
uow._event_bus.publish(self._event)
217+
218+
202219
#
203220
# Unit of work context manager
204221
#
@@ -215,6 +232,7 @@ def __init__(self, session=None):
215232
"""Initialize unit of work context."""
216233
self._session = session or db.session
217234
self._operations = []
235+
self._event_bus = EventBus()
218236
self._dirty = False
219237

220238
def __enter__(self):

run-tests.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ trap cleanup EXIT
3131

3232
python -m check_manifest --ignore ".*-requirements.txt"
3333
python -m sphinx.cmd.build -qnNW docs docs/_build/html
34-
eval "$(docker-services-cli up --db ${DB:-postgresql} --search ${SEARCH:-elasticsearch} --cache ${CACHE:-redis} --env)"
34+
eval "$(docker-services-cli up --db ${DB:-postgresql} --search ${SEARCH:-elasticsearch} --cache ${CACHE:-redis} --mq ${MQ:-rabbitmq} --env)"
3535
python -m pytest $@
3636
tests_exit_code=$?
3737
python -m sphinx.cmd.build -qnNW -b doctest docs docs/_build/doctest

setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
"invenio-indexer>=1.2.1",
6868
"invenio-jsonschemas>=1.1.3",
6969
"invenio-pidstore>=1.2.2",
70+
"invenio-queues>=1.0.0a4",
7071
"invenio-records-permissions>=0.13.0,<0.14.0",
7172
"invenio-records>=1.6.0",
7273
"luqum>=0.11.0",

tests/services/events/conftest.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# Copyright (C) 2022 CERN.
4+
#
5+
# Invenio-Records-Resources is free software; you can redistribute it and/or
6+
# modify it under the terms of the MIT License; see LICENSE file for more
7+
# details.
8+
9+
"""Pytest configuration.
10+
11+
See https://pytest-invenio.readthedocs.io/ for documentation on which test
12+
fixtures are available.
13+
"""
14+
15+
import pytest
16+
from kombu import Exchange
17+
18+
19+
@pytest.fixture(scope="module")
20+
def app_config(app_config):
21+
"""Application configuration."""
22+
queue_name = "test-events"
23+
app_config["RECORDS_RESOURCES_EVENT_QUEUE"] = queue_name
24+
25+
exchange = Exchange(
26+
queue=queue_name,
27+
type="direct",
28+
delivery_mode="persistent", # in-memory and disk
29+
)
30+
31+
app_config["QUEUES_DEFINITIONS"] = [
32+
{"name": queue_name, "exchange": exchange}
33+
]
34+
35+
app_config["CELERY_ACCEPT_CONTENT"] = ["json", "msgpack", "yaml", "pickle"]
36+
37+
return app_config

tests/services/events/test_bus.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# Copyright (C) 2022 CERN.
4+
#
5+
# Invenio-Records-Resources is free software; you can redistribute it and/or
6+
# modify it under the terms of the MIT License; see LICENSE file for more
7+
# details.
8+
9+
"""Event bus test."""
10+
11+
from datetime import datetime
12+
from time import sleep
13+
14+
from invenio_records_resources.services.events import EventBus
15+
from invenio_records_resources.services.events.events import RecordCreatedEvent
16+
17+
18+
def test_bus_publish_consume(app):
19+
bus = EventBus("test-events")
20+
event = RecordCreatedEvent(created=datetime.now(), recid="12345-abcde")
21+
22+
bus.publish(event)
23+
sleep(10)
24+
consumed_event = bus.consume()
25+
assert event == next(consumed_event)

0 commit comments

Comments
 (0)