Skip to content

Commit bc4224c

Browse files
authored
MRG: Merge pull request #639 from octue/use-single-topic-per-workspace
Use single topic per workspace
2 parents 811cc94 + a54c04b commit bc4224c

35 files changed

+1054
-958
lines changed

octue/__init__.py

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
__all__ = ("Runner",)
88

9+
910
REPOSITORY_ROOT = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
1011

1112

octue/cli.py

+3-23
Original file line numberDiff line numberDiff line change
@@ -357,35 +357,16 @@ def deploy():
357357
help="The service revision tag (e.g. 1.0.7). If this option isn't given, a random 'cool name' tag is generated e.g"
358358
". 'curious-capybara'.",
359359
)
360-
@click.option(
361-
"--filter",
362-
is_flag=False,
363-
default='attributes.sender_type = "PARENT"',
364-
show_default=True,
365-
help="An optional filter to apply to the subscription (see "
366-
"https://cloud.google.com/pubsub/docs/subscription-message-filter). If not provided, the default filter is applied."
367-
" To disable filtering, provide an empty string.",
368-
)
369-
@click.option(
370-
"--subscription-suffix",
371-
is_flag=False,
372-
default=None,
373-
show_default=True,
374-
help="An optional suffix to add to the end of the subscription name. This is useful when needing to create "
375-
"multiple subscriptions for the same topic (subscription names are unique).",
376-
)
377360
def create_push_subscription(
378361
project_name,
379362
service_namespace,
380363
service_name,
381364
push_endpoint,
382365
expiration_time,
383366
revision_tag,
384-
filter,
385-
subscription_suffix,
386367
):
387-
"""Create a Google Pub/Sub push subscription for an Octue service for it to receive questions from parents. If a
388-
corresponding topic doesn't exist, it will be created first. The subscription name is printed on completion.
368+
"""Create a Google Pub/Sub push subscription for an Octue service for it to receive questions from parents. The
369+
subscription name is printed on completion.
389370
390371
PROJECT_NAME is the name of the Google Cloud project in which the subscription will be created
391372
@@ -403,8 +384,7 @@ def create_push_subscription(
403384
sruid,
404385
push_endpoint,
405386
expiration_time=expiration_time,
406-
subscription_filter=filter or None,
407-
subscription_suffix=subscription_suffix,
387+
subscription_filter=f'attributes.recipient = "{sruid}" AND attributes.sender_type = "PARENT"',
408388
)
409389

410390
click.echo(sruid)

octue/cloud/deployment/google/answer_pub_sub_question.py

+2-7
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
import logging
22

3-
from octue.cloud.pub_sub import Topic
43
from octue.cloud.pub_sub.service import Service
5-
from octue.cloud.service_id import convert_service_id_to_pub_sub_form, create_sruid, get_sruid_parts
4+
from octue.cloud.service_id import create_sruid, get_sruid_parts
65
from octue.configuration import load_service_and_app_configuration
76
from octue.resources.service_backends import GCPPubSubBackend
87
from octue.runner import Runner
@@ -47,9 +46,5 @@ def answer_question(question, project_name):
4746
logger.info("Analysis successfully run and response sent for question %r.", question_uuid)
4847

4948
except BaseException as error: # noqa
50-
service.send_exception(
51-
topic=Topic(name=convert_service_id_to_pub_sub_form(service_sruid), project_name=project_name),
52-
question_uuid=question_uuid,
53-
)
54-
49+
service.send_exception(question_uuid=question_uuid, originator="UNKNOWN")
5550
logger.exception(error)

octue/cloud/emulators/_pub_sub.py

+25-17
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,22 @@
11
import importlib.metadata
22
import json
33
import logging
4+
from collections import defaultdict
45

56
import google.api_core
67

78
from octue.cloud.pub_sub import Subscription, Topic
8-
from octue.cloud.pub_sub.service import ANSWERS_NAMESPACE, PARENT_SENDER_TYPE, Service
9-
from octue.cloud.service_id import convert_service_id_to_pub_sub_form
9+
from octue.cloud.pub_sub.service import PARENT_SENDER_TYPE, Service
1010
from octue.resources import Manifest
1111
from octue.utils.dictionaries import make_minimal_dictionary
1212
from octue.utils.encoders import OctueJSONEncoder
1313

1414

1515
logger = logging.getLogger(__name__)
1616

17-
TOPICS = {}
18-
SUBSCRIPTIONS = {}
17+
TOPICS = set()
18+
SUBSCRIPTIONS = set()
19+
MESSAGES = defaultdict(list)
1920

2021

2122
class MockTopic(Topic):
@@ -33,7 +34,7 @@ def create(self, allow_existing=False):
3334
raise google.api_core.exceptions.AlreadyExists(f"Topic {self.path!r} already exists.")
3435

3536
if not self.exists():
36-
TOPICS[self.name] = []
37+
TOPICS.add(self.name)
3738
self._created = True
3839

3940
def delete(self):
@@ -42,7 +43,7 @@ def delete(self):
4243
:return None:
4344
"""
4445
try:
45-
del TOPICS[self.name]
46+
TOPICS.remove(self.name)
4647
except KeyError:
4748
pass
4849

@@ -73,7 +74,7 @@ def create(self, allow_existing=False):
7374
raise google.api_core.exceptions.AlreadyExists(f"Subscription {self.path!r} already exists.")
7475

7576
if not self.exists():
76-
SUBSCRIPTIONS[self.name] = []
77+
SUBSCRIPTIONS.add(self.name)
7778
self._created = True
7879

7980
def delete(self):
@@ -146,8 +147,7 @@ def publish(self, topic, data, retry=None, **attributes):
146147
:param google.api_core.retry.Retry|None retry:
147148
:return MockFuture:
148149
"""
149-
subscription_name = ".".join((get_pub_sub_resource_name(topic), ANSWERS_NAMESPACE, attributes["question_uuid"]))
150-
SUBSCRIPTIONS[subscription_name].append(MockMessage(data=data, attributes=attributes))
150+
MESSAGES[attributes["question_uuid"]].append(MockMessage(data=data, attributes=attributes))
151151
return MockFuture()
152152

153153

@@ -191,12 +191,12 @@ def pull(self, request, timeout=None, retry=None):
191191
if self.closed:
192192
raise ValueError("ValueError: Cannot invoke RPC: Channel closed!")
193193

194-
subscription_name = get_pub_sub_resource_name(request["subscription"])
194+
question_uuid = request["subscription"].split(".")[-1]
195195

196196
try:
197197
return MockPullResponse(
198198
received_messages=[
199-
MockMessageWrapper(message=SUBSCRIPTIONS[subscription_name].pop(0)),
199+
MockMessageWrapper(message=MESSAGES[question_uuid].pop(0)),
200200
]
201201
)
202202

@@ -242,6 +242,13 @@ def __init__(self, message):
242242
self.message = message
243243
self.ack_id = None
244244

245+
def __repr__(self):
246+
"""Represent the mock message as a string.
247+
248+
:return str:
249+
"""
250+
return f"<{type(self).__name__}(message={self.message})>"
251+
245252

246253
class MockMessage:
247254
"""A mock Pub/Sub message containing serialised data and any number of attributes.
@@ -356,8 +363,7 @@ def ask(
356363

357364
# Delete question from messages sent to subscription so the parent doesn't pick it up as a response message. We
358365
# do this as subscription filtering isn't implemented in this set of mocks.
359-
subscription_name = ".".join((convert_service_id_to_pub_sub_form(service_id), ANSWERS_NAMESPACE, question_uuid))
360-
SUBSCRIPTIONS["octue.services." + subscription_name].pop(0)
366+
MESSAGES[question_uuid].pop(0)
361367

362368
question = make_minimal_dictionary(kind="question", input_values=input_values, children=children)
363369

@@ -371,13 +377,15 @@ def ask(
371377
MockMessage.from_primitive(
372378
data=question,
373379
attributes={
374-
"sender_type": PARENT_SENDER_TYPE,
375380
"question_uuid": question_uuid,
376381
"forward_logs": subscribe_to_logs,
377-
"version": parent_sdk_version,
378382
"save_diagnostics": save_diagnostics,
379-
"message_number": 0,
380-
"sender": service_id,
383+
"order": 0,
384+
"originator": self.id,
385+
"sender": self.id,
386+
"sender_type": PARENT_SENDER_TYPE,
387+
"sender_sdk_version": parent_sdk_version,
388+
"recipient": service_id,
381389
},
382390
)
383391
)

octue/cloud/events/__init__.py

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
OCTUE_SERVICES_PREFIX = "octue.services"

octue/cloud/events/counter.py

+35
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
class EventCounter:
2+
"""A mutable counter for keeping track of the emission order of events. This is used in the `Service` class instead
3+
of an integer because it is mutable and can be passed to the `Service._emit_event` method and incremented as
4+
events are emitted.
5+
6+
:return None:
7+
"""
8+
9+
def __init__(self):
10+
self.count = 0
11+
12+
def __iadd__(self, other):
13+
"""Increment the counter by an integer.
14+
15+
:return octue.cloud.events.counter.EventCounter: the event counter with its count updated
16+
"""
17+
if not isinstance(other, int):
18+
raise ValueError(f"Event counters can only be incremented by an integer; received {other!r}.")
19+
20+
self.count += other
21+
return self
22+
23+
def __int__(self):
24+
"""Get the counter as an integer.
25+
26+
:return int: the counter as an integer
27+
"""
28+
return int(self.count)
29+
30+
def __repr__(self):
31+
"""Represent the counter as a string.
32+
33+
:return str: the counter represented as a string.
34+
"""
35+
return f"<{type(self).__name__}(count={self.count})"

octue/cloud/events/handler.py

+19-21
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ class AbstractEventHandler:
3131
"""An abstract event handler. Inherit from this and add the `handle_events` and `_extract_event_and_attributes`
3232
methods to handle events from a specific source synchronously or asynchronously.
3333
34-
:param octue.cloud.pub_sub.service.Service receiving_service: the service that's receiving the events
34+
:param octue.cloud.pub_sub.service.Service recipient: the service that's receiving the events
3535
:param callable|None handle_monitor_message: a function to handle monitor messages (e.g. send them to an endpoint for plotting or displaying) - this function should take a single JSON-compatible python primitive
3636
:param bool record_events: if `True`, record received events in the `received_events` attribute
3737
:param dict|None event_handlers: a mapping of event type names to callables that handle each type of event. The handlers should not mutate the events.
@@ -43,15 +43,15 @@ class AbstractEventHandler:
4343

4444
def __init__(
4545
self,
46-
receiving_service,
46+
recipient,
4747
handle_monitor_message=None,
4848
record_events=True,
4949
event_handlers=None,
5050
schema=SERVICE_COMMUNICATION_SCHEMA,
5151
skip_missing_events_after=10,
5252
only_handle_result=False,
5353
):
54-
self.receiving_service = receiving_service
54+
self.recipient = recipient
5555
self.handle_monitor_message = handle_monitor_message
5656
self.record_events = record_events
5757
self.schema = schema
@@ -122,33 +122,31 @@ def _extract_and_enqueue_event(self, container):
122122
if not is_event_valid(
123123
event=event,
124124
attributes=attributes,
125-
receiving_service=self.receiving_service,
125+
recipient=self.recipient,
126126
parent_sdk_version=PARENT_SDK_VERSION,
127-
child_sdk_version=attributes.get("version"),
127+
child_sdk_version=attributes["sender_sdk_version"],
128128
schema=self.schema,
129129
):
130130
return
131131

132132
# Get the child's SRUID and Octue SDK version from the first event.
133133
if not self._child_sdk_version:
134-
self.question_uuid = attributes.get("question_uuid")
135-
self._child_sdk_version = attributes["version"]
134+
self.child_sruid = attributes["sender"]
135+
self.question_uuid = attributes["question_uuid"]
136+
self._child_sdk_version = attributes["sender_sdk_version"]
136137

137-
# Backwards-compatible with previous event schema versions.
138-
self.child_sruid = attributes.get("sender", "REMOTE")
138+
logger.debug("%r received an event related to question %r.", self.recipient, self.question_uuid)
139+
order = attributes["order"]
139140

140-
logger.debug("%r received an event related to question %r.", self.receiving_service, self.question_uuid)
141-
event_number = attributes["message_number"]
142-
143-
if event_number in self.waiting_events:
141+
if order in self.waiting_events:
144142
logger.warning(
145-
"%r: Event with duplicate event number %d received for question %s - overwriting original event.",
146-
self.receiving_service,
147-
event_number,
143+
"%r: Event with duplicate order %d received for question %s - overwriting original event.",
144+
self.recipient,
145+
order,
148146
self.question_uuid,
149147
)
150148

151-
self.waiting_events[event_number] = event
149+
self.waiting_events[order] = event
152150

153151
def _attempt_to_handle_waiting_events(self):
154152
"""Attempt to handle events waiting in `self.waiting_events`. If these events aren't consecutive to the
@@ -204,7 +202,7 @@ def _skip_to_earliest_waiting_event(self):
204202
logger.warning(
205203
"%r: %d consecutive events missing for question %r after %ds - skipping to next earliest waiting event "
206204
"(event %d).",
207-
self.receiving_service,
205+
self.recipient,
208206
number_of_missing_events,
209207
self.question_uuid,
210208
self.skip_missing_events_after,
@@ -236,7 +234,7 @@ def _handle_delivery_acknowledgement(self, event):
236234
:param dict event:
237235
:return None:
238236
"""
239-
logger.info("%r's question was delivered at %s.", self.receiving_service, event["datetime"])
237+
logger.info("%r's question was delivered at %s.", self.recipient, event["datetime"])
240238

241239
def _handle_heartbeat(self, event):
242240
"""Record the time the heartbeat was received.
@@ -253,7 +251,7 @@ def _handle_monitor_message(self, event):
253251
:param dict event:
254252
:return None:
255253
"""
256-
logger.debug("%r received a monitor message.", self.receiving_service)
254+
logger.debug("%r received a monitor message.", self.recipient)
257255

258256
if self.handle_monitor_message is not None:
259257
self.handle_monitor_message(event["data"])
@@ -317,7 +315,7 @@ def _handle_result(self, event):
317315
:param dict event:
318316
:return dict:
319317
"""
320-
logger.info("%r received an answer to question %r.", self.receiving_service, self.question_uuid)
318+
logger.info("%r received an answer to question %r.", self.recipient, self.question_uuid)
321319

322320
if event.get("output_manifest"):
323321
output_manifest = Manifest.deserialise(event["output_manifest"])

octue/cloud/events/replayer.py

+8-6
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
class EventReplayer(AbstractEventHandler):
1313
"""A replayer for events retrieved asynchronously from some kind of storage.
1414
15-
:param octue.cloud.pub_sub.service.Service receiving_service: the service that's receiving the events
15+
:param octue.cloud.pub_sub.service.Service recipient: the service that's receiving the events
1616
:param callable|None handle_monitor_message: a function to handle monitor messages (e.g. send them to an endpoint for plotting or displaying) - this function should take a single JSON-compatible python primitive
1717
:param bool record_events: if `True`, record received events in the `received_events` attribute
1818
:param dict|None event_handlers: a mapping of event type names to callables that handle each type of event. The handlers should not mutate the events.
@@ -23,15 +23,15 @@ class EventReplayer(AbstractEventHandler):
2323

2424
def __init__(
2525
self,
26-
receiving_service=None,
26+
recipient=None,
2727
handle_monitor_message=None,
2828
record_events=True,
2929
event_handlers=None,
3030
schema=SERVICE_COMMUNICATION_SCHEMA,
3131
only_handle_result=False,
3232
):
3333
super().__init__(
34-
receiving_service or Service(backend=ServiceBackend(), service_id="local/local:local"),
34+
recipient or Service(backend=ServiceBackend(), service_id="local/local:local"),
3535
handle_monitor_message=handle_monitor_message,
3636
record_events=record_events,
3737
event_handlers=event_handlers,
@@ -51,14 +51,16 @@ def handle_events(self, events):
5151
for event in events:
5252
self._extract_and_enqueue_event(event)
5353

54-
self._earliest_waiting_event_number = min(self.waiting_events.keys())
55-
return self._attempt_to_handle_waiting_events()
54+
# Handle the case where no events (or no valid events) have been received.
55+
if self.waiting_events:
56+
self._earliest_waiting_event_number = min(self.waiting_events.keys())
57+
return self._attempt_to_handle_waiting_events()
5658

5759
def _extract_event_and_attributes(self, container):
5860
"""Extract an event and its attributes from the event container.
5961
6062
:param dict container: the container of the event
6163
:return (any, dict): the event and its attributes
6264
"""
63-
container["attributes"]["message_number"] = int(container["attributes"]["message_number"])
65+
container["attributes"]["order"] = int(container["attributes"]["order"])
6466
return container["event"], container["attributes"]

0 commit comments

Comments
 (0)