Skip to content

Log warning when runtime timeout is near #690

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 101 additions & 99 deletions docs/source/inter_service_compatibility.rst

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions octue/cloud/pub_sub/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from octue.cloud.events.handler import AbstractEventHandler
from octue.cloud.events.validation import SERVICE_COMMUNICATION_SCHEMA
from octue.utils.decoders import OctueJSONDecoder
from octue.utils.objects import getattr_or_subscribe
from octue.utils.objects import get_nested_attribute
from octue.utils.threads import RepeatingTimer


Expand All @@ -28,7 +28,7 @@ def extract_event_and_attributes_from_pub_sub_message(message):
:return (any, dict): the extracted event and its attributes
"""
# Cast attributes to a dictionary to avoid defaultdict-like behaviour from Pub/Sub message attributes container.
attributes = dict(getattr_or_subscribe(message, "attributes"))
attributes = dict(get_nested_attribute(message, "attributes"))

# Deserialise the `parent_question_uuid`, `forward_logs`, and `retry_count`, fields if they're present
# (don't assume they are before validation).
Expand Down
22 changes: 18 additions & 4 deletions octue/cloud/pub_sub/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import importlib.metadata
import json
import logging
import os
import time
import uuid

import google.api_core.exceptions
Expand Down Expand Up @@ -232,11 +234,12 @@ def answer(self, question, heartbeat_interval=120, timeout=30):

try:
self._send_delivery_acknowledgment(**routing_metadata)
start_time = time.perf_counter()

heartbeater = RepeatingTimer(
interval=heartbeat_interval,
function=self._send_heartbeat,
kwargs=routing_metadata,
function=self._send_heartbeat_and_check_runtime,
kwargs={"start_time": start_time, **routing_metadata},
)

heartbeater.daemon = True
Expand Down Expand Up @@ -666,24 +669,29 @@ def _send_delivery_acknowledgment(

logger.info("%r acknowledged receipt of question %r.", self, question_uuid)

def _send_heartbeat(
def _send_heartbeat_and_check_runtime(
self,
question_uuid,
parent_question_uuid,
originator_question_uuid,
parent,
originator,
retry_count,
start_time,
runtime_timeout_warning_time=3480, # This is 58 minutes in seconds.
timeout=30,
):
"""Send a heartbeat to the parent, indicating that the service is alive.
"""Send a heartbeat to the parent, indicating that the service is alive. If it's running on Cloud Run and it's
been running for longer than the runtime timeout warning time, log a warning that it will be stopped soon.

:param str question_uuid: the UUID of the question this event relates to
:param str|None parent_question_uuid: the UUID of the question that triggered this question
:param str|None originator_question_uuid: the UUID of the question that triggered all ancestor questions of this question
:param str parent: the SRUID of the parent that asked the question this event is related to
:param str originator: the SRUID of the service revision that triggered all ancestor questions of this question
:param int retry_count: the retry count of the question (this is zero if it's the first attempt at the question)
:param int|float start_time: the `time.perf_counter` time that the analysis was started [s]
:param int|float runtime_timeout_warning_time: the amount of time after which to warn that the runtime timeout is approaching [s]
:param float timeout: time in seconds after which to give up sending
:return None:
"""
Expand All @@ -700,6 +708,12 @@ def _send_heartbeat(
timeout=timeout,
)

if (
os.environ.get("COMPUTE_PROVIDER") == "GOOGLE_CLOUD_RUN"
and time.perf_counter() - start_time > runtime_timeout_warning_time
):
logger.warning("This analysis will reach the maximum runtime and be stopped soon.")

logger.debug("Heartbeat sent by %r.", self)

def _send_monitor_message(
Expand Down
Loading
Loading