Skip to content

Improve event filtering #673

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 21 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
9c44593
ENH: Disable event validation in `EventReplayer` by default
cortadocodes Aug 5, 2024
a87c7e3
ENH: Enable excluding event kinds in `get_events`
cortadocodes Aug 5, 2024
1c71a0d
ENH: Enable filtering by multiple event kinds in `get_events`
cortadocodes Aug 5, 2024
e7576ca
ENH: Use all non-question events for redelivery check in flask app
cortadocodes Aug 5, 2024
033405b
ENH: Add ability to skip handling logs containing certain text
cortadocodes Aug 5, 2024
266f327
TST: Update event replayer test
cortadocodes Aug 5, 2024
188966a
OPS: Increase minor version number
cortadocodes Aug 5, 2024
49bd367
ENH: Add `exclude_logs_containing` to `EventReplayer`
cortadocodes Aug 5, 2024
4ab7041
ENH: Return outside of `ThreadPoolExecutor` context manager
cortadocodes Aug 6, 2024
ba28cfa
FEA: Add `dictionary_product` utility function
cortadocodes Aug 6, 2024
aacefd6
TST: Test `dictionary_product`
cortadocodes Aug 6, 2024
b25f80f
FIX: Add `validate_events` to Pub/Sub event handler (default `True`)
cortadocodes Aug 6, 2024
a62a703
FIX: Add missing arguments to `GoogleCloudPubSubEventHandler`
cortadocodes Aug 6, 2024
d180fdc
DOC: Fix docstring
cortadocodes Aug 6, 2024
681f476
REF: Rename variable inside `dictionary_product`
cortadocodes Aug 6, 2024
7c1c65b
TST: Improve `get_events` test
cortadocodes Aug 6, 2024
c833934
ENH: Raise error if `kinds` and `exclude_kinds` are provided at once
cortadocodes Aug 6, 2024
d4c1953
DOC: Mention default of no validation in `EventReplayer` docstring
cortadocodes Aug 6, 2024
59db6ed
TST: Clarify `dictionary_product` tests slightly
cortadocodes Aug 6, 2024
22d0d2e
REF: Update from legacy syntax for `ENV` layers in dockerfiles
cortadocodes Aug 13, 2024
a40c12a
CHO: Add version compatibility data
cortadocodes Aug 13, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .devcontainer/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV HISTFILE="/workspaces/octue-sdk-python/.devcontainer/.zsh_history"
USER vscode
ENV POETRY_HOME=/home/vscode/.poetry
RUN curl -sSL https://install.python-poetry.org | python3 -
ENV PATH "$POETRY_HOME/bin:$PATH"
ENV PATH="$POETRY_HOME/bin:$PATH"
RUN poetry config virtualenvs.create false
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ RUN apt-get update && apt-get install -y git curl

ENV POETRY_HOME=/etc/poetry
RUN curl -sSL https://install.python-poetry.org | python3 -
ENV PATH "$POETRY_HOME/bin:$PATH"
ENV PATH="$POETRY_HOME/bin:$PATH"
RUN poetry config virtualenvs.create false

# Install python dependencies. Note that poetry installs any root packages by default, but this is not available at this
Expand Down
172 changes: 87 additions & 85 deletions docs/source/inter_service_compatibility.rst

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions octue/cloud/deployment/google/cloud_run/Dockerfile-python310
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
FROM windpioneers/gdal-python:little-gecko-gdal-2.4.1-python-3.10-slim

# Ensure print statements and log messages appear promptly in Cloud Logging.
ENV PYTHONUNBUFFERED True
ENV PYTHONUNBUFFERED=True

ENV PROJECT_ROOT=/workspace
WORKDIR $PROJECT_ROOT
Expand All @@ -10,7 +10,7 @@ RUN apt-get update -y && apt-get install -y --fix-missing build-essential && rm

# Install poetry.
ENV POETRY_HOME=/root/.poetry
ENV PATH "$POETRY_HOME/bin:$PATH"
ENV PATH="$POETRY_HOME/bin:$PATH"
RUN curl -sSL https://install.python-poetry.org | python3 - && poetry config virtualenvs.create false;

# Copy in the dependencies file(s) for caching. One or more of `requirements.txt`, `setup.py`, and `pyproject.toml and
Expand Down
4 changes: 2 additions & 2 deletions octue/cloud/deployment/google/cloud_run/Dockerfile-python311
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
FROM windpioneers/gdal-python:modest-heron-gdal-2.4.1-python-3.11-slim

# Ensure print statements and log messages appear promptly in Cloud Logging.
ENV PYTHONUNBUFFERED True
ENV PYTHONUNBUFFERED=True

ENV PROJECT_ROOT=/workspace
WORKDIR $PROJECT_ROOT
Expand All @@ -10,7 +10,7 @@ RUN apt-get update -y && apt-get install -y --fix-missing build-essential && rm

# Install poetry.
ENV POETRY_HOME=/root/.poetry
ENV PATH "$POETRY_HOME/bin:$PATH"
ENV PATH="$POETRY_HOME/bin:$PATH"
RUN curl -sSL https://install.python-poetry.org | python3 - && poetry config virtualenvs.create false;

# Copy in the dependencies file(s) for caching. One or more of `requirements.txt`, `setup.py`, and `pyproject.toml and
Expand Down
4 changes: 2 additions & 2 deletions octue/cloud/deployment/google/cloud_run/Dockerfile-python39
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
FROM windpioneers/gdal-python:little-gecko-gdal-2.4.1-python-3.9-slim

# Ensure print statements and log messages appear promptly in Cloud Logging.
ENV PYTHONUNBUFFERED True
ENV PYTHONUNBUFFERED=True

ENV PROJECT_ROOT=/workspace
WORKDIR $PROJECT_ROOT
Expand All @@ -10,7 +10,7 @@ RUN apt-get update -y && apt-get install -y --fix-missing build-essential && rm

# Install poetry.
ENV POETRY_HOME=/root/.poetry
ENV PATH "$POETRY_HOME/bin:$PATH"
ENV PATH="$POETRY_HOME/bin:$PATH"
RUN curl -sSL https://install.python-poetry.org | python3 - && poetry config virtualenvs.create false;

# Copy in the dependencies file(s) for caching. One or more of `requirements.txt`, `setup.py`, and `pyproject.toml and
Expand Down
2 changes: 1 addition & 1 deletion octue/cloud/deployment/google/cloud_run/flask_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def _check_if_should_drop_question(question_uuid, retry_count):
previous_question_attempts = get_events(
table_id=service_configuration.event_store_table_id,
question_uuid=question_uuid,
kind="delivery_acknowledgement",
exclude_kinds=["question"],
)

except google.api_core.exceptions.NotFound:
Expand Down
6 changes: 6 additions & 0 deletions octue/cloud/events/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class AbstractEventHandler:
:param dict|None event_handlers: a mapping of event type names to callables that handle each type of event. The handlers must not mutate the events.
:param dict schema: the JSON schema to validate events against
:param bool include_service_metadata_in_logs: if `True`, include the SRUIDs and question UUIDs of the service revisions involved in the question to the start of the log message
:param str|None exclude_logs_containing: if provided, skip handling log messages containing this string
:param bool only_handle_result: if `True`, skip handling non-result events and only handle the "result" event when received (turning this on speeds up event handling)
:param bool validate_events: if `True`, validate events before attempting to handle them (turning this off speeds up event handling)
:return None:
Expand All @@ -51,13 +52,15 @@ def __init__(
event_handlers=None,
schema=SERVICE_COMMUNICATION_SCHEMA,
include_service_metadata_in_logs=True,
exclude_logs_containing=None,
only_handle_result=False,
validate_events=True,
):
self.handle_monitor_message = handle_monitor_message
self.record_events = record_events
self.schema = schema
self.include_service_metadata_in_logs = include_service_metadata_in_logs
self.exclude_logs_containing = exclude_logs_containing
self.only_handle_result = only_handle_result
self.validate_events = validate_events

Expand Down Expand Up @@ -201,6 +204,9 @@ def _handle_log_message(self, event, attributes):
:param dict attributes: the event's attributes
:return None:
"""
if self.exclude_logs_containing and self.exclude_logs_containing in event["log_record"]["msg"]:
return

record = logging.makeLogRecord(event["log_record"])

# Split the log message into its parts.
Expand Down
10 changes: 7 additions & 3 deletions octue/cloud/events/replayer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,17 @@


class EventReplayer(AbstractEventHandler):
"""A replayer for events retrieved asynchronously from storage.
"""A replayer for events retrieved asynchronously from storage. Note that events aren't validated by default as the
main use case for this class is to replay already-validated events from the event store.

:param callable|None handle_monitor_message: a function to handle monitor messages (e.g. send them to an endpoint for plotting or displaying) - this function should take a single JSON-compatible python primitive
:param bool record_events: if `True`, record received events in the `received_events` attribute
:param dict|None event_handlers: a mapping of event type names to callables that handle each type of event. The handlers must not mutate the events.
:param dict|str schema: the JSON schema to validate events against
:param bool include_service_metadata_in_logs: if `True`, include the SRUIDs and question UUIDs of the service revisions involved in the question to the start of the log message
:param str|None exclude_logs_containing: if provided, skip handling log messages containing this string
:param bool only_handle_result: if `True`, skip non-result events and only handle the "result" event if present (turning this on speeds up event handling)
:param bool validate_events: if `True`, validate events before attempting to handle them (turning this off speeds up event handling)
:param bool validate_events: if `True`, validate events before attempting to handle them (this is off by default to speed up event handling)
:return None:
"""

Expand All @@ -27,8 +29,9 @@ def __init__(
event_handlers=None,
schema=SERVICE_COMMUNICATION_SCHEMA,
include_service_metadata_in_logs=True,
exclude_logs_containing=None,
only_handle_result=False,
validate_events=True,
validate_events=False,
):
event_handlers = event_handlers or {
"question": self._handle_question,
Expand All @@ -46,6 +49,7 @@ def __init__(
event_handlers=event_handlers,
schema=schema,
include_service_metadata_in_logs=include_service_metadata_in_logs,
exclude_logs_containing=exclude_logs_containing,
only_handle_result=only_handle_result,
validate_events=validate_events,
)
Expand Down
50 changes: 39 additions & 11 deletions octue/cloud/pub_sub/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ def get_events(
question_uuid=None,
parent_question_uuid=None,
originator_question_uuid=None,
kind=None,
kinds=None,
exclude_kinds=None,
include_backend_metadata=False,
tail=True,
limit=1000,
Expand All @@ -47,13 +48,14 @@ def get_events(
:param str|None question_uuid: the UUID of a question to get events for
:param str|None parent_question_uuid: the UUID of a parent question to get the sub-question events for
:param str|None originator_question_uuid: the UUID of an originator question get the full tree of events for
:param str|None kind: the kind of event to get; if `None`, all event kinds are returned
:param iter(str)|None kinds: the kinds of event to get; if `None`, all event kinds are returned
:param iter(str)|None exclude_kinds: the kind of events to exclude; if `None`, all event kinds are returned
:param bool include_backend_metadata: if `True`, include the service backend metadata
:param bool tail: if `True`, return the most recent events (where a limit applies); e.g. return the most recent 100 log records
:param int limit: the maximum number of events to return
:return list(dict): the events for the question; this will be empty if there are no events for the question
"""
_validate_inputs(question_uuid, parent_question_uuid, originator_question_uuid, kind)
_validate_inputs(question_uuid, parent_question_uuid, originator_question_uuid, kinds, exclude_kinds)

if question_uuid:
question_uuid_condition = "WHERE question_uuid=@relevant_question_uuid"
Expand All @@ -62,10 +64,17 @@ def get_events(
elif originator_question_uuid:
question_uuid_condition = "WHERE originator_question_uuid=@relevant_question_uuid"

if kind:
event_kind_condition = [f"AND kind={kind!r}"]
if kinds:
kinds_string = ", ".join([f"{kind!r}" for kind in kinds])
event_kinds_condition = [f"AND kind IN ({kinds_string})"]
else:
event_kind_condition = []
event_kinds_condition = []

if exclude_kinds:
exclude_kinds_string = ", ".join([f"{kind!r}" for kind in exclude_kinds])
exclude_kinds_condition = [f"AND kind NOT IN ({exclude_kinds_string})"]
else:
exclude_kinds_condition = []

# Make a shallow copy of the fields to query.
fields = list(DEFAULT_FIELDS)
Expand All @@ -77,7 +86,8 @@ def get_events(
[
f"SELECT {', '.join(fields)} FROM `{table_id}`",
question_uuid_condition,
*event_kind_condition,
*event_kinds_condition,
*exclude_kinds_condition,
]
)

Expand Down Expand Up @@ -116,14 +126,15 @@ def get_events(
return _unflatten_events(events)


def _validate_inputs(question_uuid, parent_question_uuid, originator_question_uuid, kind):
def _validate_inputs(question_uuid, parent_question_uuid, originator_question_uuid, kinds, exclude_kinds):
"""Check that only one of `question_uuid`, `parent_question_uuid`, or `originator_question_uuid` are provided and
that the `kind` parameter is a valid event kind.

:param str|None question_uuid: the UUID of a question to get events for
:param str|None parent_question_uuid: the UUID of a parent question to get the sub-question events for
:param str|None originator_question_uuid: the UUID of an originator question get the full tree of events for
:param str|None kind: the kind of event to get; if `None`, all event kinds are returned
:param iter(str)|None kinds: the kinds of event to get; if `None`, all event kinds are returned
:param iter(str)|None exclude_kinds: the kind of events to exclude; if `None`, all event kinds are returned
:raise ValueError: if more than one of `question_uuid`, `parent_question_uuid`, or `originator_question_uuid` are provided or the `kind` parameter is invalid
:return None:
"""
Expand All @@ -135,8 +146,25 @@ def _validate_inputs(question_uuid, parent_question_uuid, originator_question_uu
"provided."
)

if kind and kind not in VALID_EVENT_KINDS:
raise ValueError(f"`kind` must be one of {VALID_EVENT_KINDS!r}; received {kind!r}.")
kinds_inputs = (bool(kinds), bool(exclude_kinds))

if sum(kinds_inputs) > 1:
raise ValueError(
f"Only one of `kinds` and `exclude_kinds` can be provided at once; received kinds={kinds!r} and "
f"exclude_kinds={exclude_kinds!r}."
)

if kinds:
for kind in kinds:
if kind not in VALID_EVENT_KINDS:
raise ValueError(f"All items in `kinds` must be one of {VALID_EVENT_KINDS!r}; received {kind!r}.")

if exclude_kinds:
for kind in exclude_kinds:
if kind not in VALID_EVENT_KINDS:
raise ValueError(
f"All items in `exclude_kinds` must be one of {VALID_EVENT_KINDS!r}; received {kind!r}."
)


def _deserialise_event(event):
Expand Down
9 changes: 9 additions & 0 deletions octue/cloud/pub_sub/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ class GoogleCloudPubSubEventHandler(AbstractEventHandler):
:param dict|None event_handlers: a mapping of event type names to callables that handle each type of event. The handlers must not mutate the events.
:param dict|str schema: the JSON schema to validate events against
:param bool include_service_metadata_in_logs: if `True`, include the SRUIDs and question UUIDs of the service revisions involved in the question to the start of the log message
:param str|None exclude_logs_containing: if provided, skip handling log messages containing this string
:param bool only_handle_result: if `True`, skip non-result events and only handle the "result" event if present (turning this on speeds up event handling)
:param bool validate_events: if `True`, validate events before attempting to handle them (turn this off to speed up event handling at risk of failure if an invalid event is received)
:return None:
"""

Expand All @@ -81,6 +84,9 @@ def __init__(
event_handlers=None,
schema=SERVICE_COMMUNICATION_SCHEMA,
include_service_metadata_in_logs=True,
exclude_logs_containing=None,
only_handle_result=False,
validate_events=True,
):
self.subscription = subscription

Expand All @@ -90,6 +96,9 @@ def __init__(
event_handlers=event_handlers,
schema=schema,
include_service_metadata_in_logs=include_service_metadata_in_logs,
exclude_logs_containing=exclude_logs_containing,
only_handle_result=only_handle_result,
validate_events=validate_events,
)

self._heartbeat_checker = None
Expand Down
Loading
Loading