Skip to content

Use single topic per workspace #639

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
afe5ffa
TST: Test `get_sruid_from_pub_sub_resource_name`
cortadocodes Mar 21, 2024
4526148
ENH: Add recipient and originator event attributes
cortadocodes Mar 22, 2024
47c8af2
FIX: Require all question event attributes
cortadocodes Mar 27, 2024
55eee94
ENH: Give every event a UUID
cortadocodes Mar 27, 2024
f2cb0e2
REF: Rename `message_number` to `ordering_key`
cortadocodes Mar 27, 2024
ed64727
FIX: Only update earliest waiting message number if waiting messages
cortadocodes Mar 27, 2024
4daa88e
ENH: Add mandatory originator, sender, and recipient event attributes
cortadocodes Mar 27, 2024
78117e2
FIX: Rename `ordering_key` to `order`
cortadocodes Mar 27, 2024
17e6dfa
REF: Rename `version` attribute to `sender_sdk_version`
cortadocodes Mar 28, 2024
f61f38a
FIX: Avoid getting earliest waiting message if there isn't one
cortadocodes Mar 28, 2024
562d2b4
FIX: Avoid silently failing if `question_uuid` attribute missing
cortadocodes Mar 28, 2024
95da7cd
REF: Move `OCTUE_SERVICES_NAMESPACE` to package root
cortadocodes Mar 28, 2024
76127e9
FEA: Allow specification of services namespace via `octue.yaml`
cortadocodes Mar 28, 2024
9e7a772
REF: Remove adding of services namespace to topics and subscriptions
cortadocodes Mar 28, 2024
da93219
REF: Rename `OCTUE_SERVICES_NAMESPACE`
cortadocodes Mar 28, 2024
059cab3
ENH: Add services namespace to `GCPPubSubBackend`
cortadocodes Mar 28, 2024
263377e
FEA: Publish/subscribe to single topic in `Service`
cortadocodes Mar 28, 2024
6ec4f5e
REF: Switch back to unsplit SRUIDs in event attributes
cortadocodes Mar 28, 2024
22befcc
FIX: Add correct recipient to event attributes
cortadocodes Mar 28, 2024
974f52c
FIX: Avoid circular imports
cortadocodes Mar 28, 2024
7c7bf51
ENH: Update Pub/Sub emulators to use a single topic
cortadocodes Mar 28, 2024
807ae75
TST: Update pub/sub event tests
cortadocodes Apr 2, 2024
f22b635
TST: Update pub/sub logging tests
cortadocodes Apr 2, 2024
166d1a2
TST: Update service tests
cortadocodes Apr 2, 2024
df62601
FIX: Fix event order
cortadocodes Apr 2, 2024
e3a1b3f
TST: Uncomment service tests
cortadocodes Apr 2, 2024
414a75f
OPS: Add `octue.services` topic
cortadocodes Apr 2, 2024
a572b54
TST: Update more tests
cortadocodes Apr 2, 2024
da45d91
FIX: Shorten answer subscription filter
cortadocodes Apr 2, 2024
1140979
TST: Update child emulator tests
cortadocodes Apr 8, 2024
6ea4668
ENH: Update CLI command to use single topic per workspace
cortadocodes Apr 8, 2024
e61013a
DOC: Update docstrings
cortadocodes Apr 8, 2024
7472cd9
ENH: Use latest services communication schema
cortadocodes Apr 8, 2024
57cff9e
FIX: Update `get_sruid_from_pub_sub_resource_name`
cortadocodes Apr 8, 2024
feadb81
TST: Remove unnecessary `octue.services`
cortadocodes Apr 8, 2024
a25d919
REF: Move event counter out of `Topic` and into `Service`
cortadocodes Apr 8, 2024
09fc809
REF: Move unrelated code out of thread lock
cortadocodes Apr 8, 2024
e401a71
REF: Factor out services topic in `Service`
cortadocodes Apr 8, 2024
1e5d634
OPS: Update bigquery table and event handler cloud function
cortadocodes Apr 8, 2024
5b89aaf
FIX: Restore answer subscription names to previous format
cortadocodes Apr 9, 2024
ddf7100
DOC: Update `Service` docstrings
cortadocodes Apr 9, 2024
82db91e
TST: Simplify `GoogleCloudPubSubHandler` tests
cortadocodes Apr 9, 2024
c5075b4
ENH: Make `Topic`'s representation consistent with `Subscription`'s
cortadocodes Apr 9, 2024
2b96b82
TST: Update pub/sub event handler tests
cortadocodes Apr 9, 2024
f221b8e
TST: Simplify pub/sub event handler tests
cortadocodes Apr 9, 2024
c77f7f4
TST: Factor out patching to start of services test class
cortadocodes Apr 9, 2024
38d687f
TST: Factor out service patching in pub/sub event handler tests
cortadocodes Apr 9, 2024
19632f3
FIX: Allow services to be instantiated without GCP credentials
cortadocodes Apr 9, 2024
0fc802b
TST: Factor out service patching in child tests
cortadocodes Apr 9, 2024
aa37402
ENH: Add string representation to `MockMessageWrapper`
cortadocodes Apr 9, 2024
426e765
FIX: Fix race condition in event emission order
cortadocodes Apr 9, 2024
67a2490
REF: Rename `receiving_service` to `recipient`
cortadocodes Apr 9, 2024
242d097
REF: Rename `Service._send_message` to `Service._emit_event`
cortadocodes Apr 9, 2024
4a5ba60
REF: Make services prefix a constant
cortadocodes Apr 9, 2024
0fcdc97
REF: Rename `Service.emit_event` to `Service._emit_event`
cortadocodes Apr 9, 2024
7ccb020
TST: Test error is raised if service topic is missing
cortadocodes Apr 9, 2024
a54c04b
TST: Update cloud deployment test
cortadocodes Apr 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions octue/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

__all__ = ("Runner",)


REPOSITORY_ROOT = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))


Expand Down
26 changes: 3 additions & 23 deletions octue/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,35 +357,16 @@ def deploy():
help="The service revision tag (e.g. 1.0.7). If this option isn't given, a random 'cool name' tag is generated e.g"
". 'curious-capybara'.",
)
@click.option(
"--filter",
is_flag=False,
default='attributes.sender_type = "PARENT"',
show_default=True,
help="An optional filter to apply to the subscription (see "
"https://cloud.google.com/pubsub/docs/subscription-message-filter). If not provided, the default filter is applied."
" To disable filtering, provide an empty string.",
)
@click.option(
"--subscription-suffix",
is_flag=False,
default=None,
show_default=True,
help="An optional suffix to add to the end of the subscription name. This is useful when needing to create "
"multiple subscriptions for the same topic (subscription names are unique).",
)
def create_push_subscription(
project_name,
service_namespace,
service_name,
push_endpoint,
expiration_time,
revision_tag,
filter,
subscription_suffix,
):
"""Create a Google Pub/Sub push subscription for an Octue service for it to receive questions from parents. If a
corresponding topic doesn't exist, it will be created first. The subscription name is printed on completion.
"""Create a Google Pub/Sub push subscription for an Octue service for it to receive questions from parents. The
subscription name is printed on completion.

PROJECT_NAME is the name of the Google Cloud project in which the subscription will be created

Expand All @@ -403,8 +384,7 @@ def create_push_subscription(
sruid,
push_endpoint,
expiration_time=expiration_time,
subscription_filter=filter or None,
subscription_suffix=subscription_suffix,
subscription_filter=f'attributes.recipient = "{sruid}" AND attributes.sender_type = "PARENT"',
)

click.echo(sruid)
Expand Down
9 changes: 2 additions & 7 deletions octue/cloud/deployment/google/answer_pub_sub_question.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import logging

from octue.cloud.pub_sub import Topic
from octue.cloud.pub_sub.service import Service
from octue.cloud.service_id import convert_service_id_to_pub_sub_form, create_sruid, get_sruid_parts
from octue.cloud.service_id import create_sruid, get_sruid_parts
from octue.configuration import load_service_and_app_configuration
from octue.resources.service_backends import GCPPubSubBackend
from octue.runner import Runner
Expand Down Expand Up @@ -47,9 +46,5 @@ def answer_question(question, project_name):
logger.info("Analysis successfully run and response sent for question %r.", question_uuid)

except BaseException as error: # noqa
service.send_exception(
topic=Topic(name=convert_service_id_to_pub_sub_form(service_sruid), project_name=project_name),
question_uuid=question_uuid,
)

service.send_exception(question_uuid=question_uuid, originator="UNKNOWN")
logger.exception(error)
42 changes: 25 additions & 17 deletions octue/cloud/emulators/_pub_sub.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
import importlib.metadata
import json
import logging
from collections import defaultdict

import google.api_core

from octue.cloud.pub_sub import Subscription, Topic
from octue.cloud.pub_sub.service import ANSWERS_NAMESPACE, PARENT_SENDER_TYPE, Service
from octue.cloud.service_id import convert_service_id_to_pub_sub_form
from octue.cloud.pub_sub.service import PARENT_SENDER_TYPE, Service
from octue.resources import Manifest
from octue.utils.dictionaries import make_minimal_dictionary
from octue.utils.encoders import OctueJSONEncoder


logger = logging.getLogger(__name__)

TOPICS = {}
SUBSCRIPTIONS = {}
TOPICS = set()
SUBSCRIPTIONS = set()
MESSAGES = defaultdict(list)


class MockTopic(Topic):
Expand All @@ -33,7 +34,7 @@ def create(self, allow_existing=False):
raise google.api_core.exceptions.AlreadyExists(f"Topic {self.path!r} already exists.")

if not self.exists():
TOPICS[self.name] = []
TOPICS.add(self.name)
self._created = True

def delete(self):
Expand All @@ -42,7 +43,7 @@ def delete(self):
:return None:
"""
try:
del TOPICS[self.name]
TOPICS.remove(self.name)
except KeyError:
pass

Expand Down Expand Up @@ -73,7 +74,7 @@ def create(self, allow_existing=False):
raise google.api_core.exceptions.AlreadyExists(f"Subscription {self.path!r} already exists.")

if not self.exists():
SUBSCRIPTIONS[self.name] = []
SUBSCRIPTIONS.add(self.name)
self._created = True

def delete(self):
Expand Down Expand Up @@ -146,8 +147,7 @@ def publish(self, topic, data, retry=None, **attributes):
:param google.api_core.retry.Retry|None retry:
:return MockFuture:
"""
subscription_name = ".".join((get_pub_sub_resource_name(topic), ANSWERS_NAMESPACE, attributes["question_uuid"]))
SUBSCRIPTIONS[subscription_name].append(MockMessage(data=data, attributes=attributes))
MESSAGES[attributes["question_uuid"]].append(MockMessage(data=data, attributes=attributes))
return MockFuture()


Expand Down Expand Up @@ -191,12 +191,12 @@ def pull(self, request, timeout=None, retry=None):
if self.closed:
raise ValueError("ValueError: Cannot invoke RPC: Channel closed!")

subscription_name = get_pub_sub_resource_name(request["subscription"])
question_uuid = request["subscription"].split(".")[-1]

try:
return MockPullResponse(
received_messages=[
MockMessageWrapper(message=SUBSCRIPTIONS[subscription_name].pop(0)),
MockMessageWrapper(message=MESSAGES[question_uuid].pop(0)),
]
)

Expand Down Expand Up @@ -242,6 +242,13 @@ def __init__(self, message):
self.message = message
self.ack_id = None

def __repr__(self):
"""Represent the mock message as a string.

:return str:
"""
return f"<{type(self).__name__}(message={self.message})>"


class MockMessage:
"""A mock Pub/Sub message containing serialised data and any number of attributes.
Expand Down Expand Up @@ -356,8 +363,7 @@ def ask(

# Delete question from messages sent to subscription so the parent doesn't pick it up as a response message. We
# do this as subscription filtering isn't implemented in this set of mocks.
subscription_name = ".".join((convert_service_id_to_pub_sub_form(service_id), ANSWERS_NAMESPACE, question_uuid))
SUBSCRIPTIONS["octue.services." + subscription_name].pop(0)
MESSAGES[question_uuid].pop(0)

question = make_minimal_dictionary(kind="question", input_values=input_values, children=children)

Expand All @@ -371,13 +377,15 @@ def ask(
MockMessage.from_primitive(
data=question,
attributes={
"sender_type": PARENT_SENDER_TYPE,
"question_uuid": question_uuid,
"forward_logs": subscribe_to_logs,
"version": parent_sdk_version,
"save_diagnostics": save_diagnostics,
"message_number": 0,
"sender": service_id,
"order": 0,
"originator": self.id,
"sender": self.id,
"sender_type": PARENT_SENDER_TYPE,
"sender_sdk_version": parent_sdk_version,
"recipient": service_id,
},
)
)
Expand Down
1 change: 1 addition & 0 deletions octue/cloud/events/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
OCTUE_SERVICES_PREFIX = "octue.services"
35 changes: 35 additions & 0 deletions octue/cloud/events/counter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
class EventCounter:
"""A mutable counter for keeping track of the emission order of events. This is used in the `Service` class instead
of an integer because it is mutable and can be passed to the `Service._emit_event` method and incremented as
events are emitted.

:return None:
"""

def __init__(self):
self.count = 0

def __iadd__(self, other):
"""Increment the counter by an integer.

:return octue.cloud.events.counter.EventCounter: the event counter with its count updated
"""
if not isinstance(other, int):
raise ValueError(f"Event counters can only be incremented by an integer; received {other!r}.")

self.count += other
return self

def __int__(self):
"""Get the counter as an integer.

:return int: the counter as an integer
"""
return int(self.count)

def __repr__(self):
"""Represent the counter as a string.

:return str: the counter represented as a string.
"""
return f"<{type(self).__name__}(count={self.count})"
40 changes: 19 additions & 21 deletions octue/cloud/events/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class AbstractEventHandler:
"""An abstract event handler. Inherit from this and add the `handle_events` and `_extract_event_and_attributes`
methods to handle events from a specific source synchronously or asynchronously.

:param octue.cloud.pub_sub.service.Service receiving_service: the service that's receiving the events
:param octue.cloud.pub_sub.service.Service recipient: the service that's receiving the events
:param callable|None handle_monitor_message: a function to handle monitor messages (e.g. send them to an endpoint for plotting or displaying) - this function should take a single JSON-compatible python primitive
:param bool record_events: if `True`, record received events in the `received_events` attribute
:param dict|None event_handlers: a mapping of event type names to callables that handle each type of event. The handlers should not mutate the events.
Expand All @@ -43,15 +43,15 @@ class AbstractEventHandler:

def __init__(
self,
receiving_service,
recipient,
handle_monitor_message=None,
record_events=True,
event_handlers=None,
schema=SERVICE_COMMUNICATION_SCHEMA,
skip_missing_events_after=10,
only_handle_result=False,
):
self.receiving_service = receiving_service
self.recipient = recipient
self.handle_monitor_message = handle_monitor_message
self.record_events = record_events
self.schema = schema
Expand Down Expand Up @@ -122,33 +122,31 @@ def _extract_and_enqueue_event(self, container):
if not is_event_valid(
event=event,
attributes=attributes,
receiving_service=self.receiving_service,
recipient=self.recipient,
parent_sdk_version=PARENT_SDK_VERSION,
child_sdk_version=attributes.get("version"),
child_sdk_version=attributes["sender_sdk_version"],
schema=self.schema,
):
return

# Get the child's SRUID and Octue SDK version from the first event.
if not self._child_sdk_version:
self.question_uuid = attributes.get("question_uuid")
self._child_sdk_version = attributes["version"]
self.child_sruid = attributes["sender"]
self.question_uuid = attributes["question_uuid"]
self._child_sdk_version = attributes["sender_sdk_version"]

# Backwards-compatible with previous event schema versions.
self.child_sruid = attributes.get("sender", "REMOTE")
logger.debug("%r received an event related to question %r.", self.recipient, self.question_uuid)
order = attributes["order"]

logger.debug("%r received an event related to question %r.", self.receiving_service, self.question_uuid)
event_number = attributes["message_number"]

if event_number in self.waiting_events:
if order in self.waiting_events:
logger.warning(
"%r: Event with duplicate event number %d received for question %s - overwriting original event.",
self.receiving_service,
event_number,
"%r: Event with duplicate order %d received for question %s - overwriting original event.",
self.recipient,
order,
self.question_uuid,
)

self.waiting_events[event_number] = event
self.waiting_events[order] = event

def _attempt_to_handle_waiting_events(self):
"""Attempt to handle events waiting in `self.waiting_events`. If these events aren't consecutive to the
Expand Down Expand Up @@ -204,7 +202,7 @@ def _skip_to_earliest_waiting_event(self):
logger.warning(
"%r: %d consecutive events missing for question %r after %ds - skipping to next earliest waiting event "
"(event %d).",
self.receiving_service,
self.recipient,
number_of_missing_events,
self.question_uuid,
self.skip_missing_events_after,
Expand Down Expand Up @@ -236,7 +234,7 @@ def _handle_delivery_acknowledgement(self, event):
:param dict event:
:return None:
"""
logger.info("%r's question was delivered at %s.", self.receiving_service, event["datetime"])
logger.info("%r's question was delivered at %s.", self.recipient, event["datetime"])

def _handle_heartbeat(self, event):
"""Record the time the heartbeat was received.
Expand All @@ -253,7 +251,7 @@ def _handle_monitor_message(self, event):
:param dict event:
:return None:
"""
logger.debug("%r received a monitor message.", self.receiving_service)
logger.debug("%r received a monitor message.", self.recipient)

if self.handle_monitor_message is not None:
self.handle_monitor_message(event["data"])
Expand Down Expand Up @@ -317,7 +315,7 @@ def _handle_result(self, event):
:param dict event:
:return dict:
"""
logger.info("%r received an answer to question %r.", self.receiving_service, self.question_uuid)
logger.info("%r received an answer to question %r.", self.recipient, self.question_uuid)

if event.get("output_manifest"):
output_manifest = Manifest.deserialise(event["output_manifest"])
Expand Down
14 changes: 8 additions & 6 deletions octue/cloud/events/replayer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
class EventReplayer(AbstractEventHandler):
"""A replayer for events retrieved asynchronously from some kind of storage.

:param octue.cloud.pub_sub.service.Service receiving_service: the service that's receiving the events
:param octue.cloud.pub_sub.service.Service recipient: the service that's receiving the events
:param callable|None handle_monitor_message: a function to handle monitor messages (e.g. send them to an endpoint for plotting or displaying) - this function should take a single JSON-compatible python primitive
:param bool record_events: if `True`, record received events in the `received_events` attribute
:param dict|None event_handlers: a mapping of event type names to callables that handle each type of event. The handlers should not mutate the events.
Expand All @@ -23,15 +23,15 @@ class EventReplayer(AbstractEventHandler):

def __init__(
self,
receiving_service=None,
recipient=None,
handle_monitor_message=None,
record_events=True,
event_handlers=None,
schema=SERVICE_COMMUNICATION_SCHEMA,
only_handle_result=False,
):
super().__init__(
receiving_service or Service(backend=ServiceBackend(), service_id="local/local:local"),
recipient or Service(backend=ServiceBackend(), service_id="local/local:local"),
handle_monitor_message=handle_monitor_message,
record_events=record_events,
event_handlers=event_handlers,
Expand All @@ -51,14 +51,16 @@ def handle_events(self, events):
for event in events:
self._extract_and_enqueue_event(event)

self._earliest_waiting_event_number = min(self.waiting_events.keys())
return self._attempt_to_handle_waiting_events()
# Handle the case where no events (or no valid events) have been received.
if self.waiting_events:
self._earliest_waiting_event_number = min(self.waiting_events.keys())
return self._attempt_to_handle_waiting_events()

def _extract_event_and_attributes(self, container):
"""Extract an event and its attributes from the event container.

:param dict container: the container of the event
:return (any, dict): the event and its attributes
"""
container["attributes"]["message_number"] = int(container["attributes"]["message_number"])
container["attributes"]["order"] = int(container["attributes"]["order"])
return container["event"], container["attributes"]
Loading
Loading