Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,10 @@ log_level = "INFO"
log_format = "%(asctime)s [%(levelname)s] %(name)s.%(funcName)s.%(lineno)d: %(message)s"

# Disable botocore 'datetime.datetime.utcnow()' deprecation warning
filterwarnings = "ignore:datetime.datetime.utcnow:DeprecationWarning:botocore"
filterwarnings = [
"error",
"ignore:datetime.datetime.utcnow:DeprecationWarning:botocore",
]

[tool.coverage.report]
omit =[
Expand Down
2 changes: 0 additions & 2 deletions src/components/executor/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@

from .runner import Runner

TASKS_FINISH_CHECK_TIME = 1

_logger = logging.getLogger("executor")

last_message_at: datetime
Expand Down
13 changes: 6 additions & 7 deletions src/components/executor/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,9 @@

async def _change_visibility_loop(message: message_queue.Message) -> None:
"""Change the message visibility while it's been processed"""
try:
while app.running():
await message_queue.change_visibility(message)
await app.sleep(message_queue.get_queue_wait_message_time())
except asyncio.CancelledError:
return
while app.running():
await message_queue.change_visibility(message)
await app.sleep(message_queue.get_queue_wait_message_time())


class Runner:
Expand Down Expand Up @@ -81,6 +78,9 @@ async def process_message(
_change_visibility_loop(self.message), parent_task=asyncio.current_task()
)

# Give control back to the event loop to allow the change visibility task to start
await asyncio.sleep(0)

# Protect execution from exceptions
try:
# Handle the message accordingly
Expand All @@ -100,7 +100,6 @@ async def process_message(

# Stop the message change visibility loop
change_visibility_task.cancel()
await change_visibility_task

async def process(self, semaphore: asyncio.Semaphore) -> None:
"""Get a message and process it"""
Expand Down
25 changes: 13 additions & 12 deletions src/components/task_manager/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
from typing import Any, Coroutine

import utils.app as app
from utils.exception_handling import protected_task

TASKS_FINISH_CHECK_TIME = 1
LOOP_TIME = 60

_logger = logging.getLogger("task_manager")

Expand All @@ -24,7 +26,7 @@ def create_task(
) -> asyncio.Task[Any]:
"""Create a task that will be executed in the background with an optional 'parent' attribute. If
the parent task is done while the child task is running, the child task will be canceled"""
task = asyncio.create_task(coro, name=coro.__name__)
task = asyncio.create_task(protected_task(_logger, coro), name=coro.__name__)
_tasks.setdefault(parent_task, []).append(task)

if parent_task is not None:
Expand All @@ -36,10 +38,15 @@ def create_task(
def _clear_completed() -> None:
"""Remove completed tasks from the global task list"""
global _tasks
_tasks = {
parent: [task for task in tasks if not task.done()] for parent, tasks in _tasks.items()
}
_tasks = {parent: tasks for parent, tasks in _tasks.items() if len(tasks) > 0}

cleaned_tasks = {}

for parent, tasks in _tasks.items():
active_tasks = [task for task in tasks if not task.done()]
if len(active_tasks) > 0:
cleaned_tasks[parent] = active_tasks

_tasks = cleaned_tasks


async def wait_for_tasks(
Expand All @@ -65,12 +72,6 @@ async def wait_for_tasks(
return True


async def wait_for_all_tasks(timeout: float | None = None, cancel: bool = False) -> None:
"""Wait for all running tasks to finish"""
for parent_task in _tasks.keys():
await wait_for_tasks(parent_task=parent_task, timeout=timeout, cancel=cancel)


def _count_running(tasks: dict[Any, list[asyncio.Task[Any]]]) -> int:
"""Count the number of running tasks"""
running_tasks = 0
Expand All @@ -94,7 +95,7 @@ async def run() -> None:

while app.running():
_clear_completed()
await app.sleep(60)
await app.sleep(LOOP_TIME)

_logger.info("Finishing")
await _wait_to_finish(_tasks)
3 changes: 0 additions & 3 deletions tests/components/executor/test_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ async def test_run(monkeypatch, clear_queue):
"""Integration test of the 'run' method. It should wait for messages and process them using the
runners"""
monkeypatch.setattr(configs, "executor_concurrency", 5)
monkeypatch.setattr(executor, "TASKS_FINISH_CHECK_TIME", 0.01)

registry.monitors_ready.set()

Expand Down Expand Up @@ -111,7 +110,6 @@ async def test_run_current_task_error(caplog, monkeypatch):

async def test_run_no_messages(mocker, monkeypatch, clear_queue):
"""'run' should sleep for the configured time when there are no messages in the queue"""
monkeypatch.setattr(executor, "TASKS_FINISH_CHECK_TIME", 0.01)
monkeypatch.setattr(
message_queue.queue._config, # type: ignore[attr-defined]
"queue_wait_message_time",
Expand Down Expand Up @@ -150,7 +148,6 @@ async def test_run_error(caplog, monkeypatch, clear_queue):
"queue_wait_message_time",
0.2,
)
monkeypatch.setattr(executor, "TASKS_FINISH_CHECK_TIME", 0.01)

registry.monitors_ready.set()

Expand Down
6 changes: 3 additions & 3 deletions tests/components/executor/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ async def test_runner_process_message_success(caplog, mocker, monkeypatch):
monkeypatch.setattr(
message_queue.queue._config, # type: ignore[attr-defined]
"queue_wait_message_time",
0.1,
0.2,
)
change_visibility_spy: MagicMock = mocker.spy(message_queue, "change_visibility")
delete_message_spy: MagicMock = mocker.spy(message_queue, "delete_message")
Expand Down Expand Up @@ -104,7 +104,7 @@ async def test_runner_process_message_sentinela_error(caplog, mocker, monkeypatc
monkeypatch.setattr(
message_queue.queue._config, # type: ignore[attr-defined]
"queue_wait_message_time",
0.1,
0.2,
)
change_visibility_spy: MagicMock = mocker.spy(message_queue, "change_visibility")
delete_message_spy: MagicMock = mocker.spy(message_queue, "delete_message")
Expand Down Expand Up @@ -143,7 +143,7 @@ async def test_runner_process_message_error(caplog, mocker, monkeypatch):
monkeypatch.setattr(
message_queue.queue._config, # type: ignore[attr-defined]
"queue_wait_message_time",
0.1,
0.2,
)
change_visibility_spy: MagicMock = mocker.spy(message_queue, "change_visibility")
delete_message_spy: MagicMock = mocker.spy(message_queue, "delete_message")
Expand Down
Loading