Skip to content

Factor out attributes into EventAttributes class #721

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 26 commits into from
Feb 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
74f11d8
REF: Factor out attributes into `EventAttributes` class
cortadocodes Feb 11, 2025
23a7575
MRG: Merge branch 'rc-kueue' into factor-out-event-attributes
cortadocodes Feb 17, 2025
305a09c
MRG: Merge branch 'rc-kueue' into factor-out-event-attributes
cortadocodes Feb 26, 2025
16233a4
ENH: Set better defaults in `EventAttributes`
cortadocodes Feb 26, 2025
22b7e98
FIX: Avoid calling `to_dict` method on dictionary
cortadocodes Feb 26, 2025
5a501ae
TST: Fix tests
cortadocodes Feb 26, 2025
32fec93
FIX: Fix question attributes defaults
cortadocodes Feb 26, 2025
59dd51e
REF: Simplify attribute defaults
cortadocodes Feb 26, 2025
cc66592
FIX: Remove question attributes from `make_response_attributes`
cortadocodes Feb 26, 2025
9e6eb79
DOC: Add missing param to docstring
cortadocodes Feb 26, 2025
bc9f198
FIX: Remove default question attributes from `EventAttributes`
cortadocodes Feb 26, 2025
7dd7040
FIX: Only remove attributes if they're present
cortadocodes Feb 26, 2025
31ced10
FIX: Fix `save_diagnostics` default in `make_question_event`
cortadocodes Feb 26, 2025
eda2ec3
DOC: Add `attributes` param to docstrings
cortadocodes Feb 26, 2025
7adfe38
REF: Add default args and document `make_question_event`
cortadocodes Feb 26, 2025
2b57073
REF: Factor out emitting result event
cortadocodes Feb 26, 2025
5d6285f
DOC: Fix docstring
cortadocodes Feb 26, 2025
8188e7c
REF: Expect `datetime` attr as `datetime` object in `EventAttributes`
cortadocodes Feb 27, 2025
bd33a93
REF: Rename `EventAttributes` methods and add docstrings
cortadocodes Feb 27, 2025
b40bfb0
TST: Test `EventAttributes` class
cortadocodes Feb 27, 2025
3cbfa9a
REF: Make sender/recipient swap clearer
cortadocodes Feb 27, 2025
16e4e24
ENH: Remove need to (de)serialise `None` attribute values
cortadocodes Feb 27, 2025
dd3ce1d
REF: Use `EventAttributes` in `MockService`
cortadocodes Feb 27, 2025
571a4b6
DOC: Improve docstring
cortadocodes Feb 27, 2025
45c37ea
REV: Revert "WIP: Temporarily hard-code WIF details into CI workflow"
cortadocodes Feb 27, 2025
a35c2bf
WIP: Comment out question cancellation methods for now
cortadocodes Feb 27, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/python-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ jobs:
with:
# NOTE: If setting create_credentials_file=true, .dockerignore file must include `gha-creds-*.json` to avoid baking these credentials into build
create_credentials_file: true
workload_identity_provider: "projects/437801218871/locations/global/workloadIdentityPools/dev-github-actions-pool/providers/dev-github-actions-provider"
workload_identity_provider: "projects/437801218871/locations/global/workloadIdentityPools/github-actions-pool/providers/github-actions-provider"
service_account: "[email protected]"

- name: Run tests
Expand Down
62 changes: 31 additions & 31 deletions octue/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -541,37 +541,37 @@ def diagnostics(cloud_path, local_path, download_datasets):
logger.info("Downloaded diagnostics from %r to %r.", cloud_path, local_path)


@question.command()
@click.argument("question_uuid", type=str)
@click.option(
"-p",
"--project-name",
type=str,
default=None,
help="If asking a remote question, the name of the Google Cloud project the service is deployed in. If not "
"provided, the project name is detected from the local Google application credentials if present.",
)
@click.option(
"-c",
"--service-config",
type=click.Path(dir_okay=False),
default=None,
help="An optional path to an `octue.yaml` file defining service registries to use. If not provided, the "
"`OCTUE_SERVICE_CONFIGURATION_PATH` environment variable is used if present, otherwise the local path `octue.yaml` "
"is used.",
)
def cancel(question_uuid, project_name, service_config):
"""Cancel a question running on an Octue Twined service.

QUESTION_UUID: The question UUID of a running question
"""
service_configuration = ServiceConfiguration.from_file(path=service_config)

if not project_name:
_, project_name = auth.default()

child = Child(id=None, backend={"name": "GCPPubSubBackend", "project_name": project_name})
child.cancel(question_uuid=question_uuid, event_store_table_id=service_configuration.event_store_table_id)
# @question.command()
# @click.argument("question_uuid", type=str)
# @click.option(
# "-p",
# "--project-name",
# type=str,
# default=None,
# help="If asking a remote question, the name of the Google Cloud project the service is deployed in. If not "
# "provided, the project name is detected from the local Google application credentials if present.",
# )
# @click.option(
# "-c",
# "--service-config",
# type=click.Path(dir_okay=False),
# default=None,
# help="An optional path to an `octue.yaml` file defining service registries to use. If not provided, the "
# "`OCTUE_SERVICE_CONFIGURATION_PATH` environment variable is used if present, otherwise the local path `octue.yaml` "
# "is used.",
# )
# def cancel(question_uuid, project_name, service_config):
# """Cancel a question running on an Octue Twined service.
#
# QUESTION_UUID: The question UUID of a running question
# """
# service_configuration = ServiceConfiguration.from_file(path=service_config)
#
# if not project_name:
# _, project_name = auth.default()
#
# child = Child(id=None, backend={"name": "GCPPubSubBackend", "project_name": project_name})
# child.cancel(question_uuid=question_uuid, event_store_table_id=service_configuration.event_store_table_id)


@octue_cli.command(deprecated=True)
Expand Down
42 changes: 20 additions & 22 deletions octue/cloud/emulators/_pub_sub.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import google.api_core

from octue.cloud.events.attributes import EventAttributes
from octue.cloud.pub_sub import Subscription, Topic
from octue.cloud.pub_sub.service import PARENT_SENDER_TYPE, Service
from octue.definitions import LOCAL_SDK_VERSION
Expand Down Expand Up @@ -405,30 +406,27 @@ def ask(
# If the originator isn't provided, assume that this service revision is the originator.
originator = originator or self.id

attributes = EventAttributes(
question_uuid=question_uuid,
parent_question_uuid=parent_question_uuid,
originator_question_uuid=originator_question_uuid,
forward_logs=subscribe_to_logs,
save_diagnostics=save_diagnostics,
parent=self.id,
originator=originator,
sender=self.id,
sender_type=PARENT_SENDER_TYPE,
sender_sdk_version=parent_sdk_version,
recipient=service_id,
retry_count=retry_count,
cpus=cpus,
memory=memory,
ephemeral_storage=ephemeral_storage,
)

try:
self.children[service_id].answer(
MockMessage.from_primitive(
data=question,
attributes={
"datetime": "2024-04-11T10:46:48.236064",
"uuid": "a9de11b1-e88f-43fa-b3a4-40a590c3443f",
"question_uuid": question_uuid,
"parent_question_uuid": parent_question_uuid,
"originator_question_uuid": originator_question_uuid,
"forward_logs": subscribe_to_logs,
"save_diagnostics": save_diagnostics,
"parent": self.id,
"originator": originator,
"sender": self.id,
"sender_type": PARENT_SENDER_TYPE,
"sender_sdk_version": parent_sdk_version,
"recipient": service_id,
"retry_count": retry_count,
"cpus": cpus,
"memory": memory,
"ephemeral_storage": ephemeral_storage,
},
)
MockMessage.from_primitive(data=question, attributes=attributes.to_serialised_attributes())
)
except Exception as e: # noqa
logger.exception(e)
Expand Down
141 changes: 141 additions & 0 deletions octue/cloud/events/attributes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import datetime as dt
import uuid as uuid_library

from octue.definitions import LOCAL_SDK_VERSION
from octue.utils.dictionaries import make_minimal_dictionary

SENDER_TYPE_OPPOSITES = {"CHILD": "PARENT", "PARENT": "CHILD"}


class EventAttributes:
"""A data structure for holding and working with attributes for a single Octue Twined event. If originator and
parent information aren't provided, the attributes will correspond to an event of any kind related to an originator
question.

:param str sender: the unique identifier (SRUID) of the service revision sending the question
:param str sender_type: the type of sender for this event; must be one of {"PARENT", "CHILD"}
:param str recipient: the SRUID of the service revision the question is for
:param str|None uuid: the UUID of the event; if `None`, a UUID is generated
:param datetime.datetime|None datetime: the datetime the event was created at; defaults to the current datetime in UTC
:param str|None question_uuid: the UUID of the question; if `None`, a UUID is generated
:param str|None parent_question_uuid: the UUID of the question that triggered this question; this should be `None` if this event relates to the first question in a question tree
:param str|None originator_question_uuid: the UUID of the question that triggered all ancestor questions of this question; if `None`, the event's related question is assumed to be the originator question and `question_uuid` is used
:param str|None parent: the SRUID of the service revision that asked the question this event is related to
:param str|None originator: the SRUID of the service revision that triggered all ancestor questions of this question; if `None`, the `sender` is used
:param str sender_sdk_version: the semantic version of Octue SDK the sender is running; defaults to the version in the environment
:param int retry_count: the retry count of the question this event is related to (this is zero if it's the first attempt at the question)
:param bool|None forward_logs: if this isn't a `question` event, this should be `None`; otherwise, it should be a boolean indicating whether the parent requested the child to forward its logs to it
:param str|None save_diagnostics: if this isn't a `question` event, this should be `None`; otherwise, it must be one of {"SAVE_DIAGNOSTICS_OFF", "SAVE_DIAGNOSTICS_ON_CRASH", "SAVE_DIAGNOSTICS_ON"}
:param int|None cpus: if this isn't a `question` event, this should be `None`; otherwise, it can be `None` or the number of CPUs requested for the question
:param str|None memory: if this isn't a `question` event, this should be `None`; otherwise, it can be `None` or the amount of memory requested for the question e.g. "256Mi" or "1Gi"
:param str|None ephemeral_storage: if this isn't a `question` event, this should be `None`; otherwise, it can be `None` or the amount of ephemeral storage requested for the question e.g. "256Mi" or "1Gi"
:return None:
"""

def __init__(
self,
sender,
sender_type,
recipient,
uuid=None,
datetime=None,
question_uuid=None,
parent_question_uuid=None,
originator_question_uuid=None,
parent=None,
originator=None,
sender_sdk_version=LOCAL_SDK_VERSION,
retry_count=0,
forward_logs=None,
save_diagnostics=None,
cpus=None,
memory=None,
ephemeral_storage=None,
):
# Attributes for all event kinds.
self.uuid = uuid or str(uuid_library.uuid4())
self.datetime = datetime or dt.datetime.now(tz=dt.timezone.utc)
self.question_uuid = question_uuid or str(uuid_library.uuid4())
self.parent_question_uuid = parent_question_uuid
self.originator_question_uuid = originator_question_uuid or self.question_uuid
self.sender = sender
self.parent = parent or self.sender
self.originator = originator or self.sender
self.sender_type = sender_type
self.sender_sdk_version = sender_sdk_version
self.recipient = recipient
self.retry_count = retry_count

# Question event attributes.
self.forward_logs = forward_logs
self.save_diagnostics = save_diagnostics
self.cpus = cpus
self.memory = memory
self.ephemeral_storage = ephemeral_storage

def make_opposite_attributes(self):
"""Create the attributes for an event of the opposite sender type to this event (parent -> child or child
-> parent). For example, if these attributes are for a question event, create the attributes for a response
event such as a result or log record.

:return octue.cloud.events.attributes.EventAttributes: the event attributes for an event with the opposite sender type
"""
attributes = self.to_minimal_dict()
attributes["sender"] = self.recipient
attributes["recipient"] = self.sender
attributes["sender_type"] = SENDER_TYPE_OPPOSITES[self.sender_type]
attributes["sender_sdk_version"] = LOCAL_SDK_VERSION

for attr in ("forward_logs", "save_diagnostics", "cpus", "memory", "ephemeral_storage"):
if attr in attributes:
del attributes[attr]

return EventAttributes(**attributes)

def to_minimal_dict(self):
"""Convert the attributes to a minimal dictionary containing only the attributes that have a non-`None` value.
Using a minimal dictionary means the smallest possible data structure is used so `None` values don't,
for example, need to be redundantly encoded and transmitted when part of a JSON payload for a Pub/Sub message.

:return dict: the non-`None` attributes
"""
return make_minimal_dictionary(
uuid=self.uuid,
datetime=self.datetime,
question_uuid=self.question_uuid,
parent_question_uuid=self.parent_question_uuid,
originator_question_uuid=self.originator_question_uuid,
parent=self.parent,
originator=self.originator,
sender=self.sender,
sender_type=self.sender_type,
sender_sdk_version=self.sender_sdk_version,
recipient=self.recipient,
retry_count=self.retry_count,
forward_logs=self.forward_logs,
save_diagnostics=self.save_diagnostics,
cpus=self.cpus,
memory=self.memory,
ephemeral_storage=self.ephemeral_storage,
)

def to_serialised_attributes(self):
"""Convert the attributes to their serialised forms. This is required for e.g. sending the attributes as message
attributes on a Pub/Sub message. A minimal dictionary is produced containing only the attributes that have a
non-`None` value.

:return dict: the attribute names of the non-`None` attributes mapped to their serialised values
"""
serialised_attributes = {}

for key, value in self.to_minimal_dict().items():
if isinstance(value, bool):
value = str(int(value))
elif isinstance(value, (int, float)):
value = str(value)
elif isinstance(value, dt.datetime):
value = value.isoformat()

serialised_attributes[key] = value

return serialised_attributes
9 changes: 3 additions & 6 deletions octue/cloud/events/extraction.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,16 @@ def extract_and_deserialise_attributes(container):
# Cast attributes to a dictionary to avoid defaultdict-like behaviour from Pub/Sub message attributes container.
attributes = dict(get_nested_attribute(container, "attributes"))

# Deserialise the `parent_question_uuid`, `forward_logs`, and `retry_count`, fields if they're present
# (don't assume they are before validation).
if attributes.get("parent_question_uuid") == "null":
attributes["parent_question_uuid"] = None

# Deserialise the `retry_count`, attribute if it's present (don't assume it is before validation).
retry_count = attributes.get("retry_count")

if retry_count:
attributes["retry_count"] = int(retry_count)
else:
attributes["retry_count"] = None

# Question events have some extra optional attributes.
# Question events have some extra optional attributes that also need deserialising if they're present (don't assume
# they are before validation).
if attributes.get("sender_type") == "PARENT":
forward_logs = attributes.get("forward_logs")

Expand Down
Loading