diff --git a/octue/__init__.py b/octue/__init__.py index 138e91f2c..eea4e34fb 100644 --- a/octue/__init__.py +++ b/octue/__init__.py @@ -6,6 +6,7 @@ __all__ = ("Runner",) + REPOSITORY_ROOT = os.path.abspath(os.path.dirname(os.path.dirname(__file__))) diff --git a/octue/cli.py b/octue/cli.py index 9d649b597..08889808f 100644 --- a/octue/cli.py +++ b/octue/cli.py @@ -357,23 +357,6 @@ def deploy(): help="The service revision tag (e.g. 1.0.7). If this option isn't given, a random 'cool name' tag is generated e.g" ". 'curious-capybara'.", ) -@click.option( - "--filter", - is_flag=False, - default='attributes.sender_type = "PARENT"', - show_default=True, - help="An optional filter to apply to the subscription (see " - "https://cloud.google.com/pubsub/docs/subscription-message-filter). If not provided, the default filter is applied." - " To disable filtering, provide an empty string.", -) -@click.option( - "--subscription-suffix", - is_flag=False, - default=None, - show_default=True, - help="An optional suffix to add to the end of the subscription name. This is useful when needing to create " - "multiple subscriptions for the same topic (subscription names are unique).", -) def create_push_subscription( project_name, service_namespace, @@ -381,11 +364,9 @@ def create_push_subscription( push_endpoint, expiration_time, revision_tag, - filter, - subscription_suffix, ): - """Create a Google Pub/Sub push subscription for an Octue service for it to receive questions from parents. If a - corresponding topic doesn't exist, it will be created first. The subscription name is printed on completion. + """Create a Google Pub/Sub push subscription for an Octue service for it to receive questions from parents. The + subscription name is printed on completion. PROJECT_NAME is the name of the Google Cloud project in which the subscription will be created @@ -403,8 +384,7 @@ def create_push_subscription( sruid, push_endpoint, expiration_time=expiration_time, - subscription_filter=filter or None, - subscription_suffix=subscription_suffix, + subscription_filter=f'attributes.recipient = "{sruid}" AND attributes.sender_type = "PARENT"', ) click.echo(sruid) diff --git a/octue/cloud/deployment/google/answer_pub_sub_question.py b/octue/cloud/deployment/google/answer_pub_sub_question.py index 167832582..703a67f5c 100644 --- a/octue/cloud/deployment/google/answer_pub_sub_question.py +++ b/octue/cloud/deployment/google/answer_pub_sub_question.py @@ -1,8 +1,7 @@ import logging -from octue.cloud.pub_sub import Topic from octue.cloud.pub_sub.service import Service -from octue.cloud.service_id import convert_service_id_to_pub_sub_form, create_sruid, get_sruid_parts +from octue.cloud.service_id import create_sruid, get_sruid_parts from octue.configuration import load_service_and_app_configuration from octue.resources.service_backends import GCPPubSubBackend from octue.runner import Runner @@ -47,9 +46,5 @@ def answer_question(question, project_name): logger.info("Analysis successfully run and response sent for question %r.", question_uuid) except BaseException as error: # noqa - service.send_exception( - topic=Topic(name=convert_service_id_to_pub_sub_form(service_sruid), project_name=project_name), - question_uuid=question_uuid, - ) - + service.send_exception(question_uuid=question_uuid, originator="UNKNOWN") logger.exception(error) diff --git a/octue/cloud/emulators/_pub_sub.py b/octue/cloud/emulators/_pub_sub.py index e497be5e5..527ceb45b 100644 --- a/octue/cloud/emulators/_pub_sub.py +++ b/octue/cloud/emulators/_pub_sub.py @@ -1,12 +1,12 @@ import importlib.metadata import json import logging +from collections import defaultdict import google.api_core from octue.cloud.pub_sub import Subscription, Topic -from octue.cloud.pub_sub.service import ANSWERS_NAMESPACE, PARENT_SENDER_TYPE, Service -from octue.cloud.service_id import convert_service_id_to_pub_sub_form +from octue.cloud.pub_sub.service import PARENT_SENDER_TYPE, Service from octue.resources import Manifest from octue.utils.dictionaries import make_minimal_dictionary from octue.utils.encoders import OctueJSONEncoder @@ -14,8 +14,9 @@ logger = logging.getLogger(__name__) -TOPICS = {} -SUBSCRIPTIONS = {} +TOPICS = set() +SUBSCRIPTIONS = set() +MESSAGES = defaultdict(list) class MockTopic(Topic): @@ -33,7 +34,7 @@ def create(self, allow_existing=False): raise google.api_core.exceptions.AlreadyExists(f"Topic {self.path!r} already exists.") if not self.exists(): - TOPICS[self.name] = [] + TOPICS.add(self.name) self._created = True def delete(self): @@ -42,7 +43,7 @@ def delete(self): :return None: """ try: - del TOPICS[self.name] + TOPICS.remove(self.name) except KeyError: pass @@ -73,7 +74,7 @@ def create(self, allow_existing=False): raise google.api_core.exceptions.AlreadyExists(f"Subscription {self.path!r} already exists.") if not self.exists(): - SUBSCRIPTIONS[self.name] = [] + SUBSCRIPTIONS.add(self.name) self._created = True def delete(self): @@ -146,8 +147,7 @@ def publish(self, topic, data, retry=None, **attributes): :param google.api_core.retry.Retry|None retry: :return MockFuture: """ - subscription_name = ".".join((get_pub_sub_resource_name(topic), ANSWERS_NAMESPACE, attributes["question_uuid"])) - SUBSCRIPTIONS[subscription_name].append(MockMessage(data=data, attributes=attributes)) + MESSAGES[attributes["question_uuid"]].append(MockMessage(data=data, attributes=attributes)) return MockFuture() @@ -191,12 +191,12 @@ def pull(self, request, timeout=None, retry=None): if self.closed: raise ValueError("ValueError: Cannot invoke RPC: Channel closed!") - subscription_name = get_pub_sub_resource_name(request["subscription"]) + question_uuid = request["subscription"].split(".")[-1] try: return MockPullResponse( received_messages=[ - MockMessageWrapper(message=SUBSCRIPTIONS[subscription_name].pop(0)), + MockMessageWrapper(message=MESSAGES[question_uuid].pop(0)), ] ) @@ -242,6 +242,13 @@ def __init__(self, message): self.message = message self.ack_id = None + def __repr__(self): + """Represent the mock message as a string. + + :return str: + """ + return f"<{type(self).__name__}(message={self.message})>" + class MockMessage: """A mock Pub/Sub message containing serialised data and any number of attributes. @@ -356,8 +363,7 @@ def ask( # Delete question from messages sent to subscription so the parent doesn't pick it up as a response message. We # do this as subscription filtering isn't implemented in this set of mocks. - subscription_name = ".".join((convert_service_id_to_pub_sub_form(service_id), ANSWERS_NAMESPACE, question_uuid)) - SUBSCRIPTIONS["octue.services." + subscription_name].pop(0) + MESSAGES[question_uuid].pop(0) question = make_minimal_dictionary(kind="question", input_values=input_values, children=children) @@ -371,13 +377,15 @@ def ask( MockMessage.from_primitive( data=question, attributes={ - "sender_type": PARENT_SENDER_TYPE, "question_uuid": question_uuid, "forward_logs": subscribe_to_logs, - "version": parent_sdk_version, "save_diagnostics": save_diagnostics, - "message_number": 0, - "sender": service_id, + "order": 0, + "originator": self.id, + "sender": self.id, + "sender_type": PARENT_SENDER_TYPE, + "sender_sdk_version": parent_sdk_version, + "recipient": service_id, }, ) ) diff --git a/octue/cloud/events/__init__.py b/octue/cloud/events/__init__.py index e69de29bb..66898c235 100644 --- a/octue/cloud/events/__init__.py +++ b/octue/cloud/events/__init__.py @@ -0,0 +1 @@ +OCTUE_SERVICES_PREFIX = "octue.services" diff --git a/octue/cloud/events/counter.py b/octue/cloud/events/counter.py new file mode 100644 index 000000000..5edc208a1 --- /dev/null +++ b/octue/cloud/events/counter.py @@ -0,0 +1,35 @@ +class EventCounter: + """A mutable counter for keeping track of the emission order of events. This is used in the `Service` class instead + of an integer because it is mutable and can be passed to the `Service._emit_event` method and incremented as + events are emitted. + + :return None: + """ + + def __init__(self): + self.count = 0 + + def __iadd__(self, other): + """Increment the counter by an integer. + + :return octue.cloud.events.counter.EventCounter: the event counter with its count updated + """ + if not isinstance(other, int): + raise ValueError(f"Event counters can only be incremented by an integer; received {other!r}.") + + self.count += other + return self + + def __int__(self): + """Get the counter as an integer. + + :return int: the counter as an integer + """ + return int(self.count) + + def __repr__(self): + """Represent the counter as a string. + + :return str: the counter represented as a string. + """ + return f"<{type(self).__name__}(count={self.count})" diff --git a/octue/cloud/events/handler.py b/octue/cloud/events/handler.py index c54d44b6b..62c2d489f 100644 --- a/octue/cloud/events/handler.py +++ b/octue/cloud/events/handler.py @@ -31,7 +31,7 @@ class AbstractEventHandler: """An abstract event handler. Inherit from this and add the `handle_events` and `_extract_event_and_attributes` methods to handle events from a specific source synchronously or asynchronously. - :param octue.cloud.pub_sub.service.Service receiving_service: the service that's receiving the events + :param octue.cloud.pub_sub.service.Service recipient: the service that's receiving the events :param callable|None handle_monitor_message: a function to handle monitor messages (e.g. send them to an endpoint for plotting or displaying) - this function should take a single JSON-compatible python primitive :param bool record_events: if `True`, record received events in the `received_events` attribute :param dict|None event_handlers: a mapping of event type names to callables that handle each type of event. The handlers should not mutate the events. @@ -43,7 +43,7 @@ class AbstractEventHandler: def __init__( self, - receiving_service, + recipient, handle_monitor_message=None, record_events=True, event_handlers=None, @@ -51,7 +51,7 @@ def __init__( skip_missing_events_after=10, only_handle_result=False, ): - self.receiving_service = receiving_service + self.recipient = recipient self.handle_monitor_message = handle_monitor_message self.record_events = record_events self.schema = schema @@ -122,33 +122,31 @@ def _extract_and_enqueue_event(self, container): if not is_event_valid( event=event, attributes=attributes, - receiving_service=self.receiving_service, + recipient=self.recipient, parent_sdk_version=PARENT_SDK_VERSION, - child_sdk_version=attributes.get("version"), + child_sdk_version=attributes["sender_sdk_version"], schema=self.schema, ): return # Get the child's SRUID and Octue SDK version from the first event. if not self._child_sdk_version: - self.question_uuid = attributes.get("question_uuid") - self._child_sdk_version = attributes["version"] + self.child_sruid = attributes["sender"] + self.question_uuid = attributes["question_uuid"] + self._child_sdk_version = attributes["sender_sdk_version"] - # Backwards-compatible with previous event schema versions. - self.child_sruid = attributes.get("sender", "REMOTE") + logger.debug("%r received an event related to question %r.", self.recipient, self.question_uuid) + order = attributes["order"] - logger.debug("%r received an event related to question %r.", self.receiving_service, self.question_uuid) - event_number = attributes["message_number"] - - if event_number in self.waiting_events: + if order in self.waiting_events: logger.warning( - "%r: Event with duplicate event number %d received for question %s - overwriting original event.", - self.receiving_service, - event_number, + "%r: Event with duplicate order %d received for question %s - overwriting original event.", + self.recipient, + order, self.question_uuid, ) - self.waiting_events[event_number] = event + self.waiting_events[order] = event def _attempt_to_handle_waiting_events(self): """Attempt to handle events waiting in `self.waiting_events`. If these events aren't consecutive to the @@ -204,7 +202,7 @@ def _skip_to_earliest_waiting_event(self): logger.warning( "%r: %d consecutive events missing for question %r after %ds - skipping to next earliest waiting event " "(event %d).", - self.receiving_service, + self.recipient, number_of_missing_events, self.question_uuid, self.skip_missing_events_after, @@ -236,7 +234,7 @@ def _handle_delivery_acknowledgement(self, event): :param dict event: :return None: """ - logger.info("%r's question was delivered at %s.", self.receiving_service, event["datetime"]) + logger.info("%r's question was delivered at %s.", self.recipient, event["datetime"]) def _handle_heartbeat(self, event): """Record the time the heartbeat was received. @@ -253,7 +251,7 @@ def _handle_monitor_message(self, event): :param dict event: :return None: """ - logger.debug("%r received a monitor message.", self.receiving_service) + logger.debug("%r received a monitor message.", self.recipient) if self.handle_monitor_message is not None: self.handle_monitor_message(event["data"]) @@ -317,7 +315,7 @@ def _handle_result(self, event): :param dict event: :return dict: """ - logger.info("%r received an answer to question %r.", self.receiving_service, self.question_uuid) + logger.info("%r received an answer to question %r.", self.recipient, self.question_uuid) if event.get("output_manifest"): output_manifest = Manifest.deserialise(event["output_manifest"]) diff --git a/octue/cloud/events/replayer.py b/octue/cloud/events/replayer.py index f9b055119..fb1dec272 100644 --- a/octue/cloud/events/replayer.py +++ b/octue/cloud/events/replayer.py @@ -12,7 +12,7 @@ class EventReplayer(AbstractEventHandler): """A replayer for events retrieved asynchronously from some kind of storage. - :param octue.cloud.pub_sub.service.Service receiving_service: the service that's receiving the events + :param octue.cloud.pub_sub.service.Service recipient: the service that's receiving the events :param callable|None handle_monitor_message: a function to handle monitor messages (e.g. send them to an endpoint for plotting or displaying) - this function should take a single JSON-compatible python primitive :param bool record_events: if `True`, record received events in the `received_events` attribute :param dict|None event_handlers: a mapping of event type names to callables that handle each type of event. The handlers should not mutate the events. @@ -23,7 +23,7 @@ class EventReplayer(AbstractEventHandler): def __init__( self, - receiving_service=None, + recipient=None, handle_monitor_message=None, record_events=True, event_handlers=None, @@ -31,7 +31,7 @@ def __init__( only_handle_result=False, ): super().__init__( - receiving_service or Service(backend=ServiceBackend(), service_id="local/local:local"), + recipient or Service(backend=ServiceBackend(), service_id="local/local:local"), handle_monitor_message=handle_monitor_message, record_events=record_events, event_handlers=event_handlers, @@ -51,8 +51,10 @@ def handle_events(self, events): for event in events: self._extract_and_enqueue_event(event) - self._earliest_waiting_event_number = min(self.waiting_events.keys()) - return self._attempt_to_handle_waiting_events() + # Handle the case where no events (or no valid events) have been received. + if self.waiting_events: + self._earliest_waiting_event_number = min(self.waiting_events.keys()) + return self._attempt_to_handle_waiting_events() def _extract_event_and_attributes(self, container): """Extract an event and its attributes from the event container. @@ -60,5 +62,5 @@ def _extract_event_and_attributes(self, container): :param dict container: the container of the event :return (any, dict): the event and its attributes """ - container["attributes"]["message_number"] = int(container["attributes"]["message_number"]) + container["attributes"]["order"] = int(container["attributes"]["order"]) return container["event"], container["attributes"] diff --git a/octue/cloud/events/validation.py b/octue/cloud/events/validation.py index 553018401..39dfd003f 100644 --- a/octue/cloud/events/validation.py +++ b/octue/cloud/events/validation.py @@ -8,7 +8,7 @@ logger = logging.getLogger(__name__) -SERVICE_COMMUNICATION_SCHEMA = {"$ref": "https://jsonschema.registry.octue.com/octue/service-communication/0.8.3.json"} +SERVICE_COMMUNICATION_SCHEMA = {"$ref": "https://jsonschema.registry.octue.com/octue/service-communication/0.9.0.json"} SERVICE_COMMUNICATION_SCHEMA_INFO_URL = "https://strands.octue.com/octue/service-communication" SERVICE_COMMUNICATION_SCHEMA_VERSION = os.path.splitext(SERVICE_COMMUNICATION_SCHEMA["$ref"])[0].split("/")[-1] @@ -18,12 +18,12 @@ jsonschema_validator = jsonschema.Draft202012Validator(SERVICE_COMMUNICATION_SCHEMA) -def is_event_valid(event, attributes, receiving_service, parent_sdk_version, child_sdk_version, schema=None): +def is_event_valid(event, attributes, recipient, parent_sdk_version, child_sdk_version, schema=None): """Check if the event and its attributes are valid according to the Octue services communication schema. :param dict event: the event to validate :param dict attributes: the attributes of the event to validate - :param octue.cloud.pub_sub.service.Service receiving_service: the service receiving and validating the event + :param octue.cloud.pub_sub.service.Service recipient: the service receiving and validating the event :param str parent_sdk_version: the semantic version of Octue SDK running on the parent :param str child_sdk_version: the semantic version of Octue SDK running on the child :param dict|None schema: the schema to validate the event and its attributes against; if `None`, this defaults to the service communication schema used in this version of Octue SDK @@ -33,7 +33,7 @@ def is_event_valid(event, attributes, receiving_service, parent_sdk_version, chi raise_if_event_is_invalid( event, attributes, - receiving_service, + recipient, parent_sdk_version, child_sdk_version, schema=schema, @@ -44,19 +44,12 @@ def is_event_valid(event, attributes, receiving_service, parent_sdk_version, chi return True -def raise_if_event_is_invalid( - event, - attributes, - receiving_service, - parent_sdk_version, - child_sdk_version, - schema=None, -): +def raise_if_event_is_invalid(event, attributes, recipient, parent_sdk_version, child_sdk_version, schema=None): """Raise an error if the event or its attributes aren't valid according to the Octue services communication schema. :param dict event: the event to validate :param dict attributes: the attributes of the event to validate - :param octue.cloud.pub_sub.service.Service receiving_service: the service receiving and validating the event + :param octue.cloud.pub_sub.service.Service recipient: the service receiving and validating the event :param str parent_sdk_version: the semantic version of Octue SDK running on the parent :param str child_sdk_version: the semantic version of Octue SDK running on the child :param dict|None schema: the schema to validate the event and its attributes against; if `None`, this defaults to the service communication schema used in this version of Octue SDK @@ -82,7 +75,7 @@ def raise_if_event_is_invalid( logger.exception( "%r received an event that doesn't conform with version %s of the service communication schema (%s): %r.", - receiving_service, + recipient, SERVICE_COMMUNICATION_SCHEMA_VERSION, SERVICE_COMMUNICATION_SCHEMA_INFO_URL, event, diff --git a/octue/cloud/pub_sub/__init__.py b/octue/cloud/pub_sub/__init__.py index ab0cccf99..b6239757a 100644 --- a/octue/cloud/pub_sub/__init__.py +++ b/octue/cloud/pub_sub/__init__.py @@ -1,3 +1,4 @@ +from octue.cloud.events import OCTUE_SERVICES_PREFIX from octue.cloud.service_id import convert_service_id_to_pub_sub_form from .subscription import Subscription @@ -13,7 +14,6 @@ def create_push_subscription( push_endpoint, subscription_filter=None, expiration_time=None, - subscription_suffix=None, ): """Create a Google Pub/Sub push subscription for an Octue service for it to receive questions from parents. If a corresponding topic doesn't exist, it will be created first. @@ -23,28 +23,16 @@ def create_push_subscription( :param str push_endpoint: the HTTP/HTTPS endpoint of the service to push to. It should be fully formed and include the 'https://' prefix :param str|None subscription_filter: if specified, the filter to apply to the subscription; otherwise, no filter is applied :param float|None expiration_time: the number of seconds of inactivity after which the subscription should expire. If not provided, no expiration time is applied to the subscription - :param str|None subscription_suffix: if provided, add a suffix to the end of the subscription name. This is useful when needing to create multiple subscriptions for the same topic (subscription names are unique). :return None: """ - topic_name = convert_service_id_to_pub_sub_form(sruid) - - if subscription_suffix: - subscription_name = topic_name + subscription_suffix - else: - subscription_name = topic_name - - topic = Topic(name=topic_name, project_name=project_name) - topic.create(allow_existing=True) - if expiration_time: expiration_time = float(expiration_time) else: expiration_time = None subscription = Subscription( - name=subscription_name, - topic=topic, - project_name=project_name, + name=convert_service_id_to_pub_sub_form(sruid), + topic=Topic(name=OCTUE_SERVICES_PREFIX, project_name=project_name), filter=subscription_filter, expiration_time=expiration_time, push_endpoint=push_endpoint, diff --git a/octue/cloud/pub_sub/bigquery.py b/octue/cloud/pub_sub/bigquery.py index 242037e5f..409372039 100644 --- a/octue/cloud/pub_sub/bigquery.py +++ b/octue/cloud/pub_sub/bigquery.py @@ -58,7 +58,7 @@ def get_events(table_id, question_uuid, kind=None, limit=1000, include_pub_sub_m if isinstance(messages.at[0, "attributes"], str): messages["attributes"] = messages["attributes"].map(json.loads) - # Order messages by the message number. - messages = messages.iloc[messages["attributes"].str.get("message_number").astype(str).argsort()] + # Order messages. + messages = messages.iloc[messages["attributes"].str.get("order").astype(str).argsort()] messages.rename(columns={"data": "event"}, inplace=True) return messages.to_dict(orient="records") diff --git a/octue/cloud/pub_sub/events.py b/octue/cloud/pub_sub/events.py index cb515d5e5..115d8602f 100644 --- a/octue/cloud/pub_sub/events.py +++ b/octue/cloud/pub_sub/events.py @@ -31,19 +31,25 @@ def extract_event_and_attributes_from_pub_sub_message(message): # Cast attributes to a dictionary to avoid defaultdict-like behaviour from Pub/Sub message attributes container. attributes = dict(getattr_or_subscribe(message, "attributes")) + # Required for all events. converted_attributes = { - "sender_type": attributes["sender_type"], "question_uuid": attributes["question_uuid"], - "message_number": int(attributes["message_number"]), - "version": attributes["version"], - "sender": attributes.get("sender", "REMOTE"), # Backwards-compatible with previous event schema versions. + "order": int(attributes["order"]), + "originator": attributes["originator"], + "sender": attributes["sender"], + "sender_type": attributes["sender_type"], + "sender_sdk_version": attributes["sender_sdk_version"], + "recipient": attributes["recipient"], } - if "forward_logs" in attributes: - converted_attributes["forward_logs"] = bool(int(attributes["forward_logs"])) - - if "save_diagnostics" in attributes: - converted_attributes["save_diagnostics"] = attributes["save_diagnostics"] + # Required for question events. + if attributes["sender_type"] == "PARENT": + converted_attributes.update( + { + "forward_logs": bool(int(attributes["forward_logs"])), + "save_diagnostics": attributes["save_diagnostics"], + } + ) try: # Parse event directly from Pub/Sub or Dataflow. @@ -59,7 +65,7 @@ class GoogleCloudPubSubEventHandler(AbstractEventHandler): """A synchronous handler for events received as Google Pub/Sub messages from a pull subscription. :param octue.cloud.pub_sub.subscription.Subscription subscription: the subscription messages are pulled from - :param octue.cloud.pub_sub.service.Service receiving_service: the service that's receiving the events + :param octue.cloud.pub_sub.service.Service recipient: the service that's receiving the events :param callable|None handle_monitor_message: a function to handle monitor messages (e.g. send them to an endpoint for plotting or displaying) - this function should take a single JSON-compatible python primitive :param bool record_events: if `True`, record received events in the `received_events` attribute :param dict|None event_handlers: a mapping of event type names to callables that handle each type of event. The handlers should not mutate the events. @@ -71,7 +77,7 @@ class GoogleCloudPubSubEventHandler(AbstractEventHandler): def __init__( self, subscription, - receiving_service, + recipient, handle_monitor_message=None, record_events=True, event_handlers=None, @@ -81,7 +87,7 @@ def __init__( self.subscription = subscription super().__init__( - receiving_service, + recipient, handle_monitor_message=handle_monitor_message, record_events=record_events, event_handlers=event_handlers, @@ -240,7 +246,9 @@ def _pull_and_enqueue_available_events(self, timeout): for event in pull_response.received_messages: self._extract_and_enqueue_event(event) - self._earliest_waiting_event_number = min(self.waiting_events.keys()) + # Handle the case where no events (or no valid events) have been received. + if self.waiting_events: + self._earliest_waiting_event_number = min(self.waiting_events.keys()) def _extract_event_and_attributes(self, container): """Extract an event and its attributes from the Pub/Sub message. diff --git a/octue/cloud/pub_sub/logging.py b/octue/cloud/pub_sub/logging.py index 4df17b92e..e8d78a6a8 100644 --- a/octue/cloud/pub_sub/logging.py +++ b/octue/cloud/pub_sub/logging.py @@ -8,19 +8,23 @@ class GoogleCloudPubSubHandler(logging.Handler): """A log handler that publishes log records to a Google Cloud Pub/Sub topic. - :param callable message_sender: the `_send_message` method of the service that instantiated this instance - :param octue.cloud.pub_sub.topic.Topic topic: topic to publish log records to + :param callable event_emitter: the `_emit_event` method of the service that instantiated this instance :param str question_uuid: the UUID of the question to handle log records for + :param str originator: the SRUID of the service that asked the question these log records are related to + :param str recipient: the SRUID of the service to send these log records to + :param octue.cloud.events.counter.EventCounter order: an event counter keeping track of the order of emitted events :param float timeout: timeout in seconds for attempting to publish each log record :return None: """ - def __init__(self, message_sender, topic, question_uuid, timeout=60, *args, **kwargs): + def __init__(self, event_emitter, question_uuid, originator, recipient, order, timeout=60, *args, **kwargs): super().__init__(*args, **kwargs) - self.topic = topic self.question_uuid = question_uuid + self.originator = originator + self.recipient = recipient + self.order = order self.timeout = timeout - self._send_message = message_sender + self._emit_event = event_emitter def emit(self, record): """Serialise the log record as a dictionary and publish it to the topic. @@ -29,12 +33,14 @@ def emit(self, record): :return None: """ try: - self._send_message( + self._emit_event( { "kind": "log_record", "log_record": self._convert_log_record_to_primitives(record), }, - topic=self.topic, + originator=self.originator, + recipient=self.recipient, + order=self.order, attributes={ "question_uuid": self.question_uuid, "sender_type": "CHILD", # The sender type is repeated here as a string to avoid a circular import. diff --git a/octue/cloud/pub_sub/service.py b/octue/cloud/pub_sub/service.py index e7ed3c349..cdbcc443a 100644 --- a/octue/cloud/pub_sub/service.py +++ b/octue/cloud/pub_sub/service.py @@ -14,6 +14,8 @@ from google.cloud import pubsub_v1 import octue.exceptions +from octue.cloud.events import OCTUE_SERVICES_PREFIX +from octue.cloud.events.counter import EventCounter from octue.cloud.events.validation import raise_if_event_is_invalid from octue.cloud.pub_sub import Subscription, Topic from octue.cloud.pub_sub.events import GoogleCloudPubSubEventHandler, extract_event_and_attributes_from_pub_sub_message @@ -35,10 +37,10 @@ logger = logging.getLogger(__name__) -# A lock to ensure only one message can be sent at a time so that the message number is incremented correctly when -# messages are being sent on multiple threads (e.g. via the main thread and a periodic monitor message thread). This -# avoids 1) messages overwriting each other in the parent's message handler and 2) messages losing their order. -send_message_lock = threading.Lock() +# A lock to ensure only one event can be emitted at a time so that the order is incremented correctly when events are +# being emitted on multiple threads (e.g. via the main thread and a periodic monitor message thread). This avoids 1) +# events overwriting each other in the parent's message handler and 2) events losing their order. +emit_event_lock = threading.Lock() DEFAULT_NAMESPACE = "default" ANSWERS_NAMESPACE = "answers" @@ -85,12 +87,12 @@ def __init__(self, backend, service_id=None, run_function=None, name=None, servi self.backend = backend self.run_function = run_function self.name = name - self.service_registries = service_registries self._pub_sub_id = convert_service_id_to_pub_sub_form(self.id) self._local_sdk_version = importlib.metadata.version("octue") self._publisher = None + self._services_topic = None self._event_handler = None def __repr__(self): @@ -113,6 +115,25 @@ def publisher(self): return self._publisher + @property + def services_topic(self): + """Get the Octue services topic that all events in the project are published to. No topic is instantiated until + this property is called for the first time. This allows checking for the `GOOGLE_APPLICATION_CREDENTIALS` + environment variable to be put off until it's needed. + + :raise octue.exceptions.ServiceNotFound: if the topic doesn't exist in the project + :return octue.cloud.pub_sub.topic.Topic: the Octue services topic for the project + """ + if not self._services_topic: + topic = Topic(name=OCTUE_SERVICES_PREFIX, project_name=self.backend.project_name) + + if not topic.exists(): + raise octue.exceptions.ServiceNotFound(f"Topic {topic.name!r} cannot be found.") + + self._services_topic = topic + + return self._services_topic + @property def received_messages(self): """Get the messages received by the service from a child service while running the `wait_for_answer` method. If @@ -136,17 +157,15 @@ def serve(self, timeout=None, delete_topic_and_subscription_on_exit=False, allow :return (google.cloud.pubsub_v1.subscriber.futures.StreamingPullFuture, google.cloud.pubsub_v1.SubscriberClient): """ logger.info("Starting %r.", self) - topic = Topic(name=self._pub_sub_id, project_name=self.backend.project_name) subscription = Subscription( - name=self._pub_sub_id, - topic=topic, - filter=f'attributes.sender_type = "{PARENT_SENDER_TYPE}"', + name=".".join((OCTUE_SERVICES_PREFIX, self._pub_sub_id)), + topic=self.services_topic, + filter=f'attributes.recipient = "{self.id}" AND attributes.sender_type = "{PARENT_SENDER_TYPE}"', expiration_time=None, ) try: - topic.create(allow_existing=allow_existing) subscription.create(allow_existing=allow_existing) except google.api_core.exceptions.AlreadyExists: raise octue.exceptions.ServiceAlreadyExists(f"A service with the ID {self.id!r} already exists.") @@ -177,11 +196,8 @@ def serve(self, timeout=None, delete_topic_and_subscription_on_exit=False, allow if subscription.creation_triggered_locally: subscription.delete() - if topic.creation_triggered_locally: - topic.delete() - except Exception: - logger.error("Deletion of topic and/or subscription %r failed.", topic.name) + logger.error("Deletion of subscription %r failed.", subscription.name) subscriber.close() @@ -204,20 +220,21 @@ def answer(self, question, heartbeat_interval=120, timeout=30): forward_logs, parent_sdk_version, save_diagnostics, + originator, ) = self._parse_question(question) except jsonschema.ValidationError: return - topic = Topic(name=self._pub_sub_id, project_name=self.backend.project_name) heartbeater = None + order = EventCounter() try: - self._send_delivery_acknowledgment(topic, question_uuid) + self._send_delivery_acknowledgment(question_uuid, originator, order) heartbeater = RepeatingTimer( interval=heartbeat_interval, function=self._send_heartbeat, - kwargs={"topic": topic, "question_uuid": question_uuid}, + kwargs={"question_uuid": question_uuid, "originator": originator, "order": order}, ) heartbeater.daemon = True @@ -225,9 +242,11 @@ def answer(self, question, heartbeat_interval=120, timeout=30): if forward_logs: analysis_log_handler = GoogleCloudPubSubHandler( - message_sender=self._send_message, - topic=topic, + event_emitter=self._emit_event, question_uuid=question_uuid, + originator=originator, + recipient=originator, + order=order, ) else: analysis_log_handler = None @@ -240,8 +259,9 @@ def answer(self, question, heartbeat_interval=120, timeout=30): analysis_log_handler=analysis_log_handler, handle_monitor_message=functools.partial( self._send_monitor_message, - topic=topic, question_uuid=question_uuid, + originator=originator, + order=order, ), save_diagnostics=save_diagnostics, ) @@ -251,9 +271,11 @@ def answer(self, question, heartbeat_interval=120, timeout=30): if analysis.output_manifest is not None: result["output_manifest"] = analysis.output_manifest.to_primitive() - self._send_message( - message=result, - topic=topic, + self._emit_event( + event=result, + originator=originator, + recipient=originator, + order=order, attributes={"question_uuid": question_uuid, "sender_type": CHILD_SENDER_TYPE}, timeout=timeout, ) @@ -266,7 +288,7 @@ def answer(self, question, heartbeat_interval=120, timeout=30): heartbeater.cancel() warn_if_incompatible(child_sdk_version=self._local_sdk_version, parent_sdk_version=parent_sdk_version) - self.send_exception(topic, question_uuid, timeout=timeout) + self.send_exception(question_uuid, originator, order, timeout=timeout) raise error def ask( @@ -325,20 +347,21 @@ def ask( "the new cloud locations." ) - topic = Topic(name=convert_service_id_to_pub_sub_form(service_id), project_name=self.backend.project_name) - - if not topic.exists(timeout=timeout): - raise octue.exceptions.ServiceNotFound(f"Service with ID {service_id!r} cannot be found.") - question_uuid = question_uuid or str(uuid.uuid4()) if asynchronous and not push_endpoint: answer_subscription = None else: + pub_sub_id = convert_service_id_to_pub_sub_form(self.id) + answer_subscription = Subscription( - name=".".join((topic.name, ANSWERS_NAMESPACE, question_uuid)), - topic=topic, - filter=f'attributes.question_uuid = "{question_uuid}" AND attributes.sender_type = "{CHILD_SENDER_TYPE}"', + name=".".join((OCTUE_SERVICES_PREFIX, pub_sub_id, ANSWERS_NAMESPACE, question_uuid)), + topic=self.services_topic, + filter=( + f'attributes.recipient = "{self.id}" ' + f'AND attributes.question_uuid = "{question_uuid}" ' + f'AND attributes.sender_type = "{CHILD_SENDER_TYPE}"' + ), push_endpoint=push_endpoint, ) answer_subscription.create(allow_existing=False) @@ -350,7 +373,6 @@ def ask( service_id=service_id, forward_logs=subscribe_to_logs, save_diagnostics=save_diagnostics, - topic=topic, question_uuid=question_uuid, ) @@ -384,7 +406,7 @@ def wait_for_answer( self._event_handler = GoogleCloudPubSubEventHandler( subscription=subscription, - receiving_service=self, + recipient=self, handle_monitor_message=handle_monitor_message, record_events=record_messages, ) @@ -398,45 +420,53 @@ def wait_for_answer( finally: subscription.delete() - def send_exception(self, topic, question_uuid, timeout=30): + def send_exception(self, question_uuid, originator, order, timeout=30): """Serialise and send the exception being handled to the parent. - :param octue.cloud.pub_sub.topic.Topic topic: :param str question_uuid: + :param str originator: the SRUID of the service that asked the question this event is related to + :param octue.cloud.events.counter.EventCounter order: an event counter keeping track of the order of emitted events :param float|None timeout: time in seconds to keep retrying sending of the exception :return None: """ exception = convert_exception_to_primitives() exception_message = f"Error in {self!r}: {exception['message']}" - self._send_message( + self._emit_event( { "kind": "exception", "exception_type": exception["type"], "exception_message": exception_message, "exception_traceback": exception["traceback"], }, - topic=topic, + originator=originator, + recipient=originator, + order=order, attributes={"question_uuid": question_uuid, "sender_type": CHILD_SENDER_TYPE}, timeout=timeout, ) - def _send_message(self, message, topic, attributes=None, timeout=30): - """Send a JSON-serialised message to the given topic with optional message attributes and increment the - `messages_published` attribute of the topic by one. This method is thread-safe. + def _emit_event(self, event, originator, recipient, order, attributes=None, timeout=30): + """Emit a JSON-serialised event as a Pub/Sub message to the services topic with optional message attributes, + incrementing the `order` argument by one. This method is thread-safe. - :param dict message: JSON-serialisable data to send as a message - :param octue.cloud.pub_sub.topic.Topic topic: the Pub/Sub topic to send the message to - :param dict|None attributes: key-value pairs to attach to the message - the values must be strings or bytes - :param int|float timeout: the timeout for sending the message in seconds + :param dict event: JSON-serialisable data to emit as an event + :param str originator: the SRUID of the service that asked the question this event is related to + :param str recipient: the SRUID of the service the event is intended for + :param octue.cloud.events.counter.EventCounter order: an event counter keeping track of the order of emitted events + :param dict|None attributes: key-value pairs to attach to the event - the values must be strings or bytes + :param int|float timeout: the timeout for sending the event in seconds :return google.cloud.pubsub_v1.publisher.futures.Future: """ attributes = attributes or {} - - with send_message_lock: - attributes["version"] = self._local_sdk_version - attributes["message_number"] = topic.messages_published - attributes["sender"] = self.id + attributes["uuid"] = str(uuid.uuid4()) + attributes["originator"] = originator + attributes["sender"] = self.id + attributes["sender_sdk_version"] = self._local_sdk_version + attributes["recipient"] = recipient + + with emit_event_lock: + attributes["order"] = int(order) converted_attributes = {} for key, value in attributes.items(): @@ -448,13 +478,13 @@ def _send_message(self, message, topic, attributes=None, timeout=30): converted_attributes[key] = value future = self.publisher.publish( - topic=topic.path, - data=json.dumps(message, cls=OctueJSONEncoder).encode(), + topic=self.services_topic.path, + data=json.dumps(event, cls=OctueJSONEncoder).encode(), retry=retry.Retry(deadline=timeout), **converted_attributes, ) - topic.messages_published += 1 + order += 1 return future @@ -466,7 +496,6 @@ def _send_question( service_id, forward_logs, save_diagnostics, - topic, question_uuid, timeout=30, ): @@ -478,7 +507,6 @@ def _send_question( :param str service_id: the ID of the child to send the question to :param bool forward_logs: whether to request the child to forward its logs :param str save_diagnostics: must be one of {"SAVE_DIAGNOSTICS_OFF", "SAVE_DIAGNOSTICS_ON_CRASH", "SAVE_DIAGNOSTICS_ON"}; if turned on, allow the input values and manifest (and its datasets) to be saved by the child either all the time or just if it fails while processing them - :param octue.cloud.pub_sub.topic.Topic topic: topic to send the acknowledgement to :param str question_uuid: the UUID of the question being sent :param float timeout: time in seconds after which to give up sending :return None: @@ -489,15 +517,17 @@ def _send_question( input_manifest.use_signed_urls_for_datasets() question["input_manifest"] = input_manifest.to_primitive() - future = self._send_message( - message=question, - topic=topic, + future = self._emit_event( + event=question, timeout=timeout, + originator=self.id, + recipient=service_id, + order=EventCounter(), attributes={ "question_uuid": question_uuid, - "sender_type": PARENT_SENDER_TYPE, "forward_logs": forward_logs, "save_diagnostics": save_diagnostics, + "sender_type": PARENT_SENDER_TYPE, }, ) @@ -505,58 +535,67 @@ def _send_question( future.result() logger.info("%r asked a question %r to service %r.", self, question_uuid, service_id) - def _send_delivery_acknowledgment(self, topic, question_uuid, timeout=30): + def _send_delivery_acknowledgment(self, question_uuid, originator, order, timeout=30): """Send an acknowledgement of question receipt to the parent. - :param octue.cloud.pub_sub.topic.Topic topic: topic to send the acknowledgement to :param str question_uuid: + :param str originator: the SRUID of the service that asked the question this event is related to + :param octue.cloud.events.counter.EventCounter order: an event counter keeping track of the order of emitted events :param float timeout: time in seconds after which to give up sending :return None: """ - self._send_message( + self._emit_event( { "kind": "delivery_acknowledgement", "datetime": datetime.datetime.utcnow().isoformat(), }, - topic=topic, timeout=timeout, + originator=originator, + recipient=originator, + order=order, attributes={"question_uuid": question_uuid, "sender_type": CHILD_SENDER_TYPE}, ) logger.info("%r acknowledged receipt of question %r.", self, question_uuid) - def _send_heartbeat(self, topic, question_uuid, timeout=30): + def _send_heartbeat(self, question_uuid, originator, order, timeout=30): """Send a heartbeat to the parent, indicating that the service is alive. - :param octue.cloud.pub_sub.topic.Topic topic: topic to send the heartbeat to :param str question_uuid: + :param str originator: the SRUID of the service that asked the question this event is related to + :param octue.cloud.events.counter.EventCounter order: an event counter keeping track of the order of emitted events :param float timeout: time in seconds after which to give up sending :return None: """ - self._send_message( + self._emit_event( { "kind": "heartbeat", "datetime": datetime.datetime.utcnow().isoformat(), }, - topic=topic, + originator=originator, + recipient=originator, + order=order, timeout=timeout, attributes={"question_uuid": question_uuid, "sender_type": CHILD_SENDER_TYPE}, ) logger.debug("Heartbeat sent by %r.", self) - def _send_monitor_message(self, data, topic, question_uuid, timeout=30): + def _send_monitor_message(self, data, question_uuid, originator, order, timeout=30): """Send a monitor message to the parent. :param any data: the data to send as a monitor message - :param octue.cloud.pub_sub.topic.Topic topic: the topic to send the message to :param str question_uuid: + :param str originator: the SRUID of the service that asked the question this event is related to + :param octue.cloud.events.counter.EventCounter order: an event counter keeping track of the order of emitted events :param float timeout: time in seconds to retry sending the message :return None: """ - self._send_message( + self._emit_event( {"kind": "monitor_message", "data": data}, - topic=topic, + originator=originator, + recipient=originator, + order=order, timeout=timeout, attributes={"question_uuid": question_uuid, "sender_type": CHILD_SENDER_TYPE}, ) @@ -567,11 +606,11 @@ def _parse_question(self, question): """Parse a question in the Google Cloud Run or Google Pub/Sub format. :param dict|google.cloud.pubsub_v1.subscriber.message.Message question: the question to parse in Google Cloud Run or Google Pub/Sub format - :return (dict, str, bool, str, str): the question's event and its attributes (question UUID, whether to forward logs, the Octue SDK version of the parent, and whether to save diagnostics) + :return (dict, str, bool, str, str, str): the question's event and its attributes (question UUID, whether to forward logs, the Octue SDK version of the parent, whether to save diagnostics, and the SRUID of the service revision that asked the question) """ logger.info("%r received a question.", self) - # Acknowledge it if it's directly from Pub/Sub + # Acknowledge the question if it's directly from Pub/Sub. if hasattr(question, "ack"): question.ack() @@ -581,17 +620,18 @@ def _parse_question(self, question): raise_if_event_is_invalid( event=event_for_validation, attributes=attributes, - receiving_service=self, - parent_sdk_version=attributes.get("version"), + recipient=self, + parent_sdk_version=attributes["sender_sdk_version"], child_sdk_version=importlib.metadata.version("octue"), ) - logger.info("%r parsed the question successfully.", self) + logger.info("%r parsed question %r successfully.", self, attributes["question_uuid"]) return ( event, attributes["question_uuid"], attributes["forward_logs"], - attributes["version"], + attributes["sender_sdk_version"], attributes["save_diagnostics"], + attributes["originator"], ) diff --git a/octue/cloud/pub_sub/subscription.py b/octue/cloud/pub_sub/subscription.py index 021843d3f..c9920be03 100644 --- a/octue/cloud/pub_sub/subscription.py +++ b/octue/cloud/pub_sub/subscription.py @@ -13,7 +13,6 @@ UpdateSubscriptionRequest, ) -from octue.cloud.service_id import OCTUE_SERVICES_NAMESPACE from octue.exceptions import ConflictingSubscriptionType @@ -55,11 +54,7 @@ def __init__( push_endpoint=None, bigquery_table_id=None, ): - if not name.startswith(OCTUE_SERVICES_NAMESPACE): - self.name = f"{OCTUE_SERVICES_NAMESPACE}.{name}" - else: - self.name = name - + self.name = name self.topic = topic self.filter = filter self.path = self.generate_subscription_path(project_name or self.topic.project_name, self.name) diff --git a/octue/cloud/pub_sub/topic.py b/octue/cloud/pub_sub/topic.py index cb0ebda83..42d33fafe 100644 --- a/octue/cloud/pub_sub/topic.py +++ b/octue/cloud/pub_sub/topic.py @@ -6,8 +6,6 @@ from google.cloud.pubsub_v1 import PublisherClient from google.pubsub_v1.types.pubsub import Topic as Topic_ -from octue.cloud.service_id import OCTUE_SERVICES_NAMESPACE - logger = logging.getLogger(__name__) @@ -23,14 +21,9 @@ class Topic: """ def __init__(self, name, project_name): - if not name.startswith(OCTUE_SERVICES_NAMESPACE): - self.name = f"{OCTUE_SERVICES_NAMESPACE}.{name}" - else: - self.name = name - + self.name = name self.project_name = project_name self.path = self.generate_topic_path(self.project_name, self.name) - self.messages_published = 0 self._publisher = PublisherClient() self._created = False @@ -48,7 +41,7 @@ def __repr__(self): :return str: """ - return f"<{type(self).__name__}({self.name})>" + return f"<{type(self).__name__}(name={self.name!r})>" def create(self, allow_existing=False): """Create a Google Pub/Sub topic that can be published to. diff --git a/octue/cloud/service_id.py b/octue/cloud/service_id.py index fa2e021a8..dd041a38a 100644 --- a/octue/cloud/service_id.py +++ b/octue/cloud/service_id.py @@ -10,9 +10,6 @@ logger = logging.getLogger(__name__) - -OCTUE_SERVICES_NAMESPACE = "octue.services" - SERVICE_NAMESPACE_AND_NAME_PATTERN = r"([a-z0-9])+(-([a-z0-9])+)*" COMPILED_SERVICE_NAMESPACE_AND_NAME_PATTERN = re.compile(SERVICE_NAMESPACE_AND_NAME_PATTERN) @@ -216,7 +213,7 @@ def get_sruid_from_pub_sub_resource_name(name): :param str name: the name of the topic or subscription :return str: the SRUID of the service revision the topic or subscription is related to """ - _, _, namespace, name, revision_tag, *_ = name.split(".") + namespace, name, revision_tag, *_ = name.split(".") return f"{namespace}/{name}:{revision_tag.replace('-', '.')}" diff --git a/octue/configuration.py b/octue/configuration.py index 1a1f2f7d1..c5f487013 100644 --- a/octue/configuration.py +++ b/octue/configuration.py @@ -36,6 +36,8 @@ def __init__( ): self.name = name self.namespace = namespace + self.diagnostics_cloud_path = diagnostics_cloud_path + self.service_registries = service_registries if directory: directory = os.path.abspath(directory) @@ -61,9 +63,6 @@ def __init__( else: self.app_configuration_path = None - self.diagnostics_cloud_path = diagnostics_cloud_path - self.service_registries = service_registries - if kwargs: logger.warning(f"The following keyword arguments were not used by {type(self).__name__}: {kwargs!r}.") diff --git a/octue/resources/service_backends.py b/octue/resources/service_backends.py index 9270b4a52..b0296ed21 100644 --- a/octue/resources/service_backends.py +++ b/octue/resources/service_backends.py @@ -36,17 +36,16 @@ class ServiceBackend(ABC): class GCPPubSubBackend(ServiceBackend): - """A dataclass containing the details needed to use Google Cloud Platform Pub/Sub as a Service backend. + """A dataclass containing the details needed to use Google Cloud Pub/Sub as a Service backend. - :param str project_name: + :param str project_name: the name of the project to use for Pub/Sub :return None: """ def __init__(self, project_name): if project_name is None: raise exceptions.CloudLocationNotSpecified( - "The project name must be specified for a service to connect to the correct Google Cloud Pub/Sub " - f"instance - it's currently {project_name!r}.", + "`project_name` must be specified for a service to connect to the correct service - received None." ) self.project_name = project_name diff --git a/octue/utils/patches.py b/octue/utils/patches.py index 54883acff..b2196b62c 100644 --- a/octue/utils/patches.py +++ b/octue/utils/patches.py @@ -14,11 +14,25 @@ def __enter__(self): :return list(unittest.mock.MagicMock): """ - return [patch.start() for patch in self.patches] + return self.start() def __exit__(self, *args, **kwargs): """Stop the patches. + :return None: + """ + self.stop() + + def start(self): + """Start the patches and return the mocks they produce. + + :return list(unittest.mock.MagicMock): + """ + return [patch.start() for patch in self.patches] + + def stop(self): + """Stop the patches. + :return None: """ for patch in self.patches: diff --git a/terraform/bigquery.tf b/terraform/bigquery.tf index d48391221..a4cb12b98 100644 --- a/terraform/bigquery.tf +++ b/terraform/bigquery.tf @@ -28,6 +28,16 @@ resource "google_bigquery_table" "test_table" { "type": "JSON", "mode": "REQUIRED" }, + { + "name": "uuid", + "type": "STRING", + "mode": "REQUIRED" + }, + { + "name": "originator", + "type": "STRING", + "mode": "REQUIRED" + }, { "name": "sender", "type": "STRING", @@ -39,17 +49,22 @@ resource "google_bigquery_table" "test_table" { "mode": "REQUIRED" }, { - "name": "question_uuid", + "name": "sender_sdk_version", "type": "STRING", "mode": "REQUIRED" }, { - "name": "version", + "name": "recipient", + "type": "STRING", + "mode": "REQUIRED" + }, + { + "name": "question_uuid", "type": "STRING", "mode": "REQUIRED" }, { - "name": "ordering_key", + "name": "order", "type": "STRING", "mode": "REQUIRED" }, diff --git a/terraform/functions.tf b/terraform/functions.tf index cd9d7b13f..3f90238c1 100644 --- a/terraform/functions.tf +++ b/terraform/functions.tf @@ -9,7 +9,7 @@ resource "google_cloudfunctions2_function" "event_handler" { source { storage_source { bucket = "twined-gcp" - object = "event_handler_function_source.zip" + object = "event_handler/0.2.0.zip" } } } @@ -23,12 +23,12 @@ resource "google_cloudfunctions2_function" "event_handler" { } } -# event_trigger { -# trigger_region = var.region -# event_type = "google.cloud.pubsub.topic.v1.messagePublished" -# pubsub_topic = google_pubsub_topic.topic.id -# retry_policy = "RETRY_POLICY_RETRY" -# } + event_trigger { + trigger_region = var.region + event_type = "google.cloud.pubsub.topic.v1.messagePublished" + pubsub_topic = google_pubsub_topic.services_topic.id + retry_policy = "RETRY_POLICY_RETRY" + } } @@ -39,8 +39,3 @@ resource "google_cloud_run_service_iam_member" "function_invoker" { role = "roles/run.invoker" member = "allUsers" } - - -output "function_uri" { - value = google_cloudfunctions2_function.event_handler.service_config[0].uri -} diff --git a/terraform/pub_sub.tf b/terraform/pub_sub.tf new file mode 100644 index 000000000..ce436afec --- /dev/null +++ b/terraform/pub_sub.tf @@ -0,0 +1,3 @@ +resource "google_pubsub_topic" "services_topic" { + name = "octue.services" +} diff --git a/tests/cloud/deployment/google/cloud_run/test_cloud_run_deployment.py b/tests/cloud/deployment/google/cloud_run/test_cloud_run_deployment.py index 4006e9333..bb6e12903 100644 --- a/tests/cloud/deployment/google/cloud_run/test_cloud_run_deployment.py +++ b/tests/cloud/deployment/google/cloud_run/test_cloud_run_deployment.py @@ -13,7 +13,7 @@ class TestCloudRunDeployment(TestCase): # This is the service ID of the example service deployed to Google Cloud Run. child = Child( - id="octue/example-service-cloud-run:0.3.2", + id="octue/example-service-cloud-run:0.4.0", backend={"name": "GCPPubSubBackend", "project_name": os.environ["TEST_PROJECT_NAME"]}, ) diff --git a/tests/cloud/emulators/test_child_emulator.py b/tests/cloud/emulators/test_child_emulator.py index 893d7731f..4c0af6a00 100644 --- a/tests/cloud/emulators/test_child_emulator.py +++ b/tests/cloud/emulators/test_child_emulator.py @@ -2,12 +2,13 @@ import os from octue.cloud import storage -from octue.cloud.emulators._pub_sub import MockService +from octue.cloud.emulators._pub_sub import MockService, MockTopic from octue.cloud.emulators.child import ChildEmulator, ServicePatcher +from octue.cloud.events import OCTUE_SERVICES_PREFIX from octue.cloud.storage import GoogleCloudStorageClient from octue.resources import Manifest from octue.resources.service_backends import GCPPubSubBackend -from tests import MOCK_SERVICE_REVISION_TAG, TEST_BUCKET_NAME, TESTS_DIR +from tests import MOCK_SERVICE_REVISION_TAG, TEST_BUCKET_NAME, TEST_PROJECT_NAME, TESTS_DIR from tests.base import BaseTestCase @@ -15,6 +16,13 @@ class TestChildEmulatorAsk(BaseTestCase): BACKEND = {"name": "GCPPubSubBackend", "project_name": "blah"} + @classmethod + def setUpClass(cls): + topic = MockTopic(name=OCTUE_SERVICES_PREFIX, project_name=TEST_PROJECT_NAME) + + with ServicePatcher(): + topic.create(allow_existing=True) + def test_representation(self): """Test that child emulators are represented correctly.""" self.assertEqual( @@ -297,6 +305,13 @@ class TestChildEmulatorJSONFiles(BaseTestCase): TEST_FILES_DIRECTORY = os.path.join(TESTS_DIR, "cloud", "emulators", "valid_child_emulator_files") + @classmethod + def setUpClass(cls): + topic = MockTopic(name=OCTUE_SERVICES_PREFIX, project_name=TEST_PROJECT_NAME) + + with ServicePatcher(): + topic.create(allow_existing=True) + def test_with_empty_file(self): """Test that a child emulator can be instantiated from an empty JSON file (a JSON file with only an empty object in), asked a question, and produce a trivial result. diff --git a/tests/cloud/pub_sub/test_events.py b/tests/cloud/pub_sub/test_events.py index 2f97ba897..4e75fdbb9 100644 --- a/tests/cloud/pub_sub/test_events.py +++ b/tests/cloud/pub_sub/test_events.py @@ -3,121 +3,147 @@ import uuid from unittest.mock import patch -from octue.cloud.emulators._pub_sub import ( - SUBSCRIPTIONS, - MockMessage, - MockService, - MockSubscriber, - MockSubscription, - MockTopic, -) +from octue.cloud.emulators._pub_sub import MESSAGES, MockMessage, MockService, MockSubscription, MockTopic from octue.cloud.emulators.child import ServicePatcher +from octue.cloud.events import OCTUE_SERVICES_PREFIX from octue.cloud.pub_sub.events import GoogleCloudPubSubEventHandler from octue.resources.service_backends import GCPPubSubBackend from tests import TEST_PROJECT_NAME from tests.base import BaseTestCase -parent = MockService(service_id="my-org/my-service:1.0.0", backend=GCPPubSubBackend(project_name=TEST_PROJECT_NAME)) +class TestPubSubEventHandler(BaseTestCase): + service_patcher = ServicePatcher() + @classmethod + def setUpClass(cls): + cls.service_patcher.start() + cls.question_uuid = str(uuid.uuid4()) -def create_mock_topic_and_subscription(): - """Create a question UUID, mock topic, and mock subscription. + cls.topic = MockTopic(name=OCTUE_SERVICES_PREFIX, project_name=TEST_PROJECT_NAME) + cls.topic.create(allow_existing=True) - :return (str, octue.cloud.emulators._pub_sub.MockTopic, octue.cloud.emulators._pub_sub.MockSubscription): question UUID, topic, and subscription - """ - question_uuid = str(uuid.uuid4()) - topic = MockTopic(name="my-org.my-service.1-0-0", project_name=TEST_PROJECT_NAME) - subscription = MockSubscription(name=f"my-org.my-service.1-0-0.answers.{question_uuid}", topic=topic) + cls.subscription = MockSubscription( + name=f"my-org.my-service.1-0-0.answers.{cls.question_uuid}", + topic=cls.topic, + ) + cls.subscription.create() - subscription.create() - return question_uuid, topic, subscription + cls.parent = MockService( + service_id="my-org/my-service:1.0.0", + backend=GCPPubSubBackend(project_name=TEST_PROJECT_NAME), + ) + @classmethod + def tearDownClass(cls): + """Stop the services patcher. + + :return None: + """ + cls.service_patcher.stop() -class TestPubSubEventHandler(BaseTestCase): def test_timeout(self): """Test that a TimeoutError is raised if message handling takes longer than the given timeout.""" - question_uuid, _, mock_subscription = create_mock_topic_and_subscription() - - with patch("octue.cloud.pub_sub.events.SubscriberClient", MockSubscriber): - event_handler = GoogleCloudPubSubEventHandler( - subscription=mock_subscription, - receiving_service=parent, - event_handlers={"test": lambda message: None, "finish-test": lambda message: message}, - schema={}, - ) + event_handler = GoogleCloudPubSubEventHandler( + subscription=self.subscription, + recipient=self.parent, + event_handlers={"test": lambda message: None, "finish-test": lambda message: message}, + schema={}, + ) with self.assertRaises(TimeoutError): event_handler.handle_events(timeout=0) def test_in_order_messages_are_handled_in_order(self): """Test that messages received in order are handled in order.""" - question_uuid, mock_topic, mock_subscription = create_mock_topic_and_subscription() - - with patch("octue.cloud.pub_sub.events.SubscriberClient", MockSubscriber): - event_handler = GoogleCloudPubSubEventHandler( - subscription=mock_subscription, - receiving_service=parent, - event_handlers={"test": lambda message: None, "finish-test": lambda message: "This is the result."}, - schema={}, - ) + event_handler = GoogleCloudPubSubEventHandler( + subscription=self.subscription, + recipient=self.parent, + event_handlers={"test": lambda message: None, "finish-test": lambda message: "This is the result."}, + schema={}, + ) child = MockService(backend=GCPPubSubBackend(project_name=TEST_PROJECT_NAME)) messages = [ - {"event": {"kind": "test"}, "attributes": {"question_uuid": question_uuid, "sender_type": "CHILD"}}, - {"event": {"kind": "test"}, "attributes": {"question_uuid": question_uuid, "sender_type": "CHILD"}}, - {"event": {"kind": "test"}, "attributes": {"question_uuid": question_uuid, "sender_type": "CHILD"}}, - {"event": {"kind": "finish-test"}, "attributes": {"question_uuid": question_uuid, "sender_type": "CHILD"}}, + { + "event": {"kind": "test", "order": 0}, + "attributes": {"question_uuid": self.question_uuid, "sender_type": "CHILD"}, + }, + { + "event": {"kind": "test", "order": 1}, + "attributes": {"question_uuid": self.question_uuid, "sender_type": "CHILD"}, + }, + { + "event": {"kind": "test", "order": 2}, + "attributes": {"question_uuid": self.question_uuid, "sender_type": "CHILD"}, + }, + { + "event": {"kind": "finish-test", "order": 3}, + "attributes": {"question_uuid": self.question_uuid, "sender_type": "CHILD"}, + }, ] for message in messages: - child._send_message(message=message["event"], attributes=message["attributes"], topic=mock_topic) + child._emit_event( + event=message["event"], + attributes=message["attributes"], + originator=self.parent.id, + recipient=self.parent.id, + order=message["event"]["order"], + ) result = event_handler.handle_events() self.assertEqual(result, "This is the result.") self.assertEqual( event_handler.handled_events, - [{"kind": "test"}, {"kind": "test"}, {"kind": "test"}, {"kind": "finish-test"}], + [ + {"kind": "test", "order": 0}, + {"kind": "test", "order": 1}, + {"kind": "test", "order": 2}, + {"kind": "finish-test", "order": 3}, + ], ) def test_out_of_order_messages_are_handled_in_order(self): """Test that messages received out of order are handled in order.""" - question_uuid, mock_topic, mock_subscription = create_mock_topic_and_subscription() - - with patch("octue.cloud.pub_sub.events.SubscriberClient", MockSubscriber): - event_handler = GoogleCloudPubSubEventHandler( - subscription=mock_subscription, - receiving_service=parent, - event_handlers={"test": lambda message: None, "finish-test": lambda message: "This is the result."}, - schema={}, - ) + event_handler = GoogleCloudPubSubEventHandler( + subscription=self.subscription, + recipient=self.parent, + event_handlers={"test": lambda message: None, "finish-test": lambda message: "This is the result."}, + schema={}, + ) child = MockService(backend=GCPPubSubBackend(project_name=TEST_PROJECT_NAME)) messages = [ { "event": {"kind": "test", "order": 1}, - "attributes": {"question_uuid": question_uuid, "sender_type": "CHILD"}, + "attributes": {"question_uuid": self.question_uuid, "sender_type": "CHILD"}, }, { "event": {"kind": "test", "order": 2}, - "attributes": {"question_uuid": question_uuid, "sender_type": "CHILD"}, + "attributes": {"question_uuid": self.question_uuid, "sender_type": "CHILD"}, }, { "event": {"kind": "test", "order": 0}, - "attributes": {"question_uuid": question_uuid, "sender_type": "CHILD"}, + "attributes": {"question_uuid": self.question_uuid, "sender_type": "CHILD"}, }, { "event": {"kind": "finish-test", "order": 3}, - "attributes": {"question_uuid": question_uuid, "sender_type": "CHILD"}, + "attributes": {"question_uuid": self.question_uuid, "sender_type": "CHILD"}, }, ] for message in messages: - mock_topic.messages_published = message["event"]["order"] - child._send_message(message=message["event"], attributes=message["attributes"], topic=mock_topic) + child._emit_event( + event=message["event"], + attributes=message["attributes"], + originator=self.parent.id, + recipient=self.parent.id, + order=message["event"]["order"], + ) result = event_handler.handle_events() @@ -137,40 +163,42 @@ def test_out_of_order_messages_with_end_message_first_are_handled_in_order(self) """Test that messages received out of order and with the final message (the message that triggers a value to be returned) are handled in order. """ - question_uuid, mock_topic, mock_subscription = create_mock_topic_and_subscription() - - with patch("octue.cloud.pub_sub.events.SubscriberClient", MockSubscriber): - event_handler = GoogleCloudPubSubEventHandler( - subscription=mock_subscription, - receiving_service=parent, - event_handlers={"test": lambda message: None, "finish-test": lambda message: "This is the result."}, - schema={}, - ) + event_handler = GoogleCloudPubSubEventHandler( + subscription=self.subscription, + recipient=self.parent, + event_handlers={"test": lambda message: None, "finish-test": lambda message: "This is the result."}, + schema={}, + ) child = MockService(backend=GCPPubSubBackend(project_name=TEST_PROJECT_NAME)) messages = [ { "event": {"kind": "finish-test", "order": 3}, - "attributes": {"question_uuid": question_uuid, "sender_type": "CHILD"}, + "attributes": {"question_uuid": self.question_uuid, "sender_type": "CHILD"}, }, { "event": {"kind": "test", "order": 1}, - "attributes": {"question_uuid": question_uuid, "sender_type": "CHILD"}, + "attributes": {"question_uuid": self.question_uuid, "sender_type": "CHILD"}, }, { "event": {"kind": "test", "order": 2}, - "attributes": {"question_uuid": question_uuid, "sender_type": "CHILD"}, + "attributes": {"question_uuid": self.question_uuid, "sender_type": "CHILD"}, }, { "event": {"kind": "test", "order": 0}, - "attributes": {"question_uuid": question_uuid, "sender_type": "CHILD"}, + "attributes": {"question_uuid": self.question_uuid, "sender_type": "CHILD"}, }, ] for message in messages: - mock_topic.messages_published = message["event"]["order"] - child._send_message(message=message["event"], attributes=message["attributes"], topic=mock_topic) + child._emit_event( + event=message["event"], + attributes=message["attributes"], + originator=self.parent.id, + recipient=self.parent.id, + order=message["event"]["order"], + ) result = event_handler.handle_events() @@ -188,35 +216,38 @@ def test_out_of_order_messages_with_end_message_first_are_handled_in_order(self) def test_no_timeout(self): """Test that message handling works with no timeout.""" - question_uuid, mock_topic, mock_subscription = create_mock_topic_and_subscription() - - with patch("octue.cloud.pub_sub.events.SubscriberClient", MockSubscriber): - event_handler = GoogleCloudPubSubEventHandler( - subscription=mock_subscription, - receiving_service=parent, - event_handlers={"test": lambda message: None, "finish-test": lambda message: "This is the result."}, - schema={}, - ) + event_handler = GoogleCloudPubSubEventHandler( + subscription=self.subscription, + recipient=self.parent, + event_handlers={"test": lambda message: None, "finish-test": lambda message: "This is the result."}, + schema={}, + ) child = MockService(backend=GCPPubSubBackend(project_name=TEST_PROJECT_NAME)) messages = [ { "event": {"kind": "test", "order": 0}, - "attributes": {"question_uuid": question_uuid, "sender_type": "CHILD"}, + "attributes": {"question_uuid": self.question_uuid, "sender_type": "CHILD"}, }, { "event": {"kind": "test", "order": 1}, - "attributes": {"question_uuid": question_uuid, "sender_type": "CHILD"}, + "attributes": {"question_uuid": self.question_uuid, "sender_type": "CHILD"}, }, { "event": {"kind": "finish-test", "order": 2}, - "attributes": {"question_uuid": question_uuid, "sender_type": "CHILD"}, + "attributes": {"question_uuid": self.question_uuid, "sender_type": "CHILD"}, }, ] for message in messages: - child._send_message(message=message["event"], attributes=message["attributes"], topic=mock_topic) + child._emit_event( + event=message["event"], + attributes=message["attributes"], + originator=self.parent.id, + recipient=self.parent.id, + order=message["event"]["order"], + ) result = event_handler.handle_events(timeout=None) @@ -228,36 +259,39 @@ def test_no_timeout(self): def test_delivery_acknowledgement(self): """Test that a delivery acknowledgement message is handled correctly.""" - question_uuid, mock_topic, mock_subscription = create_mock_topic_and_subscription() - - with patch("octue.cloud.pub_sub.events.SubscriberClient", MockSubscriber): - event_handler = GoogleCloudPubSubEventHandler(subscription=mock_subscription, receiving_service=parent) - + event_handler = GoogleCloudPubSubEventHandler(subscription=self.subscription, recipient=self.parent) child = MockService(backend=GCPPubSubBackend(project_name=TEST_PROJECT_NAME)) messages = [ { - "event": {"kind": "delivery_acknowledgement", "datetime": datetime.datetime.utcnow().isoformat()}, - "attributes": {"question_uuid": question_uuid, "sender_type": "CHILD"}, + "event": { + "kind": "delivery_acknowledgement", + "datetime": datetime.datetime.utcnow().isoformat(), + "order": 0, + }, + "attributes": {"question_uuid": self.question_uuid, "sender_type": "CHILD"}, }, { - "event": {"kind": "result"}, - "attributes": {"question_uuid": question_uuid, "sender_type": "CHILD"}, + "event": {"kind": "result", "order": 1}, + "attributes": {"question_uuid": self.question_uuid, "sender_type": "CHILD"}, }, ] for message in messages: - child._send_message(message=message["event"], attributes=message["attributes"], topic=mock_topic) + child._emit_event( + event=message["event"], + attributes=message["attributes"], + originator=self.parent.id, + recipient=self.parent.id, + order=message["event"]["order"], + ) result = event_handler.handle_events() self.assertEqual(result, {"output_values": None, "output_manifest": None}) def test_error_raised_if_heartbeat_not_received_before_checked(self): """Test that an error is raised if a heartbeat isn't received before a heartbeat is first checked for.""" - question_uuid, _, mock_subscription = create_mock_topic_and_subscription() - - with patch("octue.cloud.pub_sub.events.SubscriberClient", MockSubscriber): - event_handler = GoogleCloudPubSubEventHandler(subscription=mock_subscription, receiving_service=parent) + event_handler = GoogleCloudPubSubEventHandler(subscription=self.subscription, recipient=self.parent) with self.assertRaises(TimeoutError) as error: event_handler.handle_events(maximum_heartbeat_interval=0) @@ -267,11 +301,7 @@ def test_error_raised_if_heartbeat_not_received_before_checked(self): def test_error_raised_if_heartbeats_stop_being_received(self): """Test that an error is raised if heartbeats stop being received within the maximum interval.""" - question_uuid, _, mock_subscription = create_mock_topic_and_subscription() - - with patch("octue.cloud.pub_sub.events.SubscriberClient", MockSubscriber): - event_handler = GoogleCloudPubSubEventHandler(subscription=mock_subscription, receiving_service=parent) - + event_handler = GoogleCloudPubSubEventHandler(subscription=self.subscription, recipient=self.parent) event_handler._last_heartbeat = datetime.datetime.now() - datetime.timedelta(seconds=30) with self.assertRaises(TimeoutError) as error: @@ -281,31 +311,33 @@ def test_error_raised_if_heartbeats_stop_being_received(self): def test_error_not_raised_if_heartbeat_has_been_received_in_maximum_allowed_interval(self): """Test that an error is not raised if a heartbeat has been received in the maximum allowed interval.""" - question_uuid, mock_topic, mock_subscription = create_mock_topic_and_subscription() - - with patch("octue.cloud.pub_sub.events.SubscriberClient", MockSubscriber): - event_handler = GoogleCloudPubSubEventHandler(subscription=mock_subscription, receiving_service=parent) - - event_handler._last_heartbeat = datetime.datetime.now() - + event_handler = GoogleCloudPubSubEventHandler(subscription=self.subscription, recipient=self.parent) child = MockService(backend=GCPPubSubBackend(project_name=TEST_PROJECT_NAME)) + event_handler._last_heartbeat = datetime.datetime.now() messages = [ { "event": { "kind": "delivery_acknowledgement", "datetime": datetime.datetime.utcnow().isoformat(), + "order": 0, }, - "attributes": {"question_uuid": question_uuid, "sender_type": "CHILD"}, + "attributes": {"question_uuid": self.question_uuid, "sender_type": "CHILD"}, }, { - "event": {"kind": "result"}, - "attributes": {"question_uuid": question_uuid, "sender_type": "CHILD"}, + "event": {"kind": "result", "order": 1}, + "attributes": {"question_uuid": self.question_uuid, "sender_type": "CHILD"}, }, ] for message in messages: - child._send_message(message=message["event"], attributes=message["attributes"], topic=mock_topic) + child._emit_event( + event=message["event"], + attributes=message["attributes"], + originator=self.parent.id, + recipient=self.parent.id, + order=message["event"]["order"], + ) with patch( "octue.cloud.pub_sub.events.GoogleCloudPubSubEventHandler._time_since_last_heartbeat", @@ -315,68 +347,63 @@ def test_error_not_raised_if_heartbeat_has_been_received_in_maximum_allowed_inte def test_time_since_last_heartbeat_is_none_if_no_heartbeat_received_yet(self): """Test that the time since the last heartbeat is `None` if no heartbeat has been received yet.""" - question_uuid, _, mock_subscription = create_mock_topic_and_subscription() - - with patch("octue.cloud.pub_sub.events.SubscriberClient", MockSubscriber): - event_handler = GoogleCloudPubSubEventHandler(subscription=mock_subscription, receiving_service=parent) - + event_handler = GoogleCloudPubSubEventHandler(subscription=self.subscription, recipient=self.parent) self.assertIsNone(event_handler._time_since_last_heartbeat) def test_total_run_time_is_none_if_handle_events_has_not_been_called(self): """Test that the total run time for the message handler is `None` if the `handle_events` method has not been called. """ - question_uuid, _, mock_subscription = create_mock_topic_and_subscription() - event_handler = GoogleCloudPubSubEventHandler(subscription=mock_subscription, receiving_service=parent) + event_handler = GoogleCloudPubSubEventHandler(subscription=self.subscription, recipient=self.parent) self.assertIsNone(event_handler.total_run_time) def test_time_since_missing_message_is_none_if_no_unhandled_missing_messages(self): """Test that the `time_since_missing_message` property is `None` if there are no unhandled missing messages.""" - question_uuid, _, mock_subscription = create_mock_topic_and_subscription() - event_handler = GoogleCloudPubSubEventHandler(subscription=mock_subscription, receiving_service=parent) + event_handler = GoogleCloudPubSubEventHandler(subscription=self.subscription, recipient=self.parent) self.assertIsNone(event_handler.time_since_missing_event) def test_missing_messages_at_start_can_be_skipped(self): """Test that missing messages at the start of the event stream can be skipped if they aren't received after a given time period if subsequent messages have been received. """ - question_uuid, mock_topic, mock_subscription = create_mock_topic_and_subscription() - - with patch("octue.cloud.pub_sub.events.SubscriberClient", MockSubscriber): - event_handler = GoogleCloudPubSubEventHandler( - subscription=mock_subscription, - receiving_service=parent, - event_handlers={"test": lambda message: None, "finish-test": lambda message: "This is the result."}, - schema={}, - skip_missing_events_after=0, - ) - - # Simulate the first two messages not being received. - mock_topic.messages_published = 2 + event_handler = GoogleCloudPubSubEventHandler( + subscription=self.subscription, + recipient=self.parent, + event_handlers={"test": lambda message: None, "finish-test": lambda message: "This is the result."}, + schema={}, + skip_missing_events_after=0, + ) child = MockService(backend=GCPPubSubBackend(project_name=TEST_PROJECT_NAME)) + # Simulate the first two messages not being received. messages = [ { "event": {"kind": "test", "order": 2}, - "attributes": {"question_uuid": question_uuid, "sender_type": "CHILD"}, + "attributes": {"question_uuid": self.question_uuid, "sender_type": "CHILD"}, }, { "event": {"kind": "test", "order": 3}, - "attributes": {"question_uuid": question_uuid, "sender_type": "CHILD"}, + "attributes": {"question_uuid": self.question_uuid, "sender_type": "CHILD"}, }, { "event": {"kind": "test", "order": 4}, - "attributes": {"question_uuid": question_uuid, "sender_type": "CHILD"}, + "attributes": {"question_uuid": self.question_uuid, "sender_type": "CHILD"}, }, { "event": {"kind": "finish-test", "order": 5}, - "attributes": {"question_uuid": question_uuid, "sender_type": "CHILD"}, + "attributes": {"question_uuid": self.question_uuid, "sender_type": "CHILD"}, }, ] for message in messages: - child._send_message(message=message["event"], attributes=message["attributes"], topic=mock_topic) + child._emit_event( + event=message["event"], + attributes=message["attributes"], + originator=self.parent.id, + recipient=self.parent.id, + order=message["event"]["order"], + ) result = event_handler.handle_events() @@ -393,16 +420,13 @@ def test_missing_messages_at_start_can_be_skipped(self): def test_missing_messages_in_middle_can_skipped(self): """Test that missing messages in the middle of the event stream can be skipped.""" - question_uuid, mock_topic, mock_subscription = create_mock_topic_and_subscription() - - with patch("octue.cloud.pub_sub.events.SubscriberClient", MockSubscriber): - event_handler = GoogleCloudPubSubEventHandler( - subscription=mock_subscription, - receiving_service=parent, - event_handlers={"test": lambda message: None, "finish-test": lambda message: "This is the result."}, - schema={}, - skip_missing_events_after=0, - ) + event_handler = GoogleCloudPubSubEventHandler( + subscription=self.subscription, + recipient=self.parent, + event_handlers={"test": lambda message: None, "finish-test": lambda message: "This is the result."}, + schema={}, + skip_missing_events_after=0, + ) child = MockService(backend=GCPPubSubBackend(project_name=TEST_PROJECT_NAME)) @@ -410,29 +434,35 @@ def test_missing_messages_in_middle_can_skipped(self): messages = [ { "event": {"kind": "test", "order": 0}, - "attributes": {"question_uuid": question_uuid, "sender_type": "CHILD"}, + "attributes": {"question_uuid": self.question_uuid, "sender_type": "CHILD"}, }, { "event": {"kind": "test", "order": 1}, - "attributes": {"question_uuid": question_uuid, "sender_type": "CHILD"}, + "attributes": {"question_uuid": self.question_uuid, "sender_type": "CHILD"}, }, { "event": {"kind": "test", "order": 2}, - "attributes": {"question_uuid": question_uuid, "sender_type": "CHILD"}, + "attributes": {"question_uuid": self.question_uuid, "sender_type": "CHILD"}, }, ] for message in messages: - child._send_message(message=message["event"], attributes=message["attributes"], topic=mock_topic) - - # Simulate missing messages. - mock_topic.messages_published = 5 + child._emit_event( + event=message["event"], + attributes=message["attributes"], + originator=self.parent.id, + recipient=self.parent.id, + order=message["event"]["order"], + ) # Send a final message. - child._send_message( - message={"kind": "finish-test", "order": 5}, - attributes={"question_uuid": question_uuid, "sender_type": "CHILD"}, - topic=mock_topic, + child._emit_event( + event={"kind": "finish-test", "order": 5}, + attributes={"question_uuid": self.question_uuid, "sender_type": "CHILD"}, + originator=self.parent.id, + recipient=self.parent.id, + # Simulate missing messages. + order=5, ) event_handler.handle_events() @@ -450,16 +480,13 @@ def test_missing_messages_in_middle_can_skipped(self): def test_multiple_blocks_of_missing_messages_in_middle_can_skipped(self): """Test that multiple blocks of missing messages in the middle of the event stream can be skipped.""" - question_uuid, mock_topic, mock_subscription = create_mock_topic_and_subscription() - - with patch("octue.cloud.pub_sub.events.SubscriberClient", MockSubscriber): - event_handler = GoogleCloudPubSubEventHandler( - subscription=mock_subscription, - receiving_service=parent, - event_handlers={"test": lambda message: None, "finish-test": lambda message: "This is the result."}, - schema={}, - skip_missing_events_after=0, - ) + event_handler = GoogleCloudPubSubEventHandler( + subscription=self.subscription, + recipient=self.parent, + event_handlers={"test": lambda message: None, "finish-test": lambda message: "This is the result."}, + schema={}, + skip_missing_events_after=0, + ) child = MockService(backend=GCPPubSubBackend(project_name=TEST_PROJECT_NAME)) @@ -467,56 +494,66 @@ def test_multiple_blocks_of_missing_messages_in_middle_can_skipped(self): messages = [ { "event": {"kind": "test", "order": 0}, - "attributes": {"question_uuid": question_uuid, "sender_type": "CHILD"}, + "attributes": {"question_uuid": self.question_uuid, "sender_type": "CHILD"}, }, { "event": {"kind": "test", "order": 1}, - "attributes": {"question_uuid": question_uuid, "sender_type": "CHILD"}, + "attributes": {"question_uuid": self.question_uuid, "sender_type": "CHILD"}, }, { "event": {"kind": "test", "order": 2}, - "attributes": {"question_uuid": question_uuid, "sender_type": "CHILD"}, + "attributes": {"question_uuid": self.question_uuid, "sender_type": "CHILD"}, }, ] for message in messages: - child._send_message(message=message["event"], attributes=message["attributes"], topic=mock_topic) - - # Simulate missing messages. - mock_topic.messages_published = 5 + child._emit_event( + event=message["event"], + attributes=message["attributes"], + originator=self.parent.id, + recipient=self.parent.id, + order=message["event"]["order"], + ) # Send another message. - child._send_message( - message={"kind": "test", "order": 5}, - attributes={"message_number": 5, "question_uuid": question_uuid, "sender_type": "CHILD"}, - topic=mock_topic, + child._emit_event( + event={"kind": "test", "order": 5}, + attributes={"order": 5, "question_uuid": self.question_uuid, "sender_type": "CHILD"}, + originator=self.parent.id, + recipient=self.parent.id, + # Simulate missing messages. + order=5, ) - # Simulate more missing messages. - mock_topic.messages_published = 20 - # Send more consecutive messages. messages = [ { "event": {"kind": "test", "order": 20}, - "attributes": {"question_uuid": question_uuid, "sender_type": "CHILD"}, + "attributes": {"question_uuid": self.question_uuid, "sender_type": "CHILD"}, }, { "event": {"kind": "test", "order": 21}, - "attributes": {"question_uuid": question_uuid, "sender_type": "CHILD"}, + "attributes": {"question_uuid": self.question_uuid, "sender_type": "CHILD"}, }, { "event": {"kind": "test", "order": 22}, - "attributes": {"question_uuid": question_uuid, "sender_type": "CHILD"}, + "attributes": {"question_uuid": self.question_uuid, "sender_type": "CHILD"}, }, { "event": {"kind": "finish-test", "order": 23}, - "attributes": {"question_uuid": question_uuid, "sender_type": "CHILD"}, + "attributes": {"question_uuid": self.question_uuid, "sender_type": "CHILD"}, }, ] for message in messages: - child._send_message(message=message["event"], attributes=message["attributes"], topic=mock_topic) + child._emit_event( + event=message["event"], + attributes=message["attributes"], + originator=self.parent.id, + recipient=self.parent.id, + # Simulate more missing messages. + order=message["event"]["order"], + ) event_handler.handle_events() @@ -537,27 +574,24 @@ def test_multiple_blocks_of_missing_messages_in_middle_can_skipped(self): def test_all_messages_missing_apart_from_result(self): """Test that the result message is still handled if all other messages are missing.""" - question_uuid, mock_topic, mock_subscription = create_mock_topic_and_subscription() - - with patch("octue.cloud.pub_sub.events.SubscriberClient", MockSubscriber): - event_handler = GoogleCloudPubSubEventHandler( - subscription=mock_subscription, - receiving_service=parent, - event_handlers={"test": lambda message: None, "finish-test": lambda message: "This is the result."}, - schema={}, - skip_missing_events_after=0, - ) + event_handler = GoogleCloudPubSubEventHandler( + subscription=self.subscription, + recipient=self.parent, + event_handlers={"test": lambda message: None, "finish-test": lambda message: "This is the result."}, + schema={}, + skip_missing_events_after=0, + ) child = MockService(backend=GCPPubSubBackend(project_name=TEST_PROJECT_NAME)) - # Simulate missing messages. - mock_topic.messages_published = 1000 - # Send the result message. - child._send_message( - message={"kind": "finish-test", "order": 1000}, - attributes={"question_uuid": question_uuid, "sender_type": "CHILD"}, - topic=mock_topic, + child._emit_event( + event={"kind": "finish-test", "order": 1000}, + attributes={"question_uuid": self.question_uuid, "sender_type": "CHILD"}, + originator=self.parent.id, + recipient=self.parent.id, + # Simulate missing messages. + order=1000, ) event_handler.handle_events() @@ -567,71 +601,94 @@ def test_all_messages_missing_apart_from_result(self): class TestPullAndEnqueueAvailableMessages(BaseTestCase): + service_patcher = ServicePatcher() + + @classmethod + def setUpClass(cls): + cls.service_patcher.start() + cls.question_uuid = str(uuid.uuid4()) + + cls.topic = MockTopic(name=OCTUE_SERVICES_PREFIX, project_name=TEST_PROJECT_NAME) + cls.topic.create(allow_existing=True) + + cls.subscription = MockSubscription( + name=f"my-org.my-service.1-0-0.answers.{cls.question_uuid}", + topic=cls.topic, + ) + cls.subscription.create() + + cls.parent = MockService( + service_id="my-org/my-service:1.0.0", + backend=GCPPubSubBackend(project_name=TEST_PROJECT_NAME), + ) + + @classmethod + def tearDownClass(cls): + """Stop the services patcher. + + :return None: + """ + cls.service_patcher.stop() + def test_pull_and_enqueue_available_events(self): """Test that pulling and enqueuing a message works.""" - question_uuid, mock_topic, _ = create_mock_topic_and_subscription() + self.subscription = MockSubscription( + name=f"my-org.my-service.1-0-0.answers.{self.question_uuid}", + topic=self.topic, + ) - with ServicePatcher(): - mock_subscription = MockSubscription( - name=f"my-org.my-service.1-0-0.answers.{question_uuid}", - topic=mock_topic, - ) + event_handler = GoogleCloudPubSubEventHandler( + subscription=self.subscription, + recipient=self.parent, + event_handlers={"test": lambda message: None, "finish-test": lambda message: "This is the result."}, + schema={}, + ) - event_handler = GoogleCloudPubSubEventHandler( - subscription=mock_subscription, - receiving_service=parent, - event_handlers={"test": lambda message: None, "finish-test": lambda message: "This is the result."}, - schema={}, + event_handler.question_uuid = self.question_uuid + event_handler.child_sruid = "my-org/my-service:1.0.0" + event_handler._child_sdk_version = "0.1.3" + event_handler.waiting_events = {} + + # Enqueue a mock message for a mock subscription to receive. + mock_message = {"kind": "test"} + + MESSAGES[self.question_uuid] = [ + MockMessage.from_primitive( + mock_message, + attributes={ + "order": 0, + "question_uuid": self.question_uuid, + "originator": self.parent.id, + "sender": self.parent.id, + "sender_type": "CHILD", + "sender_sdk_version": "0.50.0", + "recipient": "my-org/my-service:1.0.0", + }, ) + ] - event_handler.question_uuid = question_uuid - event_handler.child_sruid = "my-org/my-service:1.0.0" - event_handler._child_sdk_version = "0.1.3" - event_handler.waiting_events = {} - - # Enqueue a mock message for a mock subscription to receive. - mock_message = {"kind": "test"} - - SUBSCRIPTIONS[mock_subscription.name] = [ - MockMessage.from_primitive( - mock_message, - attributes={ - "sender_type": "CHILD", - "message_number": 0, - "question_uuid": question_uuid, - "version": "0.50.0", - }, - ) - ] - - event_handler._pull_and_enqueue_available_events(timeout=10) - self.assertEqual(event_handler.waiting_events, {0: mock_message}) - self.assertEqual(event_handler._earliest_waiting_event_number, 0) + event_handler._pull_and_enqueue_available_events(timeout=10) + self.assertEqual(event_handler.waiting_events, {0: mock_message}) + self.assertEqual(event_handler._earliest_waiting_event_number, 0) def test_timeout_error_raised_if_result_message_not_received_in_time(self): """Test that a timeout error is raised if a result message is not received in time.""" - question_uuid, mock_topic, _ = create_mock_topic_and_subscription() - - with ServicePatcher(): - mock_subscription = MockSubscription( - name=f"my-org.my-service.1-0-0.answers.{question_uuid}", - topic=mock_topic, - ) - - event_handler = GoogleCloudPubSubEventHandler( - subscription=mock_subscription, - receiving_service=parent, - event_handlers={"test": lambda message: None, "finish-test": lambda message: "This is the result."}, - ) + self.subscription = MockSubscription( + name=f"my-org.my-service.1-0-0.answers.{self.question_uuid}", + topic=self.topic, + ) - event_handler._child_sdk_version = "0.1.3" - event_handler.waiting_events = {} - event_handler._start_time = 0 + event_handler = GoogleCloudPubSubEventHandler( + subscription=self.subscription, + recipient=self.parent, + event_handlers={"test": lambda message: None, "finish-test": lambda message: "This is the result."}, + ) - # Create a mock subscription. - SUBSCRIPTIONS[mock_subscription.name] = [] + event_handler._child_sdk_version = "0.1.3" + event_handler.waiting_events = {} + event_handler._start_time = 0 - with self.assertRaises(TimeoutError): - event_handler._pull_and_enqueue_available_events(timeout=1e-6) + with self.assertRaises(TimeoutError): + event_handler._pull_and_enqueue_available_events(timeout=1e-6) - self.assertEqual(event_handler._earliest_waiting_event_number, math.inf) + self.assertEqual(event_handler._earliest_waiting_event_number, math.inf) diff --git a/tests/cloud/pub_sub/test_logging.py b/tests/cloud/pub_sub/test_logging.py index 5b333cb3c..2f26af05d 100644 --- a/tests/cloud/pub_sub/test_logging.py +++ b/tests/cloud/pub_sub/test_logging.py @@ -3,9 +3,13 @@ from logging import makeLogRecord from unittest.mock import patch -from octue.cloud.emulators._pub_sub import SUBSCRIPTIONS, MockService, MockSubscription, MockTopic +from octue.cloud.emulators._pub_sub import MESSAGES, MockService, MockTopic +from octue.cloud.emulators.child import ServicePatcher +from octue.cloud.events import OCTUE_SERVICES_PREFIX +from octue.cloud.events.counter import EventCounter from octue.cloud.pub_sub.logging import GoogleCloudPubSubHandler from octue.resources.service_backends import GCPPubSubBackend +from tests import TEST_PROJECT_NAME from tests.base import BaseTestCase @@ -15,28 +19,42 @@ def __repr__(self): class TestGoogleCloudPubSubHandler(BaseTestCase): + service_patcher = ServicePatcher() + + @classmethod + def setUpClass(cls): + """Start the service patcher and create a mock services topic. + + :return None: + """ + cls.service_patcher.start() + topic = MockTopic(name=OCTUE_SERVICES_PREFIX, project_name=TEST_PROJECT_NAME) + topic.create(allow_existing=True) + + @classmethod + def tearDownClass(cls): + """Stop the services patcher. + + :return None: + """ + cls.service_patcher.stop() + def test_emit(self): """Test the log message is published when `GoogleCloudPubSubHandler.emit` is called.""" - topic = MockTopic(name="world", project_name="blah") - topic.create() - question_uuid = "96d69278-44ac-4631-aeea-c90fb08a1b2b" - subscription = MockSubscription(name=f"world.answers.{question_uuid}", topic=topic) - subscription.create() - log_record = makeLogRecord({"msg": "Starting analysis."}) - - backend = GCPPubSubBackend(project_name="blah") - service = MockService(backend=backend) + service = MockService(backend=GCPPubSubBackend(project_name="blah")) GoogleCloudPubSubHandler( - message_sender=service._send_message, - topic=topic, + event_emitter=service._emit_event, question_uuid=question_uuid, + originator="another/service:1.0.0", + recipient="another/service:1.0.0", + order=EventCounter(), ).emit(log_record) self.assertEqual( - json.loads(SUBSCRIPTIONS[subscription.name][0].data.decode())["log_record"]["msg"], + json.loads(MESSAGES[question_uuid][0].data.decode())["log_record"]["msg"], "Starting analysis.", ) @@ -44,9 +62,6 @@ def test_emit_with_non_json_serialisable_args(self): """Test that non-JSON-serialisable arguments to log messages are converted to their string representation before being serialised and published to the Pub/Sub topic. """ - topic = MockTopic(name="world-1", project_name="blah") - topic.create() - non_json_serialisable_thing = NonJSONSerialisable() # Check that it can't be serialised to JSON. @@ -57,14 +72,15 @@ def test_emit_with_non_json_serialisable_args(self): {"msg": "%r is not JSON-serialisable but can go into a log message", "args": (non_json_serialisable_thing,)} ) - backend = GCPPubSubBackend(project_name="blah") - service = MockService(backend=backend) + service = MockService(backend=GCPPubSubBackend(project_name="blah")) with patch("octue.cloud.emulators._pub_sub.MockPublisher.publish") as mock_publish: GoogleCloudPubSubHandler( - message_sender=service._send_message, - topic=topic, + event_emitter=service._emit_event, question_uuid="question-uuid", + originator="another/service:1.0.0", + recipient="another/service:1.0.0", + order=EventCounter(), ).emit(record) self.assertEqual( diff --git a/tests/cloud/pub_sub/test_service.py b/tests/cloud/pub_sub/test_service.py index 37adc3599..a03ce9ec4 100644 --- a/tests/cloud/pub_sub/test_service.py +++ b/tests/cloud/pub_sub/test_service.py @@ -15,13 +15,13 @@ DifferentMockAnalysis, MockAnalysis, MockAnalysisWithOutputManifest, - MockPullResponse, MockService, MockSubscription, MockTopic, ) from octue.cloud.emulators.child import ServicePatcher from octue.cloud.emulators.cloud_storage import mock_generate_signed_url +from octue.cloud.events import OCTUE_SERVICES_PREFIX from octue.cloud.pub_sub.service import Service from octue.exceptions import InvalidMonitorMessage from octue.resources import Analysis, Datafile, Dataset, Manifest @@ -43,6 +43,24 @@ class TestService(BaseTestCase): service_patcher = ServicePatcher() + @classmethod + def setUpClass(cls): + """Start the service patcher and create a mock services topic. + + :return None: + """ + cls.service_patcher.start() + topic = MockTopic(name=OCTUE_SERVICES_PREFIX, project_name=TEST_PROJECT_NAME) + topic.create(allow_existing=True) + + @classmethod + def tearDownClass(cls): + """Stop the services patcher. + + :return None: + """ + cls.service_patcher.stop() + def test_repr(self): """Test that services are represented as a string correctly.""" service = Service(backend=BACKEND) @@ -66,26 +84,21 @@ def test_service_id_cannot_be_non_none_empty_value(self): def test_serve_fails_if_service_with_same_id_already_exists(self): """Test that serving a service fails if a service with the same name already exists.""" - with self.service_patcher: - with patch( - "octue.cloud.pub_sub.service.Topic.create", - side_effect=google.api_core.exceptions.AlreadyExists(""), - ): - with self.assertRaises(exceptions.ServiceAlreadyExists): - MockService( - backend=BACKEND, - service_id=f"my-org/existing-service:{MOCK_SERVICE_REVISION_TAG}", - ).serve() + with patch( + "octue.cloud.pub_sub.service.Subscription.create", + side_effect=google.api_core.exceptions.AlreadyExists(""), + ): + with self.assertRaises(exceptions.ServiceAlreadyExists): + MockService(backend=BACKEND, service_id=f"my-org/existing-service:{MOCK_SERVICE_REVISION_TAG}").serve() def test_serve(self): """Test that serving works with a unique service ID. Test that the returned future has itself been returned and that the returned subscriber has been closed. """ - with self.service_patcher: - future, subscriber = MockService( - backend=BACKEND, - service_id=f"my-org/existing-service:{MOCK_SERVICE_REVISION_TAG}", - ).serve() + future, subscriber = MockService( + backend=BACKEND, + service_id=f"my-org/existing-service:{MOCK_SERVICE_REVISION_TAG}", + ).serve() self.assertFalse(future.cancelled) self.assertTrue(future.returned) @@ -95,87 +108,78 @@ def test_serve_detached(self): """Test that, when serving a service in detached mode, the returned future is not cancelled or returned and that the returned subscriber is not closed. """ - with self.service_patcher: - service = MockService(backend=BACKEND, service_id=f"my-org/existing-service-d:{MOCK_SERVICE_REVISION_TAG}") - future, subscriber = service.serve(detach=True) + service = MockService(backend=BACKEND, service_id=f"my-org/existing-service-d:{MOCK_SERVICE_REVISION_TAG}") + future, subscriber = service.serve(detach=True) self.assertFalse(future.cancelled) self.assertFalse(future.returned) self.assertFalse(subscriber.closed) - def test_ask_on_non_existent_service_results_in_error(self): - """Test that trying to ask a question to a non-existent service (i.e. one without a topic in Google Pub/Sub) - results in an error. - """ - with patch("octue.cloud.pub_sub.service.Topic", new=MockTopic): - service = MockService(backend=BACKEND) + def test_missing_services_topic_results_in_error(self): + """Test that an error is raised if the services topic doesn't exist.""" + service = MockService(backend=BACKEND) + with patch("octue.cloud.emulators._pub_sub.TOPICS", set()): with self.assertRaises(exceptions.ServiceNotFound): - service.ask( - service_id=f"my-org/existing-service:{MOCK_SERVICE_REVISION_TAG}", - input_values=[1, 2, 3, 4], - ) + service.services_topic def test_ask_unregistered_service_revision_when_service_registries_specified_results_in_error(self): """Test that an error is raised if attempting to ask an unregistered service a question when service registries are being used. """ - with patch("octue.cloud.pub_sub.service.Topic", new=MockTopic): - service = MockService( - backend=BACKEND, - service_registries=[{"name": "Octue Registry", "endpoint": "https://blah.com/services"}], - ) + service = MockService( + backend=BACKEND, + service_registries=[{"name": "Octue Registry", "endpoint": "https://blah.com/services"}], + ) - mock_response = requests.Response() - mock_response.status_code = 404 + mock_response = requests.Response() + mock_response.status_code = 404 - with patch("requests.get", return_value=mock_response): - with self.assertRaises(exceptions.ServiceNotFound): - service.ask( - service_id=f"my-org/unregistered-service:{MOCK_SERVICE_REVISION_TAG}", - input_values=[1, 2, 3, 4], - ) + with patch("requests.get", return_value=mock_response): + with self.assertRaises(exceptions.ServiceNotFound): + service.ask( + service_id=f"my-org/unregistered-service:{MOCK_SERVICE_REVISION_TAG}", + input_values=[1, 2, 3, 4], + ) def test_ask_unregistered_service_with_no_revision_tag_when_service_registries_specified_results_in_error(self): """Test that an error is raised when attempting to ask a question to an unregistered service without including revision tag when service registries are being used. """ - with patch("octue.cloud.pub_sub.service.Topic", new=MockTopic): - service = MockService( - backend=BACKEND, - service_registries=[{"name": "Octue Registry", "endpoint": "https://blah.com/services"}], - ) + service = MockService( + backend=BACKEND, + service_registries=[{"name": "Octue Registry", "endpoint": "https://blah.com/services"}], + ) - mock_response = requests.Response() - mock_response.status_code = 404 + mock_response = requests.Response() + mock_response.status_code = 404 - with patch("requests.get", return_value=mock_response): - with self.assertRaises(exceptions.ServiceNotFound): - service.ask(service_id="my-org/unregistered-service", input_values=[1, 2, 3, 4]) + with patch("requests.get", return_value=mock_response): + with self.assertRaises(exceptions.ServiceNotFound): + service.ask(service_id="my-org/unregistered-service", input_values=[1, 2, 3, 4]) def test_ask_service_with_no_revision_tag_when_service_registries_not_specified_results_in_error(self): """Test that an error is raised when attempting to ask a question to a service without including a revision tag when service registries are not being used. """ - with patch("octue.cloud.pub_sub.service.Topic", new=MockTopic): - service = MockService(backend=BACKEND) + service = MockService(backend=BACKEND) - mock_response = requests.Response() - mock_response.status_code = 404 + mock_response = requests.Response() + mock_response.status_code = 404 - with patch("requests.get", return_value=mock_response): - with self.assertRaises(exceptions.InvalidServiceID): - service.ask(service_id="my-org/unregistered-service", input_values=[1, 2, 3, 4]) + with patch("requests.get", return_value=mock_response): + with self.assertRaises(exceptions.InvalidServiceID): + service.ask(service_id="my-org/unregistered-service", input_values=[1, 2, 3, 4]) def test_timeout_error_raised_if_no_messages_received_when_waiting(self): """Test that a TimeoutError is raised if no messages are received while waiting.""" mock_topic = MockTopic(name="amazing.service.9-9-9", project_name=TEST_PROJECT_NAME) mock_subscription = MockSubscription(name="amazing.service.9-9-9", topic=mock_topic) + service = Service(backend=BACKEND) - with patch("octue.cloud.pub_sub.service.pubsub_v1.SubscriberClient.pull", return_value=MockPullResponse()): - with self.assertRaises(TimeoutError): - service.wait_for_answer(subscription=mock_subscription, timeout=0.01) + with self.assertRaises(TimeoutError): + service.wait_for_answer(subscription=mock_subscription, timeout=0.01) def test_error_raised_if_attempting_to_wait_for_answer_from_non_pull_subscription(self): """Test that an error is raised if attempting to wait for an answer from a push subscription.""" @@ -198,12 +202,11 @@ def test_exceptions_in_responder_are_handled_and_sent_to_asker(self): parent = MockService(backend=BACKEND, children={child.id: child}) - with self.service_patcher: - child.serve() - subscription, _ = parent.ask(service_id=child.id, input_values={}) + child.serve() + subscription, _ = parent.ask(service_id=child.id, input_values={}) - with self.assertRaises(twined.exceptions.InvalidManifestContents) as context: - parent.wait_for_answer(subscription=subscription) + with self.assertRaises(twined.exceptions.InvalidManifestContents) as context: + parent.wait_for_answer(subscription=subscription) self.assertIn("'met_mast_id' is a required property", context.exception.args[0]) @@ -214,12 +217,11 @@ def test_exceptions_with_multiple_arguments_in_responder_are_handled_and_sent_to child = self.make_new_child_with_error(FileNotFoundError(2, "No such file or directory: 'blah'")) parent = MockService(backend=BACKEND, children={child.id: child}) - with self.service_patcher: - child.serve() - subscription, _ = parent.ask(service_id=child.id, input_values={}) + child.serve() + subscription, _ = parent.ask(service_id=child.id, input_values={}) - with self.assertRaises(FileNotFoundError) as context: - parent.wait_for_answer(subscription) + with self.assertRaises(FileNotFoundError) as context: + parent.wait_for_answer(subscription) self.assertIn("[Errno 2] No such file or directory: 'blah'", format(context.exception)) @@ -232,12 +234,11 @@ class AnUnknownException(Exception): child = self.make_new_child_with_error(AnUnknownException("This is an exception unknown to the asker.")) parent = MockService(backend=BACKEND, children={child.id: child}) - with self.service_patcher: - child.serve() - subscription, _ = parent.ask(service_id=child.id, input_values={}) + child.serve() + subscription, _ = parent.ask(service_id=child.id, input_values={}) - with self.assertRaises(Exception) as context: - parent.wait_for_answer(subscription) + with self.assertRaises(Exception) as context: + parent.wait_for_answer(subscription) self.assertEqual(type(context.exception).__name__, "AnUnknownException") self.assertIn("This is an exception unknown to the asker.", context.exception.args[0]) @@ -251,10 +252,9 @@ def test_ask_with_real_run_function_with_no_log_message_forwarding(self): parent = MockService(backend=BACKEND, children={child.id: child}) with self.assertLogs() as logging_context: - with self.service_patcher: - child.serve() - subscription, _ = parent.ask(service_id=child.id, input_values={}, subscribe_to_logs=False) - answer = parent.wait_for_answer(subscription) + child.serve() + subscription, _ = parent.ask(service_id=child.id, input_values={}, subscribe_to_logs=False) + answer = parent.wait_for_answer(subscription) self.assertEqual( answer, @@ -277,10 +277,9 @@ def test_ask_with_real_run_function_with_log_message_forwarding(self): parent = MockService(backend=BACKEND, children={child.id: child}) with self.assertLogs() as logs_context_manager: - with self.service_patcher: - child.serve() - subscription, _ = parent.ask(service_id=child.id, input_values={}, subscribe_to_logs=True) - answer = parent.wait_for_answer(subscription) + child.serve() + subscription, _ = parent.ask(service_id=child.id, input_values={}, subscribe_to_logs=True) + answer = parent.wait_for_answer(subscription) self.assertEqual( answer, @@ -323,10 +322,9 @@ def mock_app(analysis): parent = MockService(backend=BACKEND, children={child.id: child}) with self.assertLogs(level=logging.ERROR) as logs_context_manager: - with self.service_patcher: - child.serve() - subscription, _ = parent.ask(service_id=child.id, input_values={}, subscribe_to_logs=True) - parent.wait_for_answer(subscription) + child.serve() + subscription, _ = parent.ask(service_id=child.id, input_values={}, subscribe_to_logs=True) + parent.wait_for_answer(subscription, timeout=100) error_logged = False @@ -365,12 +363,11 @@ def mock_app(analysis): child = MockService(backend=BACKEND, run_function=create_run_function_with_monitoring()) parent = MockService(backend=BACKEND, children={child.id: child}) - with self.service_patcher: - child.serve() - subscription, _ = parent.ask(child.id, input_values={}) + child.serve() + subscription, _ = parent.ask(child.id, input_values={}) - monitoring_data = [] - parent.wait_for_answer(subscription, handle_monitor_message=lambda data: monitoring_data.append(data)) + monitoring_data = [] + parent.wait_for_answer(subscription, handle_monitor_message=lambda data: monitoring_data.append(data)) self.assertEqual( monitoring_data, @@ -404,16 +401,12 @@ def mock_app(analysis): child = MockService(backend=BACKEND, run_function=create_run_function_with_monitoring()) parent = MockService(backend=BACKEND, children={child.id: child}) - with self.service_patcher: - child.serve() - subscription, _ = parent.ask(child.id, input_values={}) - monitoring_data = [] + child.serve() + subscription, _ = parent.ask(child.id, input_values={}) + monitoring_data = [] - with self.assertRaises(InvalidMonitorMessage): - parent.wait_for_answer( - subscription, - handle_monitor_message=lambda data: monitoring_data.append(data), - ) + with self.assertRaises(InvalidMonitorMessage): + parent.wait_for_answer(subscription, handle_monitor_message=lambda data: monitoring_data.append(data)) self.assertEqual(monitoring_data, [{"status": "my first monitor message"}]) @@ -421,36 +414,29 @@ def test_ask_with_non_json_python_primitive_input_values(self): """Test that non-JSON python primitive values (in this case a set and a datetime) can be sent and received by services. """ + input_values = {"my_set": {1, 2, 3}, "my_datetime": datetime.datetime.now()} def run_function(analysis_id, input_values, *args, **kwargs): return MockAnalysis(output_values=input_values) child = MockService(backend=BACKEND, run_function=lambda *args, **kwargs: run_function(*args, **kwargs)) parent = MockService(backend=BACKEND, children={child.id: child}) + child.serve() - input_values = {"my_set": {1, 2, 3}, "my_datetime": datetime.datetime.now()} - - with self.service_patcher: - child.serve() - - subscription, _ = parent.ask( - service_id=child.id, - input_values=input_values, - subscribe_to_logs=True, - save_diagnostics="SAVE_DIAGNOSTICS_ON_CRASH", - ) - - answer = parent.wait_for_answer(subscription) + subscription, _ = parent.ask( + service_id=child.id, + input_values=input_values, + subscribe_to_logs=True, + save_diagnostics="SAVE_DIAGNOSTICS_ON_CRASH", + ) + answer = parent.wait_for_answer(subscription) self.assertEqual(answer["output_values"], input_values) def test_ask_with_input_manifest(self): """Test that a service can ask a question including an input manifest to another service that is serving and receive an answer. """ - child = self.make_new_child(BACKEND, run_function_returnee=MockAnalysis()) - parent = MockService(backend=BACKEND, children={child.id: child}) - dataset_path = f"gs://{TEST_BUCKET_NAME}/my-dataset" input_manifest = Manifest( @@ -462,12 +448,13 @@ def test_ask_with_input_manifest(self): } ) - with self.service_patcher: - child.serve() + child = self.make_new_child(BACKEND, run_function_returnee=MockAnalysis()) + parent = MockService(backend=BACKEND, children={child.id: child}) + child.serve() - with patch("google.cloud.storage.blob.Blob.generate_signed_url", new=mock_generate_signed_url): - subscription, _ = parent.ask(service_id=child.id, input_values={}, input_manifest=input_manifest) - answer = parent.wait_for_answer(subscription) + with patch("google.cloud.storage.blob.Blob.generate_signed_url", new=mock_generate_signed_url): + subscription, _ = parent.ask(service_id=child.id, input_values={}, input_manifest=input_manifest) + answer = parent.wait_for_answer(subscription) self.assertEqual( answer, @@ -478,9 +465,6 @@ def test_ask_with_input_manifest_and_no_input_values(self): """Test that a service can ask a question including an input manifest and no input values to another service that is serving and receive an answer. """ - child = self.make_new_child(BACKEND, run_function_returnee=MockAnalysis()) - parent = MockService(backend=BACKEND, children={child.id: child}) - dataset_path = f"gs://{TEST_BUCKET_NAME}/my-dataset" input_manifest = Manifest( @@ -492,12 +476,13 @@ def test_ask_with_input_manifest_and_no_input_values(self): } ) - with self.service_patcher: - child.serve() + child = self.make_new_child(BACKEND, run_function_returnee=MockAnalysis()) + parent = MockService(backend=BACKEND, children={child.id: child}) + child.serve() - with patch("google.cloud.storage.blob.Blob.generate_signed_url", new=mock_generate_signed_url): - subscription, _ = parent.ask(service_id=child.id, input_manifest=input_manifest) - answer = parent.wait_for_answer(subscription) + with patch("google.cloud.storage.blob.Blob.generate_signed_url", new=mock_generate_signed_url): + subscription, _ = parent.ask(service_id=child.id, input_manifest=input_manifest) + answer = parent.wait_for_answer(subscription) self.assertEqual( answer, @@ -538,18 +523,16 @@ def run_function(*args, **kwargs): child = MockService(backend=BACKEND, run_function=run_function) parent = MockService(backend=BACKEND, children={child.id: child}) + child.serve() - with self.service_patcher: - child.serve() - - subscription, _ = parent.ask( - service_id=child.id, - input_values={}, - input_manifest=manifest, - allow_local_files=True, - ) + subscription, _ = parent.ask( + service_id=child.id, + input_values={}, + input_manifest=manifest, + allow_local_files=True, + ) - answer = parent.wait_for_answer(subscription) + answer = parent.wait_for_answer(subscription) self.assertEqual(answer["output_values"], "This is a local file.") @@ -557,11 +540,10 @@ def test_ask_with_output_manifest(self): """Test that a service can receive an output manifest as part of the answer to a question.""" child = self.make_new_child(BACKEND, run_function_returnee=MockAnalysisWithOutputManifest()) parent = MockService(backend=BACKEND, children={child.id: child}) + child.serve() - with self.service_patcher: - child.serve() - subscription, _ = parent.ask(service_id=child.id, input_values={}) - answer = parent.wait_for_answer(subscription) + subscription, _ = parent.ask(service_id=child.id, input_values={}) + answer = parent.wait_for_answer(subscription) self.assertEqual(answer["output_values"], MockAnalysisWithOutputManifest.output_values) self.assertEqual(answer["output_manifest"].id, MockAnalysisWithOutputManifest.output_manifest.id) @@ -570,14 +552,12 @@ def test_service_can_ask_multiple_questions_to_child(self): """Test that a service can ask multiple questions to the same child and expect replies to them all.""" child = self.make_new_child(BACKEND, run_function_returnee=MockAnalysis()) parent = MockService(backend=BACKEND, children={child.id: child}) + child.serve() + answers = [] - with self.service_patcher: - child.serve() - answers = [] - - for i in range(5): - subscription, _ = parent.ask(service_id=child.id, input_values={}) - answers.append(parent.wait_for_answer(subscription)) + for i in range(5): + subscription, _ = parent.ask(service_id=child.id, input_values={}) + answers.append(parent.wait_for_answer(subscription)) for answer in answers: self.assertEqual( @@ -589,18 +569,16 @@ def test_service_can_ask_questions_to_multiple_children(self): """Test that a service can ask questions to different children and expect replies to them all.""" child_1 = self.make_new_child(BACKEND, run_function_returnee=MockAnalysis()) child_2 = self.make_new_child(BACKEND, run_function_returnee=DifferentMockAnalysis()) - parent = MockService(backend=BACKEND, children={child_1.id: child_1, child_2.id: child_2}) - with self.service_patcher: - child_1.serve() - child_2.serve() + child_1.serve() + child_2.serve() - subscription, _ = parent.ask(service_id=child_1.id, input_values={}) - answer_1 = parent.wait_for_answer(subscription) + subscription, _ = parent.ask(service_id=child_1.id, input_values={}) + answer_1 = parent.wait_for_answer(subscription) - subscription, _ = parent.ask(service_id=child_2.id, input_values={}) - answer_2 = parent.wait_for_answer(subscription) + subscription, _ = parent.ask(service_id=child_2.id, input_values={}) + answer_2 = parent.wait_for_answer(subscription) self.assertEqual( answer_1, @@ -617,12 +595,13 @@ def test_service_can_ask_questions_to_multiple_children(self): def test_child_can_ask_its_own_child_questions(self): """Test that a child can contact its own child while answering a question from a parent.""" - child_of_child = self.make_new_child(BACKEND, run_function_returnee=DifferentMockAnalysis()) def child_run_function(analysis_id, input_values, *args, **kwargs): subscription, _ = child.ask(service_id=child_of_child.id, input_values=input_values) return MockAnalysis(output_values={input_values["question"]: child.wait_for_answer(subscription)}) + child_of_child = self.make_new_child(BACKEND, run_function_returnee=DifferentMockAnalysis()) + child = MockService( backend=BACKEND, run_function=child_run_function, @@ -631,16 +610,15 @@ def child_run_function(analysis_id, input_values, *args, **kwargs): parent = MockService(backend=BACKEND, children={child.id: child}) - with self.service_patcher: - child.serve() - child_of_child.serve() + child.serve() + child_of_child.serve() - subscription, _ = parent.ask( - service_id=child.id, - input_values={"question": "What does the child of the child say?"}, - ) + subscription, _ = parent.ask( + service_id=child.id, + input_values={"question": "What does the child of the child say?"}, + ) - answer = parent.wait_for_answer(subscription) + answer = parent.wait_for_answer(subscription) self.assertEqual( answer, @@ -657,8 +635,6 @@ def child_run_function(analysis_id, input_values, *args, **kwargs): def test_child_can_ask_its_own_children_questions(self): """Test that a child can contact more than one of its own children while answering a question from a parent.""" - first_child_of_child = self.make_new_child(BACKEND, run_function_returnee=DifferentMockAnalysis()) - second_child_of_child = self.make_new_child(BACKEND, run_function_returnee=MockAnalysis()) def child_run_function(analysis_id, input_values, *args, **kwargs): subscription_1, _ = child.ask(service_id=first_child_of_child.id, input_values=input_values) @@ -671,25 +647,30 @@ def child_run_function(analysis_id, input_values, *args, **kwargs): } ) + first_child_of_child = self.make_new_child(BACKEND, run_function_returnee=DifferentMockAnalysis()) + second_child_of_child = self.make_new_child(BACKEND, run_function_returnee=MockAnalysis()) + child = MockService( backend=BACKEND, run_function=child_run_function, - children={first_child_of_child.id: first_child_of_child, second_child_of_child.id: second_child_of_child}, + children={ + first_child_of_child.id: first_child_of_child, + second_child_of_child.id: second_child_of_child, + }, ) parent = MockService(backend=BACKEND, children={child.id: child}) - with self.service_patcher: - child.serve() - first_child_of_child.serve() - second_child_of_child.serve() + child.serve() + first_child_of_child.serve() + second_child_of_child.serve() - subscription, _ = parent.ask( - service_id=child.id, - input_values={"question": "What does the child of the child say?"}, - ) + subscription, _ = parent.ask( + service_id=child.id, + input_values={"question": "What does the child of the child say?"}, + ) - answer = parent.wait_for_answer(subscription) + answer = parent.wait_for_answer(subscription) self.assertEqual( answer, @@ -712,11 +693,10 @@ def test_child_messages_can_be_recorded_by_parent(self): """Test that the parent can record messages it receives from its child to a JSON file.""" child = MockService(backend=BACKEND, run_function=self.create_run_function()) parent = MockService(backend=BACKEND, children={child.id: child}) + child.serve() - with self.service_patcher: - child.serve() - subscription, _ = parent.ask(service_id=child.id, input_values={}, subscribe_to_logs=True) - parent.wait_for_answer(subscription) + subscription, _ = parent.ask(service_id=child.id, input_values={}, subscribe_to_logs=True) + parent.wait_for_answer(subscription) # Check that the child's messages have been recorded by the parent. self.assertEqual(parent.received_messages[0]["kind"], "delivery_acknowledgement") @@ -729,13 +709,11 @@ def test_child_exception_message_can_be_recorded_by_parent(self): """Test that the parent can record exceptions raised by the child.""" child = self.make_new_child_with_error(ValueError("Oh no.")) parent = MockService(backend=BACKEND, children={child.id: child}) + child.serve() - with self.service_patcher: - child.serve() - - with self.assertRaises(ValueError): - subscription, _ = parent.ask(service_id=child.id, input_values={}, subscribe_to_logs=True) - parent.wait_for_answer(subscription) + with self.assertRaises(ValueError): + subscription, _ = parent.ask(service_id=child.id, input_values={}, subscribe_to_logs=True) + parent.wait_for_answer(subscription) # Check that the child's messages have been recorded by the parent. self.assertEqual(parent.received_messages[0]["kind"], "delivery_acknowledgement") @@ -752,22 +730,20 @@ def run_function(*args, **kwargs): child = MockService(backend=BACKEND, run_function=lambda *args, **kwargs: run_function()) parent = MockService(backend=BACKEND, children={child.id: child}) + child.serve() - with self.service_patcher: - child.serve() - - with patch( - "octue.cloud.emulators._pub_sub.MockService.answer", - functools.partial(child.answer, heartbeat_interval=expected_interval), - ): - subscription, _ = parent.ask( - service_id=child.id, - input_values={}, - subscribe_to_logs=True, - save_diagnostics="SAVE_DIAGNOSTICS_ON_CRASH", - ) + with patch( + "octue.cloud.emulators._pub_sub.MockService.answer", + functools.partial(child.answer, heartbeat_interval=expected_interval), + ): + subscription, _ = parent.ask( + service_id=child.id, + input_values={}, + subscribe_to_logs=True, + save_diagnostics="SAVE_DIAGNOSTICS_ON_CRASH", + ) - parent.wait_for_answer(subscription) + parent.wait_for_answer(subscription) self.assertEqual(parent.received_messages[1]["kind"], "heartbeat") self.assertEqual(parent.received_messages[2]["kind"], "heartbeat") @@ -799,19 +775,17 @@ def run_function(*args, **kwargs): child = MockService(backend=BACKEND, run_function=run_function) parent = MockService(backend=BACKEND, children={child.id: child}) + child.serve() - with self.service_patcher: - child.serve() - - subscription, _ = parent.ask( - service_id=child.id, - input_values={}, - subscribe_to_logs=True, - save_diagnostics="SAVE_DIAGNOSTICS_ON_CRASH", - ) + subscription, _ = parent.ask( + service_id=child.id, + input_values={}, + subscribe_to_logs=True, + save_diagnostics="SAVE_DIAGNOSTICS_ON_CRASH", + ) - monitor_messages = [] - result = parent.wait_for_answer(subscription, handle_monitor_message=monitor_messages.append) + monitor_messages = [] + result = parent.wait_for_answer(subscription, handle_monitor_message=monitor_messages.append) # Check that multiple monitor messages were sent and received. self.assertTrue(len(monitor_messages) > 1) @@ -879,6 +853,10 @@ def mock_child_app(analysis): children={child.id: child}, ) + static_child_of_child.serve() + dynamic_child_of_child.serve() + child.serve() + dynamic_children = [ { "key": "expected_child", @@ -887,13 +865,8 @@ def mock_child_app(analysis): }, ] - with self.service_patcher: - static_child_of_child.serve() - dynamic_child_of_child.serve() - child.serve() - - subscription, _ = parent.ask(service_id=child.id, input_values={}, children=dynamic_children) - answer = parent.wait_for_answer(subscription) + subscription, _ = parent.ask(service_id=child.id, input_values={}, children=dynamic_children) + answer = parent.wait_for_answer(subscription) self.assertEqual(answer["output_values"], "I am the dynamic child.") diff --git a/tests/cloud/pub_sub/test_subscription.py b/tests/cloud/pub_sub/test_subscription.py index 903247a15..bbf4961c8 100644 --- a/tests/cloud/pub_sub/test_subscription.py +++ b/tests/cloud/pub_sub/test_subscription.py @@ -17,13 +17,7 @@ class TestSubscription(BaseTestCase): def test_repr(self): """Test that subscriptions are represented correctly.""" - self.assertEqual(repr(self.subscription), "") - - def test_namespace_only_in_name_once(self): - """Test that the subscription's namespace only appears in its name once, even if it is repeated.""" - self.assertEqual(self.subscription.name, "octue.services.world") - subscription_with_repeated_namespace = Subscription(name="octue.services.world", topic=self.topic) - self.assertEqual(subscription_with_repeated_namespace.name, "octue.services.world") + self.assertEqual(repr(self.subscription), "") def test_create_without_allow_existing_when_subscription_already_exists(self): """Test that an error is raised when trying to create a subscription that already exists and `allow_existing` is diff --git a/tests/cloud/pub_sub/test_topic.py b/tests/cloud/pub_sub/test_topic.py index 03fb05df6..5267226da 100644 --- a/tests/cloud/pub_sub/test_topic.py +++ b/tests/cloud/pub_sub/test_topic.py @@ -10,15 +10,7 @@ class TestTopic(BaseTestCase): def test_repr(self): """Test that Topics are represented correctly.""" topic = Topic(name="world", project_name="my-project") - self.assertEqual(repr(topic), "") - - def test_namespace_only_in_name_once(self): - """Test that the topic's namespace only appears in its name once, even if it is repeated.""" - topic = Topic(name="world", project_name="my-project") - self.assertEqual(topic.name, "octue.services.world") - - topic_with_repeated_namespace = Topic(name="octue.services.world", project_name="my-project") - self.assertEqual(topic_with_repeated_namespace.name, "octue.services.world") + self.assertEqual(repr(topic), "") def test_create(self): """Test that a topic can be created and that it's marked as having its creation triggered locally.""" diff --git a/tests/cloud/test_service_id.py b/tests/cloud/test_service_id.py index b820e35ac..606e76dfa 100644 --- a/tests/cloud/test_service_id.py +++ b/tests/cloud/test_service_id.py @@ -10,6 +10,7 @@ from octue.cloud.service_id import ( convert_service_id_to_pub_sub_form, get_default_sruid, + get_sruid_from_pub_sub_resource_name, get_sruid_parts, raise_if_revision_not_registered, split_service_id, @@ -83,7 +84,6 @@ def test_convert_service_id_to_pub_sub_form(self): ("octue/my-service", "octue.my-service"), ("octue/my-service:0.1.7", "octue.my-service.0-1-7"), ("my-service:3.1.9", "my-service.3-1-9"), - ("octue.services.octue/my-service:0.1.7", "octue.services.octue.my-service.0-1-7"), ) for service_id, pub_sub_service_id in service_ids: @@ -91,6 +91,13 @@ def test_convert_service_id_to_pub_sub_form(self): self.assertEqual(convert_service_id_to_pub_sub_form(service_id), pub_sub_service_id) +class TestGetSRUIDFromPubSubResourceName(unittest.TestCase): + def test_get_sruid_from_pub_sub_resource_name(self): + """Test that an SRUID can be extracted from a Pub/Sub resource name.""" + sruid = get_sruid_from_pub_sub_resource_name("octue.example-service-cloud-run.0-3-2") + self.assertEqual(sruid, "octue/example-service-cloud-run:0.3.2") + + class TestValidateSRUID(unittest.TestCase): def test_error_raised_if_service_id_invalid(self): """Test that an error is raised if an invalid SRUID is given.""" diff --git a/tests/data/events.json b/tests/data/events.json index d1e9e1b2a..4cd741a59 100644 --- a/tests/data/events.json +++ b/tests/data/events.json @@ -5,11 +5,13 @@ "kind": "delivery_acknowledgement" }, "attributes": { - "message_number": "0", + "order": "0", "question_uuid": "d45c7e99-d610-413b-8130-dd6eef46dda6", + "originator": "octue/test-service:1.0.0", + "sender": "octue/test-service:1.0.0", "sender_type": "CHILD", - "version": "0.51.0", - "sender": "octue/test-service:1.0.0" + "sender_sdk_version": "0.51.0", + "recipient": "octue/another-service:3.2.1" } }, { @@ -39,11 +41,13 @@ } }, "attributes": { - "message_number": "1", + "order": "1", "question_uuid": "d45c7e99-d610-413b-8130-dd6eef46dda6", + "originator": "octue/test-service:1.0.0", + "sender": "octue/test-service:1.0.0", "sender_type": "CHILD", - "version": "0.51.0", - "sender": "octue/test-service:1.0.0" + "sender_sdk_version": "0.51.0", + "recipient": "octue/another-service:3.2.1" } }, { @@ -73,11 +77,13 @@ } }, "attributes": { - "message_number": "2", + "order": "2", "question_uuid": "d45c7e99-d610-413b-8130-dd6eef46dda6", + "originator": "octue/test-service:1.0.0", + "sender": "octue/test-service:1.0.0", "sender_type": "CHILD", - "version": "0.51.0", - "sender": "octue/test-service:1.0.0" + "sender_sdk_version": "0.51.0", + "recipient": "octue/another-service:3.2.1" } }, { @@ -107,11 +113,13 @@ } }, "attributes": { - "message_number": "3", + "order": "3", "question_uuid": "d45c7e99-d610-413b-8130-dd6eef46dda6", + "originator": "octue/test-service:1.0.0", + "sender": "octue/test-service:1.0.0", "sender_type": "CHILD", - "version": "0.51.0", - "sender": "octue/test-service:1.0.0" + "sender_sdk_version": "0.51.0", + "recipient": "octue/another-service:3.2.1" } }, { @@ -141,11 +149,13 @@ } }, "attributes": { - "message_number": "4", + "order": "4", "question_uuid": "d45c7e99-d610-413b-8130-dd6eef46dda6", + "originator": "octue/test-service:1.0.0", + "sender": "octue/test-service:1.0.0", "sender_type": "CHILD", - "version": "0.51.0", - "sender": "octue/test-service:1.0.0" + "sender_sdk_version": "0.51.0", + "recipient": "octue/another-service:3.2.1" } }, { @@ -175,11 +185,13 @@ } }, "attributes": { - "message_number": "5", + "order": "5", "question_uuid": "d45c7e99-d610-413b-8130-dd6eef46dda6", + "originator": "octue/test-service:1.0.0", + "sender": "octue/test-service:1.0.0", "sender_type": "CHILD", - "version": "0.51.0", - "sender": "octue/test-service:1.0.0" + "sender_sdk_version": "0.51.0", + "recipient": "octue/another-service:3.2.1" } }, { @@ -204,11 +216,13 @@ "output_values": [1, 2, 3, 4, 5] }, "attributes": { - "message_number": "6", + "order": "6", "question_uuid": "d45c7e99-d610-413b-8130-dd6eef46dda6", + "originator": "octue/test-service:1.0.0", + "sender": "octue/test-service:1.0.0", "sender_type": "CHILD", - "version": "0.51.0", - "sender": "octue/test-service:1.0.0" + "sender_sdk_version": "0.51.0", + "recipient": "octue/another-service:3.2.1" } }, { @@ -217,11 +231,13 @@ "kind": "heartbeat" }, "attributes": { - "message_number": "7", + "order": "7", "question_uuid": "d45c7e99-d610-413b-8130-dd6eef46dda6", + "originator": "octue/test-service:1.0.0", + "sender": "octue/test-service:1.0.0", "sender_type": "CHILD", - "version": "0.51.0", - "sender": "octue/test-service:1.0.0" + "sender_sdk_version": "0.51.0", + "recipient": "octue/another-service:3.2.1" } } ] diff --git a/tests/resources/test_child.py b/tests/resources/test_child.py index 1e9c98234..502dcc940 100644 --- a/tests/resources/test_child.py +++ b/tests/resources/test_child.py @@ -8,11 +8,12 @@ from google.auth.exceptions import DefaultCredentialsError -from octue.cloud.emulators._pub_sub import MockAnalysis, MockService, MockSubscriber, MockSubscription, MockTopic +from octue.cloud.emulators._pub_sub import MockAnalysis, MockService, MockTopic from octue.cloud.emulators.child import ServicePatcher +from octue.cloud.events import OCTUE_SERVICES_PREFIX from octue.resources.child import Child from octue.resources.service_backends import GCPPubSubBackend -from tests import MOCK_SERVICE_REVISION_TAG +from tests import MOCK_SERVICE_REVISION_TAG, TEST_PROJECT_NAME from tests.base import BaseTestCase @@ -32,6 +33,26 @@ def mock_run_function_that_fails_every_other_time(analysis_id, input_values, *ar class TestChild(BaseTestCase): + service_patcher = ServicePatcher() + + @classmethod + def setUpClass(cls): + """Start the service patcher and create a mock services topic. + + :return None: + """ + cls.service_patcher.start() + topic = MockTopic(name=OCTUE_SERVICES_PREFIX, project_name=TEST_PROJECT_NAME) + topic.create(allow_existing=True) + + @classmethod + def tearDownClass(cls): + """Stop the services patcher. + + :return None: + """ + cls.service_patcher.stop() + def test_representation(self): """Test that children are represented correctly as a string.""" self.assertEqual( @@ -55,18 +76,14 @@ def test_instantiating_child_without_credentials(self): def test_child_cannot_be_asked_question_without_credentials(self): """Test that a child cannot be asked a question without Google Cloud credentials being available.""" with patch.dict(os.environ, clear=True): - with patch("octue.cloud.pub_sub.service.Topic", new=MockTopic): - with patch("octue.cloud.pub_sub.service.Subscription", new=MockSubscription): - with patch("octue.resources.child.BACKEND_TO_SERVICE_MAPPING", {"GCPPubSubBackend": MockService}): - with patch("google.cloud.pubsub_v1.SubscriberClient", new=MockSubscriber): - - child = Child( - id=f"octue/my-child:{MOCK_SERVICE_REVISION_TAG}", - backend={"name": "GCPPubSubBackend", "project_name": "blah"}, - ) + with patch("octue.resources.child.BACKEND_TO_SERVICE_MAPPING", {"GCPPubSubBackend": MockService}): + child = Child( + id=f"octue/my-child:{MOCK_SERVICE_REVISION_TAG}", + backend={"name": "GCPPubSubBackend", "project_name": "blah"}, + ) - with self.assertRaises(DefaultCredentialsError): - child.ask({"some": "input"}) + with self.assertRaises(DefaultCredentialsError): + child.ask({"some": "input"}) def test_child_can_be_asked_multiple_questions(self): """Test that a child can be asked multiple questions.""" @@ -76,16 +93,14 @@ def mock_run_function(analysis_id, input_values, *args, **kwargs): responding_service = MockService(backend=GCPPubSubBackend(project_name="blah"), run_function=mock_run_function) - with ServicePatcher(): - with patch("octue.resources.child.BACKEND_TO_SERVICE_MAPPING", {"GCPPubSubBackend": MockService}): - responding_service.serve() - - child = Child(id=responding_service.id, backend={"name": "GCPPubSubBackend", "project_name": "blah"}) + with patch("octue.resources.child.BACKEND_TO_SERVICE_MAPPING", {"GCPPubSubBackend": MockService}): + responding_service.serve() + child = Child(id=responding_service.id, backend={"name": "GCPPubSubBackend", "project_name": "blah"}) - # Make sure the child's underlying mock service knows how to access the mock responding service. - child._service.children[responding_service.id] = responding_service - self.assertEqual(child.ask([1, 2, 3, 4])["output_values"], [1, 2, 3, 4]) - self.assertEqual(child.ask([5, 6, 7, 8])["output_values"], [5, 6, 7, 8]) + # Make sure the child's underlying mock service knows how to access the mock responding service. + child._service.children[responding_service.id] = responding_service + self.assertEqual(child.ask([1, 2, 3, 4])["output_values"], [1, 2, 3, 4]) + self.assertEqual(child.ask([5, 6, 7, 8])["output_values"], [5, 6, 7, 8]) def test_ask_multiple(self): """Test that a child can be asked multiple questions in parallel and return the answers in the correct order.""" @@ -96,27 +111,26 @@ def mock_run_function(analysis_id, input_values, *args, **kwargs): responding_service = MockService(backend=GCPPubSubBackend(project_name="blah"), run_function=mock_run_function) - with ServicePatcher(): - with patch("octue.resources.child.BACKEND_TO_SERVICE_MAPPING", {"GCPPubSubBackend": MockService}): - responding_service.serve() + with patch("octue.resources.child.BACKEND_TO_SERVICE_MAPPING", {"GCPPubSubBackend": MockService}): + responding_service.serve() - child = Child(id=responding_service.id, backend={"name": "GCPPubSubBackend", "project_name": "blah"}) + child = Child(id=responding_service.id, backend={"name": "GCPPubSubBackend", "project_name": "blah"}) - # Make sure the child's underlying mock service knows how to access the mock responding service. - child._service.children[responding_service.id] = responding_service + # Make sure the child's underlying mock service knows how to access the mock responding service. + child._service.children[responding_service.id] = responding_service - answers = child.ask_multiple( - {"input_values": [1, 2, 3, 4]}, - {"input_values": [5, 6, 7, 8]}, - ) + answers = child.ask_multiple( + {"input_values": [1, 2, 3, 4]}, + {"input_values": [5, 6, 7, 8]}, + ) - self.assertEqual( - answers, - [ - {"output_values": [1, 2, 3, 4], "output_manifest": None}, - {"output_values": [5, 6, 7, 8], "output_manifest": None}, - ], - ) + self.assertEqual( + answers, + [ + {"output_values": [1, 2, 3, 4], "output_manifest": None}, + {"output_values": [5, 6, 7, 8], "output_manifest": None}, + ], + ) def test_error_raised_in_ask_multiple_if_one_question_fails_when_raise_errors_is_true(self): """Test that an error is raised if any of the questions given to `Child.ask_multiple` fail when `raise_errors` @@ -137,21 +151,20 @@ def mock_run_function_that_sometimes_fails(analysis_id, input_values, *args, **k run_function=functools.partial(mock_run_function_that_sometimes_fails, runs=Value("d", 0)), ) - with ServicePatcher(): - with patch("octue.resources.child.BACKEND_TO_SERVICE_MAPPING", {"GCPPubSubBackend": MockService}): - responding_service.serve() + with patch("octue.resources.child.BACKEND_TO_SERVICE_MAPPING", {"GCPPubSubBackend": MockService}): + responding_service.serve() - child = Child(id=responding_service.id, backend={"name": "GCPPubSubBackend", "project_name": "blah"}) + child = Child(id=responding_service.id, backend={"name": "GCPPubSubBackend", "project_name": "blah"}) - # Make sure the child's underlying mock service knows how to access the mock responding service. - child._service.children[responding_service.id] = responding_service + # Make sure the child's underlying mock service knows how to access the mock responding service. + child._service.children[responding_service.id] = responding_service - with self.assertRaises(ValueError): - child.ask_multiple( - {"input_values": [1, 2, 3, 4]}, - {"input_values": [5, 6, 7, 8]}, - {"input_values": [9, 10, 11, 12]}, - ) + with self.assertRaises(ValueError): + child.ask_multiple( + {"input_values": [1, 2, 3, 4]}, + {"input_values": [5, 6, 7, 8]}, + {"input_values": [9, 10, 11, 12]}, + ) def test_error_not_raised_by_ask_multiple_if_one_question_fails_when_raise_errors_is_false(self): """Test that an error is not raised if one of the questions given to `Child.ask_multiple` fail when @@ -162,21 +175,20 @@ def test_error_not_raised_by_ask_multiple_if_one_question_fails_when_raise_error run_function=functools.partial(mock_run_function_that_fails_every_other_time, runs=Value("d", 0)), ) - with ServicePatcher(): - with patch("octue.resources.child.BACKEND_TO_SERVICE_MAPPING", {"GCPPubSubBackend": MockService}): - responding_service.serve() + with patch("octue.resources.child.BACKEND_TO_SERVICE_MAPPING", {"GCPPubSubBackend": MockService}): + responding_service.serve() - child = Child(id=responding_service.id, backend={"name": "GCPPubSubBackend", "project_name": "blah"}) + child = Child(id=responding_service.id, backend={"name": "GCPPubSubBackend", "project_name": "blah"}) - # Make sure the child's underlying mock service knows how to access the mock responding service. - child._service.children[responding_service.id] = responding_service + # Make sure the child's underlying mock service knows how to access the mock responding service. + child._service.children[responding_service.id] = responding_service - answers = child.ask_multiple( - {"input_values": [1, 2, 3, 4]}, - {"input_values": [5, 6, 7, 8]}, - {"input_values": [9, 10, 11, 12]}, - raise_errors=False, - ) + answers = child.ask_multiple( + {"input_values": [1, 2, 3, 4]}, + {"input_values": [5, 6, 7, 8]}, + {"input_values": [9, 10, 11, 12]}, + raise_errors=False, + ) successful_answers = [] failed_answers = [] @@ -204,22 +216,21 @@ def test_ask_multiple_with_failed_question_retry(self): run_function=functools.partial(mock_run_function_that_fails_every_other_time, runs=Value("d", 0)), ) - with ServicePatcher(): - with patch("octue.resources.child.BACKEND_TO_SERVICE_MAPPING", {"GCPPubSubBackend": MockService}): - responding_service.serve() + with patch("octue.resources.child.BACKEND_TO_SERVICE_MAPPING", {"GCPPubSubBackend": MockService}): + responding_service.serve() - child = Child(id=responding_service.id, backend={"name": "GCPPubSubBackend", "project_name": "blah"}) + child = Child(id=responding_service.id, backend={"name": "GCPPubSubBackend", "project_name": "blah"}) - # Make sure the child's underlying mock service knows how to access the mock responding service. - child._service.children[responding_service.id] = responding_service + # Make sure the child's underlying mock service knows how to access the mock responding service. + child._service.children[responding_service.id] = responding_service - # Only ask two questions so the question success/failure order plays out as desired. - answers = child.ask_multiple( - {"input_values": [1, 2, 3, 4]}, - {"input_values": [5, 6, 7, 8]}, - raise_errors=False, - max_retries=1, - ) + # Only ask two questions so the question success/failure order plays out as desired. + answers = child.ask_multiple( + {"input_values": [1, 2, 3, 4]}, + {"input_values": [5, 6, 7, 8]}, + raise_errors=False, + max_retries=1, + ) # Check that both questions succeeded. self.assertEqual( @@ -247,24 +258,23 @@ def test_ask_multiple_with_multiple_failed_question_retries(self): run_function=functools.partial(mock_run_function_that_fails_every_other_time, runs=Value("d", 0)), ) - with ServicePatcher(): - with patch("octue.resources.child.BACKEND_TO_SERVICE_MAPPING", {"GCPPubSubBackend": MockService}): - responding_service.serve() + with patch("octue.resources.child.BACKEND_TO_SERVICE_MAPPING", {"GCPPubSubBackend": MockService}): + responding_service.serve() - child = Child(id=responding_service.id, backend={"name": "GCPPubSubBackend", "project_name": "blah"}) + child = Child(id=responding_service.id, backend={"name": "GCPPubSubBackend", "project_name": "blah"}) - # Make sure the child's underlying mock service knows how to access the mock responding service. - child._service.children[responding_service.id] = responding_service + # Make sure the child's underlying mock service knows how to access the mock responding service. + child._service.children[responding_service.id] = responding_service - # Only ask two questions so the question success/failure order plays out as desired. - answers = child.ask_multiple( - {"input_values": [1, 2, 3, 4]}, - {"input_values": [5, 6, 7, 8]}, - {"input_values": [9, 10, 11, 12]}, - {"input_values": [13, 14, 15, 16]}, - raise_errors=False, - max_retries=2, - ) + # Only ask two questions so the question success/failure order plays out as desired. + answers = child.ask_multiple( + {"input_values": [1, 2, 3, 4]}, + {"input_values": [5, 6, 7, 8]}, + {"input_values": [9, 10, 11, 12]}, + {"input_values": [13, 14, 15, 16]}, + raise_errors=False, + max_retries=2, + ) # Check that all four questions succeeded. self.assertEqual( @@ -284,23 +294,22 @@ def test_ask_multiple_with_prevented_retries(self): run_function=functools.partial(mock_run_function_that_fails_every_other_time, runs=Value("d", 0)), ) - with ServicePatcher(): - with patch("octue.resources.child.BACKEND_TO_SERVICE_MAPPING", {"GCPPubSubBackend": MockService}): - responding_service.serve() + with patch("octue.resources.child.BACKEND_TO_SERVICE_MAPPING", {"GCPPubSubBackend": MockService}): + responding_service.serve() - child = Child(id=responding_service.id, backend={"name": "GCPPubSubBackend", "project_name": "blah"}) + child = Child(id=responding_service.id, backend={"name": "GCPPubSubBackend", "project_name": "blah"}) - # Make sure the child's underlying mock service knows how to access the mock responding service. - child._service.children[responding_service.id] = responding_service + # Make sure the child's underlying mock service knows how to access the mock responding service. + child._service.children[responding_service.id] = responding_service - # Only ask two questions so the question success/failure order plays out as desired. - answers = child.ask_multiple( - {"input_values": [1, 2, 3, 4]}, - {"input_values": [5, 6, 7, 8]}, - raise_errors=False, - max_retries=1, - prevent_retries_when=[ValueError], - ) + # Only ask two questions so the question success/failure order plays out as desired. + answers = child.ask_multiple( + {"input_values": [1, 2, 3, 4]}, + {"input_values": [5, 6, 7, 8]}, + raise_errors=False, + max_retries=1, + prevent_retries_when=[ValueError], + ) successful_answers = [] failed_answers = [] diff --git a/tests/resources/test_service_backends.py b/tests/resources/test_service_backends.py index e0cabb357..f385ae3c1 100644 --- a/tests/resources/test_service_backends.py +++ b/tests/resources/test_service_backends.py @@ -19,6 +19,6 @@ def test_repr(self): self.assertEqual(repr(GCPPubSubBackend(project_name="hello")), "") def test_error_raised_if_project_name_is_none(self): - """Test that an error is raised if the project name is not given during `GCPPubSubBackend` instantiation.""" + """Test that an error is raised if the project name isn't given during `GCPPubSubBackend` instantiation.""" with self.assertRaises(CloudLocationNotSpecified): GCPPubSubBackend(project_name=None) diff --git a/tests/test_cli.py b/tests/test_cli.py index 7ab87ae06..a2e661d93 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -11,10 +11,11 @@ from octue.cloud import storage from octue.cloud.emulators._pub_sub import MockService, MockTopic from octue.cloud.emulators.child import ServicePatcher +from octue.cloud.events import OCTUE_SERVICES_PREFIX from octue.configuration import AppConfiguration, ServiceConfiguration from octue.resources import Dataset from octue.utils.patches import MultiPatcher -from tests import MOCK_SERVICE_REVISION_TAG, TEST_BUCKET_NAME, TESTS_DIR +from tests import MOCK_SERVICE_REVISION_TAG, TEST_BUCKET_NAME, TEST_PROJECT_NAME, TESTS_DIR from tests.base import BaseTestCase @@ -194,6 +195,11 @@ def setUpClass(cls): }, ) + topic = MockTopic(name=OCTUE_SERVICES_PREFIX, project_name=TEST_PROJECT_NAME) + + with ServicePatcher(): + topic.create(allow_existing=True) + def test_start_command(self): """Test that the start command works without error and uses the revision tag supplied in the `OCTUE_SERVICE_REVISION_TAG` environment variable. @@ -423,57 +429,3 @@ def test_create_push_subscription(self): self.assertEqual(subscription.call_args.kwargs["name"], "octue.example-service.3-5-0") self.assertEqual(subscription.call_args.kwargs["push_endpoint"], "https://example.com/endpoint") self.assertEqual(subscription.call_args.kwargs["expiration_time"], expected_expiration_time) - - def test_create_push_subscription_with_filter(self): - """Test that filters are added to subscriptions correctly when creating a push subscription.""" - for filter_option, expected_filter in ( - ([], 'attributes.sender_type = "PARENT"'), - (["--filter="], None), - (['--filter=attributes.sender_type = "CHILD"'], 'attributes.sender_type = "CHILD"'), - ): - with self.subTest(filter_option=filter_option): - with patch("octue.cloud.pub_sub.Topic", new=MockTopic): - with patch("octue.cloud.pub_sub.Subscription") as subscription: - result = CliRunner().invoke( - octue_cli, - [ - "deploy", - "create-push-subscription", - "my-project", - "octue", - "example-service", - "https://example.com/endpoint", - "--revision-tag=3.5.0", - *filter_option, - ], - ) - - self.assertIsNone(result.exception) - self.assertEqual(result.exit_code, 0) - self.assertEqual(subscription.call_args.kwargs["name"], "octue.example-service.3-5-0") - self.assertEqual(subscription.call_args.kwargs["push_endpoint"], "https://example.com/endpoint") - self.assertEqual(subscription.call_args.kwargs["filter"], expected_filter) - - def test_create_push_subscription_with_subscription_suffix(self): - """Test that subscription suffixes are added to subscription names correctly when creating a push subscription.""" - with patch("octue.cloud.pub_sub.Topic", new=MockTopic): - with patch("octue.cloud.pub_sub.Subscription") as subscription: - result = CliRunner().invoke( - octue_cli, - [ - "deploy", - "create-push-subscription", - "my-project", - "octue", - "example-service", - "https://example.com/endpoint", - "--revision-tag=3.5.0", - "--subscription-suffix=-peter-rabbit", - ], - ) - - self.assertIsNone(result.exception) - self.assertEqual(result.exit_code, 0) - self.assertEqual(subscription.call_args.kwargs["topic"].name, "octue.services.octue.example-service.3-5-0") - self.assertEqual(subscription.call_args.kwargs["name"], "octue.example-service.3-5-0-peter-rabbit") - self.assertEqual(subscription.call_args.kwargs["push_endpoint"], "https://example.com/endpoint")