Skip to content

Commit 87fa410

Browse files
Merge pull request #358 from phenobarbital/dev
fix callback and complete execution on Background Service
2 parents 96a22a7 + 7ec012e commit 87fa410

6 files changed

Lines changed: 79 additions & 30 deletions

File tree

navigator/background/service/__init__.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -94,10 +94,15 @@ async def record(self, task_id: uuid.UUID) -> Optional[JobRecord]:
9494
"""
9595
if not task_id:
9696
return None
97+
if isinstance(task_id, uuid.UUID):
98+
task_id = str(task_id.hex)
9799
if isinstance(task_id, str):
98-
task_id = uuid.UUID(task_id)
99-
if not isinstance(task_id, uuid.UUID):
100-
raise ValueError("task_id must be a UUID or a string representation of a UUID")
101-
if task_id not in self.tracker._jobs:
100+
task_id = uuid.UUID(task_id).hex
101+
if not isinstance(task_id, str):
102+
raise ValueError(
103+
"task_id must be a UUID, a hex-string, or None"
104+
)
105+
if not await self.tracker.exists(task_id):
102106
return None
107+
103108
return await self.tracker.status(task_id)

navigator/background/tracker/memory.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ class JobTracker:
1111
Replace by a DB or Redis backend later.
1212
"""
1313
def __init__(self) -> None:
14-
self._jobs: Dict[uuid.UUID, JobRecord] = {}
14+
self._jobs: Dict[str, JobRecord] = {}
1515
self._lock = asyncio.Lock()
1616

1717
# -----------------------------------------------------------
@@ -28,30 +28,34 @@ async def create_job(self, **kwargs) -> JobRecord:
2828
self._jobs[record.task_id] = record
2929
return record
3030

31-
async def set_running(self, job_id: uuid.UUID) -> None:
31+
async def set_running(self, job_id: str) -> None:
3232
async with self._lock:
3333
rec = self._jobs[job_id]
3434
rec.status = "running"
3535
rec.started_at = time_now()
3636

37-
async def set_done(self, job_id: uuid.UUID, result: Any = None) -> None:
37+
async def set_done(self, job_id: str, result: Any = None) -> None:
3838
async with self._lock:
3939
rec = self._jobs[job_id]
4040
rec.status = "done"
4141
rec.finished_at = time_now()
4242
rec.result = result
4343

44-
async def set_failed(self, job_id: uuid.UUID, exc: Exception) -> None:
44+
async def set_failed(self, job_id: str, exc: Exception) -> None:
4545
async with self._lock:
4646
rec = self._jobs[job_id]
4747
rec.status = "failed"
4848
rec.finished_at = time_now()
4949
rec.error = f"{type(exc).__name__}: {exc}"
5050

51-
async def status(self, job_id: uuid.UUID) -> Optional[JobRecord]:
51+
async def status(self, job_id: str) -> Optional[JobRecord]:
5252
async with self._lock:
5353
return self._jobs.get(job_id)
5454

55-
async def list_jobs(self) -> Dict[uuid.UUID, JobRecord]:
55+
async def list_jobs(self) -> Dict[str, JobRecord]:
5656
async with self._lock:
5757
return dict(self._jobs)
58+
59+
async def exists(self, job_id: str) -> bool:
60+
async with self._lock:
61+
return job_id in self._jobs

navigator/background/tracker/models.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44
from datetime import datetime
55
from datamodel import BaseModel, Field
66

7+
def gen_uuid() -> str:
8+
"""Generate a new UUID."""
9+
return str(uuid.uuid4().hex)
710

811
def time_now() -> int:
912
"""Get the current time in milliseconds."""
@@ -14,7 +17,7 @@ class JobRecord(BaseModel):
1417
1518
Job Record for Background Task Execution.
1619
"""
17-
task_id: str = Field(default=str(uuid.uuid4().hex))
20+
task_id: str = Field(default=gen_uuid)
1821
name: str = None
1922
status: str = 'pending'
2023
attributes: Dict[str, Any] = Field(default_factory=dict)

navigator/background/tracker/redis.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ async def create_job(self, **kwargs) -> JobRecord:
4444
await self._redis.sadd(self._set_key, record.task_id)
4545
return record
4646

47+
async def exists(self, job_id: str) -> bool:
48+
return await self._redis.exists(self._key(job_id)) == 1
49+
4750
async def _update(self, job_id: str, **patch) -> None:
4851
"""
4952
Update a job record with the given patch.

navigator/background/wrappers/__init__.py

Lines changed: 52 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,24 +9,41 @@
99

1010

1111
coroutine = Callable[[int], Coroutine[Any, Any, str]]
12+
OnCompleteFn = Callable[[Any, Optional[Exception]], Awaitable[None]]
1213

1314

14-
def coroutine_in_thread(coro: coroutine, callback: Optional[coroutine] = None):
15+
def coroutine_in_thread(
16+
coro: coroutine,
17+
callback: Optional[coroutine] = None,
18+
on_complete: OnCompleteFn = None,
19+
) -> threading.Event:
1520
"""Run a coroutine in a new thread with its own event loop."""
21+
parent_loop = asyncio.get_running_loop()
1622
done_event = threading.Event()
1723

18-
def run():
24+
def _runner():
1925
new_loop = asyncio.new_event_loop()
2026
asyncio.set_event_loop(new_loop)
21-
result = new_loop.run_until_complete(coro)
22-
# if callback exists:
23-
if callback:
24-
new_loop.run_until_complete(callback(result))
25-
new_loop.close()
26-
done_event.set() # Signal that the coroutine has completed
27-
thread = threading.Thread(target=run, daemon=True)
28-
thread.start()
27+
result, exc = None, None
28+
try:
29+
result = new_loop.run_until_complete(coro)
30+
except Exception as e: # noqa: BLE001
31+
exc = e
32+
finally:
33+
if callback:
34+
new_loop.run_until_complete(
35+
callback(result, exc, loop=new_loop)
36+
)
37+
new_loop.close()
38+
done_event.set() # Signal that the coroutine has completed
39+
if on_complete is None:
40+
return
41+
fut = asyncio.run_coroutine_threadsafe(
42+
on_complete(result, exc), parent_loop
43+
)
44+
fut.result() # Wait for the completion of the callback
2945

46+
threading.Thread(target=_runner, daemon=True).start()
3047
return done_event
3148

3249

@@ -115,16 +132,33 @@ async def __call__(self):
115132
# Delay the execution by jitter seconds
116133
await asyncio.sleep(delay)
117134
try:
135+
async def _finish(result: Any, exc: Exception):
136+
"""Callback to handle the completion of the coroutine."""
137+
if exc:
138+
self.logger.error(
139+
f"TaskWrapper {self._name} failed with exception: {exc}"
140+
)
141+
result = {
142+
"status": "failed",
143+
"error": str(exc)
144+
}
145+
if self.tracker:
146+
await self.tracker.set_failed(self.task_uuid, exc)
147+
else:
148+
self.logger.debug(
149+
f"TaskWrapper {self._name} completed successfully."
150+
)
151+
result = {
152+
"status": "done",
153+
"result": result
154+
}
155+
if self.tracker:
156+
await self.tracker.set_done(self.task_uuid, result)
157+
return result
118158
with ThreadPoolExecutor(max_workers=1) as executor:
119159
coro = self.fn(*self.args, **self.kwargs)
120-
coroutine_in_thread(coro, self._callback_)
121-
result = {
122-
"status": "done"
123-
}
124-
if self.tracker:
125-
# Set the job as done in the tracker
126-
await self.tracker.set_done(self.task_uuid, result)
127-
return result
160+
coroutine_in_thread(coro, self._callback_, on_complete=_finish)
161+
return {"status": "running"}
128162
except asyncio.CancelledError:
129163
self.logger.warning(
130164
f"TaskWrapper {self.fn.__name__} was cancelled."

navigator/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
__description__ = (
55
"Navigator Web Framework based on aiohttp, " "with batteries included."
66
)
7-
__version__ = "2.12.29"
7+
__version__ = "2.12.30"
88
__copyright__ = "Copyright (c) 2020-2024 Jesus Lara"
99
__author__ = "Jesus Lara"
1010
__author_email__ = "jesuslarag@gmail.com"

0 commit comments

Comments
 (0)