Skip to content

Commit 7f9ab3c

Browse files
Merge pull request #362 from phenobarbital/dev
changes on BackgrounQueue
2 parents 9953b28 + a87da40 commit 7f9ab3c

4 files changed

Lines changed: 59 additions & 13 deletions

File tree

navigator/background/queue/__init__.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,13 @@ async def _execute_taskwrapper(self, task: TaskWrapper):
181181
with ThreadPoolExecutor(max_workers=1) as executor:
182182
try:
183183
result = await task()
184+
except asyncio.CancelledError:
185+
self.logger.warning(
186+
f"TaskWrapper {task!r} was cancelled."
187+
)
188+
result = {
189+
"status": "cancelled"
190+
}
184191
except Exception as e:
185192
self.logger.exception(
186193
f"Error executing TaskWrapper {task!r}: {e}",
@@ -190,6 +197,16 @@ async def _execute_taskwrapper(self, task: TaskWrapper):
190197
"status": "failed",
191198
"error": e
192199
}
200+
# Handle the exception and return a failure result
201+
# This could be customized based on your needs
202+
try:
203+
if hasattr(task, "tracker"):
204+
# set status = "failed"
205+
await task.tracker.set_failed(task.task_uuid)
206+
except Exception as e:
207+
self.logger.error(
208+
f"Error updating tracker for TaskWrapper {task!r}: {e}"
209+
)
193210
return result
194211

195212
async def _execute_coroutine(self, coro: coroutine):
@@ -305,6 +322,7 @@ async def process_queue(self):
305322
print('TASK LOG ERROR > ', e)
306323
# Call your task completion callback (if any)
307324
try:
325+
print('RESULT > ', result)
308326
await self._callback(task, result=result)
309327
except Exception as e:
310328
self.logger.error(

navigator/background/tracker/redis.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from typing import Any, Callable, Dict, Optional, Sequence, List, Mapping
33
import asyncio
44
import redis.asyncio as redis
5+
from datamodel.exceptions import ParserError
56
from ...libs.json import json_encoder, json_decoder # pylint: disable=E0611 # noqa
67
from .models import JobRecord, time_now
78
from ...conf import CACHE_URL
@@ -84,10 +85,15 @@ async def _update(self, job_id: str, **patch) -> None:
8485
rec: JobRecord = self._decoder(payload)
8586
for k, v in patch.items():
8687
setattr(rec, k, v)
87-
await self._redis.set(
88-
key, self._encoder(rec), keepttl=True
89-
) # keep the TTL
90-
88+
try:
89+
await self._redis.set(
90+
key, self._encoder(rec), keepttl=True
91+
) # keep the TTL
92+
except ParserError as exc:
93+
# we cannot encode the result of JobRecord, so we raise an error
94+
raise ValueError(
95+
f"Invalid job record data: {exc}, payload: {exc.payload}"
96+
) from exc
9197
# Update secondary index for attributes if they are part of the patch
9298
if 'attributes' in patch:
9399
# Remove old attributes from the index

navigator/background/wrappers/__init__.py

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ def __init__(
6666
self.fn = fn
6767
self.tracker = tracker
6868
self._name: str = kwargs.pop('name', fn.__name__ if fn else 'unknown_task')
69-
self._callback_: Union[Callable, Awaitable] = kwargs.pop('callback', None)
69+
self._user_callback: Union[Callable, Awaitable] = kwargs.pop('callback', None)
7070
job_status = kwargs.pop('status', 'pending')
7171
if job_status not in ['pending', 'running', 'done', 'failed']:
7272
raise ValueError(
@@ -76,12 +76,11 @@ def __init__(
7676
self.jitter: float = jitter
7777
# Create the Job Record at status "pending"
7878
# generate a list of arguments accepted by JobRecord:
79-
job_args = {}
79+
job_args = {
80+
k: v for k, v in kwargs.items()
81+
if not k.startswith('_') and k in JobRecord.__fields__
82+
}
8083
content = kwargs.pop('content', None)
81-
for k, v in kwargs.items():
82-
if not k.startswith('_'):
83-
if k in JobRecord.__fields__:
84-
job_args[k] = v
8584
self.job_record: JobRecord = JobRecord(
8685
name=self._name,
8786
content=content,
@@ -113,7 +112,28 @@ def add_callback(self, callback: Union[Callable, Awaitable]):
113112
- callback (Union[Callable, Awaitable]):
114113
Callback function to be called after the task is executed.
115114
"""
116-
self._callback_ = callback
115+
self._user_callback = callback
116+
117+
async def _wrapped_callback(self, result, exc, loop):
118+
"""
119+
Internal wrapper callback that injects JobRecord information
120+
before calling the user's callback.
121+
122+
Args:
123+
- result: The result of the task execution.
124+
- exc: Exception raised during task execution, if any.
125+
- loop: The event loop in which the task was executed.
126+
"""
127+
if self._user_callback:
128+
# Call user callback with additional JobRecord info
129+
# New signature: callback(result, exc, loop, job_record, task_id)
130+
await self._user_callback(
131+
result,
132+
exc,
133+
loop,
134+
job_record=self.job_record,
135+
task_id=self.job_record.task_id
136+
)
117137

118138
async def __call__(self):
119139
result = None
@@ -166,7 +186,9 @@ async def _finish(result: Any, exc: Exception):
166186
return result
167187
with ThreadPoolExecutor(max_workers=1) as executor:
168188
coro = self.fn(*self.args, **self.kwargs)
169-
coroutine_in_thread(coro, self._callback_, on_complete=_finish)
189+
# Use the wrapped callback instead of the user callback directly
190+
callback_to_use = self._wrapped_callback if self._user_callback else None
191+
coroutine_in_thread(coro, callback_to_use, on_complete=_finish)
170192
return {"status": "running"}
171193
except asyncio.CancelledError:
172194
self.logger.warning(

navigator/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
__description__ = (
55
"Navigator Web Framework based on aiohttp, " "with batteries included."
66
)
7-
__version__ = "2.13.2"
7+
__version__ = "2.13.3"
88
__copyright__ = "Copyright (c) 2020-2024 Jesus Lara"
99
__author__ = "Jesus Lara"
1010
__author_email__ = "jesuslarag@gmail.com"

0 commit comments

Comments
 (0)