Commit 146d5ba

MRG: Merge pull request #647 from octue/improve-async-retrieved-events
Improve async event retrieval workflow
2 parents 6999b22 + 2f4e8b7 commit 146d5ba

24 files changed: +376 -251 lines changed


.github/workflows/python-ci.yml

+3 -3
@@ -23,7 +23,7 @@ jobs:
     if: "!contains(github.event.head_commit.message, 'skipci')"
     runs-on: ${{ matrix.os }}
     env:
-      USING_COVERAGE: "3.9"
+      USING_COVERAGE: "3.10"
     strategy:
       matrix:
         os: [ubuntu-latest, windows-latest, macos-latest]
@@ -34,9 +34,9 @@ jobs:
         uses: actions/checkout@v3

       - name: Setup Python
-        uses: actions/setup-python@v2
+        uses: actions/setup-python@v5
         with:
-          python-version: 3.9
+          python-version: "3.10"

       - name: Install Poetry
         uses: snok/[email protected]

.github/workflows/release.yml

+3 -3
@@ -15,7 +15,7 @@ jobs:
     if: "github.event.pull_request.merged == true"
     runs-on: ${{ matrix.os }}
     env:
-      USING_COVERAGE: "3.9"
+      USING_COVERAGE: "3.10"
     strategy:
       matrix:
         os: [ubuntu-latest, windows-latest, macos-latest]
@@ -26,9 +26,9 @@ jobs:
         uses: actions/checkout@v3

       - name: Setup Python
-        uses: actions/setup-python@v2
+        uses: actions/setup-python@v5
         with:
-          python-version: 3.9
+          python-version: "3.10"

       - name: Install Poetry
         uses: snok/[email protected]

.github/workflows/version-compatibility.yml

+1 -1
@@ -11,7 +11,7 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v3
-      - uses: actions/setup-python@v2
+      - uses: actions/setup-python@v5
       - name: Install Poetry
         uses: snok/install-poetry@v1
       - name: Check version compatibility has been tested

README.md

+14 -2
@@ -4,6 +4,7 @@
 [![Documentation Status](https://readthedocs.org/projects/octue-python-sdk/badge/?version=latest)](https://octue-python-sdk.readthedocs.io/en/latest/?badge=latest)
 [![pre-commit](https://img.shields.io/badge/pre--commit-enabled-brightgreen?logo=pre-commit&logoColor=white)](https://github.com/pre-commit/pre-commit)
 [![black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/ambv/black)
+[![DOI](https://zenodo.org/badge/DOI/10.5281/zenodo.10961975.svg)](https://doi.org/10.5281/zenodo.10961975)

 # Octue Python SDK <img src="./docs/source/images/213_purple-fruit-snake-transparent.gif" alt="Purple Fruit Snake" width="100"/></span>

@@ -15,7 +16,9 @@ Read the docs [here.](https://octue-python-sdk.readthedocs.io/en/latest/)
 Uses our [twined](https://twined.readthedocs.io/en/latest/) library for data validation.

 ## Installation and usage
+
 To install, run one of:
+
 ```shell
 pip install octue
 ```
@@ -25,6 +28,7 @@ poetry add octue
 ```

 The command line interface (CLI) can then be accessed via:
+
 ```shell
 octue --help
 ```
@@ -59,13 +63,15 @@ Commands:
 ```

 ## Deprecated code
+
 When code is deprecated, it will still work but a deprecation warning will be issued with a suggestion on how to update
 it. After an adjustment period, deprecations will be removed from the codebase according to the [code removal schedule](https://github.com/octue/octue-sdk-python/issues/415).
 This constitutes a breaking change.

 ## Developer notes

 ### Installation
+
 We use [Poetry](https://python-poetry.org/) as our package manager. For development, run the following from the
 repository root, which will editably install the package:

@@ -76,18 +82,24 @@ poetry install --all-extras
 Then run the tests to check everything's working.

 ### Testing
+
 These environment variables need to be set to run the tests:
-* `GOOGLE_APPLICATION_CREDENTIALS=/absolute/path/to/service/account/file.json`
-* `TEST_PROJECT_NAME=<name-of-google-cloud-project-to-run-pub-sub-tests-on>`
+
+- `GOOGLE_APPLICATION_CREDENTIALS=/absolute/path/to/service/account/file.json`
+- `TEST_PROJECT_NAME=<name-of-google-cloud-project-to-run-pub-sub-tests-on>`

 Then, from the repository root, run
+
 ```shell
 python3 -m unittest
 ```
+
 or
+
 ```shell
 tox
 ```

 ## Contributing
+
 Take a look at our [contributing](/docs/contributing.md) page.

docs/source/asking_questions.rst

+3 -4
@@ -35,7 +35,7 @@ Asking a question
         backend={"name": "GCPPubSubBackend", "project_name": "my-project"},
     )

-    answer = child.ask(
+    answer, question_uuid = child.ask(
         input_values={"height": 32, "width": 3},
         input_manifest=manifest,
     )
@@ -104,7 +104,6 @@ access the event store and run:
 **Options**

 - ``kind`` - Only retrieve this kind of event if present (e.g. "result")
-- ``include_attributes`` - If ``True``, retrieve all the events' attributes as well
 - ``include_backend_metadata`` - If ``True``, retrieve information about the service backend that produced the event
 - ``limit`` - If set to a positive integer, limit the number of events returned to this

@@ -232,7 +231,7 @@ this:

 .. code-block:: python

-    answer = analysis.children["elevation"].ask(input_values={"longitude": 0, "latitude": 1})
+    answer, question_uuid = analysis.children["elevation"].ask(input_values={"longitude": 0, "latitude": 1})

 if your app configuration file is:

@@ -323,7 +322,7 @@ then you can override them like this:

 .. code-block:: python

-    answer = child.ask(
+    answer, question_uuid = child.ask(
         input_values={"height": 32, "width": 3},
         children=[
             {

docs/source/manifest.rst

+2 -2
@@ -49,7 +49,7 @@ Get an Octue service to analyse data for you as part of a larger analysis.
         backend={"name": "GCPPubSubBackend", "project_name": "my-project"},
     )

-    answer = child.ask(input_manifest=manifest)
+    answer, question_uuid = child.ask(input_manifest=manifest)

 See :doc:`here <asking_questions>` for more information.

@@ -108,7 +108,7 @@ the cloud and then download them again for each service (as would happen with cl
         }
     )

-    analysis.children["wind_speed"].ask(
+    answer, question_uuid = analysis.children["wind_speed"].ask(
         input_values=analysis.input_values,
         input_manifest=analysis.input_manifest,
         allow_local_files=True,

docs/source/testing_services.rst

+4 -4
@@ -87,7 +87,7 @@ Instantiating a child emulator in python
     def handle_monitor_message(message):
         ...

-    result = child_emulator.ask(
+    result, question_uuid = child_emulator.ask(
         input_values={"hello": "world"},
         handle_monitor_message=handle_monitor_message,
     )
@@ -133,7 +133,7 @@ You can then instantiate a child emulator from this in python:
     def handle_monitor_message(message):
         ...

-    result = child_emulator.ask(
+    result, question_uuid = child_emulator.ask(
         input_values={"hello": "world"},
         handle_monitor_message=handle_monitor_message,
     )
@@ -226,7 +226,7 @@ child.
         backend={"name": "GCPPubSubBackend", "project_name": "my-project"},
     )

-    result = child.ask(input_values=[1, 2, 3, 4])
+    result, question_uuid = child.ask(input_values=[1, 2, 3, 4])

     child.received_events
     >>> [
@@ -260,6 +260,6 @@ You can then feed these into a child emulator to emulate one possible response o
     child_emulator = ChildEmulator(events=child.received_events)

     child_emulator.ask(input_values=[1, 2, 3, 4])
-    >>> {"some": "results"}
+    >>> {"some": "results"}, "9cab579f-c486-4324-ac9b-96491d26266b"

 You can also create test fixtures from :ref:`downloaded service crash diagnostics <test_fixtures_from_crash_diagnostics>`.

docs/source/troubleshooting_services.rst

+1 -1
@@ -121,7 +121,7 @@ For example:
         backend={"name": "GCPPubSubBackend", "project_name": "my-project"},
     )

-    answer = child.ask(
+    answer, question_uuid = child.ask(
         input_values={"height": 32, "width": 3},
         save_diagnostics="SAVE_DIAGNOSTICS_OFF",
     )

octue/cloud/deployment/google/cloud_run/Dockerfile-python311

+1 -1
@@ -1,4 +1,4 @@
-FROM windpioneers/gdal-python:little-gecko-gdal-2.4.1-python-3.11-slim
+FROM windpioneers/gdal-python:modest-heron-gdal-2.4.1-python-3.11-slim

 # Ensure print statements and log messages appear promptly in Cloud Logging.
 ENV PYTHONUNBUFFERED True

octue/cloud/emulators/child.py

+5 -3
@@ -125,12 +125,12 @@ def ask(
         :param bool asynchronous: if `True`, don't create an answer subscription
         :param float timeout: time in seconds to wait for an answer before raising a timeout error
         :raise TimeoutError: if the timeout is exceeded while waiting for an answer
-        :return dict: a dictionary containing the keys "output_values" and "output_manifest"
+        :return dict, str: a dictionary containing the keys "output_values" and "output_manifest", and the question UUID
         """
         with ServicePatcher():
             self._child.serve(allow_existing=True)

-            subscription, _ = self._parent.ask(
+            subscription, question_uuid = self._parent.ask(
                 service_id=self._child.id,
                 input_values=input_values,
                 input_manifest=input_manifest,
@@ -141,13 +141,15 @@ def ask(
                 asynchronous=asynchronous,
             )

-            return self._parent.wait_for_answer(
+            answer = self._parent.wait_for_answer(
                 subscription,
                 handle_monitor_message=handle_monitor_message,
                 record_events=record_events,
                 timeout=timeout,
             )

+        return answer, question_uuid
+
     def _emulate_analysis(
         self,
         analysis_id,
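
The hunks above change `ask` to return the question UUID alongside the answer, so callers now unpack two values. A minimal sketch of the calling pattern follows. It uses a hypothetical stand-in class rather than the real `ChildEmulator`; the class name `StubChild`, its canned answer, and the way it generates the UUID are assumptions made only for illustration.

```python
import uuid


class StubChild:
    """Hypothetical stand-in for a child; not part of the octue SDK."""

    def ask(self, input_values=None):
        # Assumption: each question is identified by a freshly generated UUID string.
        question_uuid = str(uuid.uuid4())

        # Canned answer with the two keys named in the docstring above.
        answer = {"output_values": input_values, "output_manifest": None}

        # Mirror the new return shape: the answer first, then the question UUID.
        return answer, question_uuid


child = StubChild()

# Code that previously read `answer = child.ask(...)` now unpacks a pair.
answer, question_uuid = child.ask(input_values={"height": 32, "width": 3})
print(answer["output_values"], question_uuid)
```

Keeping the UUID at the call site is what would let a caller look up the question's events later, for example by passing it as the `question_uuid` argument of `get_events`.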

octue/cloud/pub_sub/bigquery.py

+74 -33
@@ -3,53 +3,46 @@
 from google.cloud.bigquery import Client, QueryJobConfig, ScalarQueryParameter

 from octue.cloud.events.validation import VALID_EVENT_KINDS
+from octue.exceptions import ServiceNotFound
+from octue.resources import Manifest


-def get_events(
-    table_id,
-    sender,
-    question_uuid,
-    kind=None,
-    include_attributes=False,
-    include_backend_metadata=False,
-    limit=1000,
-):
+def get_events(table_id, sender, question_uuid, kind=None, include_backend_metadata=False, limit=1000):
     """Get Octue service events for a question from a sender from a Google BigQuery event store.

     :param str table_id: the full ID of the table e.g. "your-project.your-dataset.your-table"
     :param str sender: the SRUID of the sender of the events
     :param str question_uuid: the UUID of the question to get the events for
     :param str|None kind: the kind of event to get; if `None`, all event kinds are returned
-    :param bool include_attributes: if `True`, include events' attributes (excluding question UUID)
     :param bool include_backend_metadata: if `True`, include the service backend metadata
     :param int limit: the maximum number of events to return
+    :raise ValueError: if the `kind` parameter is invalid
+    :raise octue.exceptions.ServiceNotFound: if the sender hasn't emitted any events related to the question UUID (or any events at all)
     :return list(dict): the events for the question
     """
     if kind:
         if kind not in VALID_EVENT_KINDS:
             raise ValueError(f"`kind` must be one of {VALID_EVENT_KINDS!r}; received {kind!r}.")

-        event_kind_condition = [f'AND JSON_EXTRACT_SCALAR(event, "$.kind") = "{kind}"']
+        event_kind_condition = [f"AND kind={kind!r}"]
     else:
         event_kind_condition = []

     client = Client()
-    fields = ["`event`"]
-
-    if include_attributes:
-        fields.extend(
-            (
-                "`datetime`",
-                "`uuid`",
-                "`originator`",
-                "`sender`",
-                "`sender_type`",
-                "`sender_sdk_version`",
-                "`recipient`",
-                "`order`",
-                "`other_attributes`",
-            )
-        )
+
+    fields = [
+        "`event`",
+        "`kind`",
+        "`datetime`",
+        "`uuid`",
+        "`originator`",
+        "`sender`",
+        "`sender_type`",
+        "`sender_sdk_version`",
+        "`recipient`",
+        "`order`",
+        "`other_attributes`",
+    ]

     if include_backend_metadata:
         fields.extend(("`backend`", "`backend_metadata`"))
@@ -74,16 +67,64 @@ def get_events(
     )

     query_job = client.query(query, job_config=job_config)
-    rows = query_job.result()
-    df = rows.to_dataframe()
+    result = query_job.result()
+
+    if result.total_rows == 0:
+        raise ServiceNotFound(
+            f"No events found. The requested sender {sender!r} may not exist or it hasn't emitted any events for "
+            f"question {question_uuid!r} (or any events at all)."
+        )
+
+    df = result.to_dataframe()

     # Convert JSON strings to python primitives.
     df["event"] = df["event"].map(json.loads)
-
-    if "other_attributes" in df:
-        df["other_attributes"] = df["other_attributes"].map(json.loads)
+    df["event"].apply(_deserialise_manifest_if_present)
+    df["other_attributes"] = df["other_attributes"].map(json.loads)

     if "backend_metadata" in df:
         df["backend_metadata"] = df["backend_metadata"].map(json.loads)

-    return df.to_dict(orient="records")
+    events = df.to_dict(orient="records")
+    return _unflatten_events(events)
+
+
+def _deserialise_manifest_if_present(event):
+    """If the event is a "question" or "result" event and a manifest is present, deserialise the manifest and replace
+    the serialised manifest with it.
+
+    :param dict event: an Octue service event
+    :return None:
+    """
+    manifest_keys = {"input_manifest", "output_manifest"}
+
+    for key in manifest_keys:
+        if key in event:
+            event[key] = Manifest.deserialise(event[key])
+            # Only one of the manifest types will be in the event, so return if one is found.
+            return
+
+
+def _unflatten_events(events):
+    """Convert the events and attributes from the flat structure of the BigQuery table into the nested structure of the
+    service communication schema.
+
+    :param list(dict) events: flattened events
+    :return list(dict): unflattened events
+    """
+    for event in events:
+        event["event"]["kind"] = event.pop("kind")
+
+        event["attributes"] = {
+            "datetime": event.pop("datetime").isoformat(),
+            "uuid": event.pop("uuid"),
+            "originator": event.pop("originator"),
+            "sender": event.pop("sender"),
+            "sender_type": event.pop("sender_type"),
+            "sender_sdk_version": event.pop("sender_sdk_version"),
+            "recipient": event.pop("recipient"),
+            "order": event.pop("order"),
+            **event.pop("other_attributes"),
+        }
+
+    return events
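
To make the flat-to-nested conversion described in the `_unflatten_events` docstring concrete, here is a small standalone sketch. The function `unflatten_event` is a copy of the per-event logic from the diff above, written so it runs without BigQuery or the octue package. The sample row, including the SRUIDs, version string, sender type and the `forward_logs` attribute, is made up for illustration and is not real event data.

```python
from datetime import datetime, timezone


def unflatten_event(row):
    """Nest one flat event-store row (standalone illustration of the logic in the diff)."""
    # The event kind lives in its own column and is moved inside the event body.
    row["event"]["kind"] = row.pop("kind")

    # The remaining columns are gathered into a nested "attributes" dictionary.
    row["attributes"] = {
        "datetime": row.pop("datetime").isoformat(),
        "uuid": row.pop("uuid"),
        "originator": row.pop("originator"),
        "sender": row.pop("sender"),
        "sender_type": row.pop("sender_type"),
        "sender_sdk_version": row.pop("sender_sdk_version"),
        "recipient": row.pop("recipient"),
        "order": row.pop("order"),
        # Any extra attributes are merged in at the same level.
        **row.pop("other_attributes"),
    }

    return row


# A made-up flat row; every value here is an assumption for illustration.
flat_row = {
    "event": {"output_values": [1, 2, 3]},
    "kind": "result",
    "datetime": datetime(2024, 4, 11, 10, 46, 48, tzinfo=timezone.utc),
    "uuid": "a9de11b1-e88f-43fa-b3a4-40a590c3443f",
    "originator": "example/parent:1.0.0",
    "sender": "example/child:1.0.0",
    "sender_type": "CHILD",
    "sender_sdk_version": "0.0.0",
    "recipient": "example/parent:1.0.0",
    "order": 0,
    "other_attributes": {"forward_logs": True},
}

nested = unflatten_event(flat_row)
print(sorted(nested))
print(nested["event"])
print(nested["attributes"]["datetime"])
```

After the call, only the `event` and `attributes` keys remain at the top level of the row, because every flat column has been popped into one of the two nested dictionaries.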
