Skip to content

Commit 96a22a7

Browse files
Merge pull request #357 from phenobarbital/dev
redis support on background service
2 parents 364e4c1 + a78b1df commit 96a22a7

8 files changed

Lines changed: 299 additions & 93 deletions

File tree

navigator/background/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# -*- coding: utf-8 -*-
2-
from .tracker import JobTracker, JobRecord
2+
from .tracker import JobTracker, RedisJobTracker, JobRecord
33
from .wrappers import TaskWrapper
44
from .service import BackgroundService
55
from .queue import BackgroundQueue, BackgroundTask, SERVICE_NAME

navigator/background/queue/__init__.py

Lines changed: 41 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import random
12
from typing import (
23
Union,
34
Optional,
@@ -38,8 +39,8 @@ class BackgroundQueue:
3839
3940
TODO:
4041
- Add Task Timeout
41-
- Add Task Retry
42-
- Added Wrapper Support
42+
- Add Task Retry (done)
43+
- Added Wrapper Support (done)
4344
"""
4445
service_name: str = SERVICE_NAME
4546

@@ -219,7 +220,6 @@ async def _execute_callable(self, func: Callable, *args, **kwargs):
219220
result = None
220221
try:
221222
loop = asyncio.get_running_loop()
222-
# with ThreadPoolExecutor(max_workers=1) as executor:
223223
result = await loop.run_in_executor(
224224
self.executor,
225225
func,
@@ -281,11 +281,8 @@ async def process_queue(self):
281281
f"Invalid Function {func} in Queue"
282282
)
283283
continue
284-
except Exception as e: # Catch all exceptions
285-
print('ERROR > ', e)
286-
self.logger.error(
287-
f"Error executing task {func.__name__}: {e}"
288-
)
284+
except Exception as exc: # Catch all exceptions
285+
await self._handle_failure(task, exc)
289286
continue
290287
finally:
291288
if self._enable_profiling is True:
@@ -305,7 +302,7 @@ async def process_queue(self):
305302
Peak Memory Usage: {peak_memory / (1024 ** 2):.2f} MB
306303
""")
307304
except Exception as e:
308-
print('LOG ERROR > ', e)
305+
print('TASK LOG ERROR > ', e)
309306
# Call your task completion callback (if any)
310307
try:
311308
await self._callback(task, result=result)
@@ -330,6 +327,41 @@ async def fire_consumers(self):
330327
)
331328
self.consumers.append(task)
332329

330+
async def _requeue(self, task: "TaskWrapper", exc: Exception) -> None:
    """Internal: put ``task`` back on the queue after a failed attempt.

    Increments ``task.retries_done``, optionally pauses before retrying,
    logs a warning, refreshes the job status on the task's tracker (when one
    is attached) and finally re-enqueues the task on ``self.queue``.

    Args:
        task: wrapper being retried; this method reads ``retries_done``,
            ``max_retries``, ``retry_delay`` and, when a tracker is
            attached, ``tracker`` and ``task_uuid``.
        exc: exception raised by the failed attempt (only used for logging).
    """
    task.retries_done += 1
    # Optional back-off: the pause grows linearly with the number of retries
    # already performed (retry_delay * retries_done), with +/-20% jitter.
    if task.retry_delay:
        await asyncio.sleep(
            random.uniform(0.8, 1.2) * task.retry_delay * task.retries_done
        )

    self.logger.warning(
        f"Retry {task.retries_done}/{task.max_retries} for {task!r} "
        f"after error: {exc}"
    )
    # The attribute may exist but be unset (None), so test its value rather
    # than its mere presence.
    tracker = getattr(task, "tracker", None)
    if tracker is not None:
        try:
            # There is no dedicated "retrying" call here: the job is simply
            # flagged as running again.
            await tracker.set_running(task.task_uuid)
        except Exception as err:
            # Status bookkeeping is best-effort; a tracker error must not
            # cause the task itself to be dropped instead of retried.
            self.logger.warning(
                f"Unable to update tracker status for {task!r}: {err}"
            )
    await self.queue.put(task)
347+
348+
async def _handle_failure(
    self,
    task: Any,
    exc: Exception
) -> None:
    """Central place that decides whether we retry or finally give up.

    A ``TaskWrapper`` whose ``retries_done`` is still below ``max_retries``
    is handed to ``self._requeue``. Anything else is logged as a permanent
    failure and reported through ``self._callback`` with a
    ``{"status": "failed", "error": exc}`` result.

    Args:
        task: the queued item that failed (a ``TaskWrapper`` or any other
            object taken from the queue).
        exc: exception raised while executing ``task``.
    """
    if isinstance(task, TaskWrapper) and task.retries_done < task.max_retries:
        await self._requeue(task, exc)
        return

    # ``retries_done`` is only incremented on re-queue, so the initial run is
    # one extra attempt (assumes the counter starts at 0 -- confirm against
    # TaskWrapper).
    attempts = getattr(task, 'retries_done', 0) + 1
    self.logger.error(
        f"Task {task!r} failed permanently after "
        f"{attempts} attempt(s): {type(exc).__name__}: {exc}"
    )
    await self._callback(task, result=dict(status="failed", error=exc))
364+
333365

334366
class BackgroundTask:
335367
"""BackgroundTask.

navigator/background/service/__init__.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@
22
import uuid
33
from aiohttp import web
44
from ..queue import BackgroundQueue
5-
from ..tracker import JobTracker, JobRecord
5+
from ..tracker import JobTracker, RedisJobTracker, JobRecord
66
from ..wrappers import TaskWrapper
7+
from ...conf import CACHE_URL
78

89

910
class BackgroundService:
@@ -16,11 +17,20 @@ def __init__(
1617
app: web.Application,
1718
queue: Optional[BackgroundQueue] = None,
1819
tracker: Optional[JobTracker] = None,
20+
tracker_type: str = 'memory',
1921
**kwargs
2022
) -> None:
2123
self.queue = queue or BackgroundQueue(app, **kwargs)
2224
# Create a new JobTracker if not provided
23-
self.tracker = tracker or JobTracker()
25+
self.tracker = tracker
26+
if not tracker:
27+
if tracker_type == 'redis':
28+
self.tracker = RedisJobTracker(
29+
url=kwargs.get('redis_url', CACHE_URL),
30+
prefix=kwargs.get('tracker_prefix', 'job:')
31+
)
32+
else:
33+
self.tracker = JobTracker()
2434
# Register the queue and tracker in the application
2535
app['background_service'] = self
2636
app['service_tracker'] = self.tracker
Lines changed: 9 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -1,82 +1,11 @@
1-
from typing import Dict, Any, Optional
2-
import asyncio
3-
import uuid
4-
import time
5-
from datetime import datetime
6-
from datamodel import BaseModel, Field
1+
from .models import JobRecord, time_now
2+
from .memory import JobTracker
3+
from .redis import RedisJobTracker
74

85

9-
def time_now() -> int:
10-
"""Get the current time in milliseconds."""
11-
return int(time.time() * 1000)
12-
13-
class JobRecord(BaseModel):
14-
"""JobRecord.
15-
16-
Job Record for Background Task Execution.
17-
"""
18-
task_id: str = Field(default=uuid.uuid4().hex)
19-
name: str = None
20-
status: str = 'pending'
21-
attributes: Dict[str, Any] = Field(default_factory=dict)
22-
result: Optional[Any] = None
23-
error: Optional[str] = None
24-
created_at: datetime = Field(default=datetime.now())
25-
started_at: Optional[int]
26-
finished_at: Optional[int]
27-
result: Any = None
28-
error: Optional[str] = None
29-
stacktrace: Optional[str] = None
30-
31-
class Meta:
32-
strict = True
33-
34-
def __repr__(self):
35-
return f"<JobRecord {self.name} ({self.id})>"
36-
37-
38-
class JobTracker:
39-
"""
40-
A very small, coroutine-safe in-memory job store.
41-
Replace by a DB or Redis backend later.
42-
"""
43-
def __init__(self) -> None:
44-
self._jobs: Dict[uuid.UUID, JobRecord] = {}
45-
self._lock = asyncio.Lock()
46-
47-
# -----------------------------------------------------------
48-
# Public helpers
49-
# -----------------------------------------------------------
50-
async def create_job(self, **kwargs) -> JobRecord:
51-
record = JobRecord(**kwargs)
52-
async with self._lock:
53-
self._jobs[record.task_id] = record
54-
return record
55-
56-
async def set_running(self, job_id: uuid.UUID) -> None:
57-
async with self._lock:
58-
rec = self._jobs[job_id]
59-
rec.status = "running"
60-
rec.started_at = time_now()
61-
62-
async def set_done(self, job_id: uuid.UUID, result: Any = None) -> None:
63-
async with self._lock:
64-
rec = self._jobs[job_id]
65-
rec.status = "done"
66-
rec.finished_at = time_now()
67-
rec.result = result
68-
69-
async def set_failed(self, job_id: uuid.UUID, exc: Exception) -> None:
70-
async with self._lock:
71-
rec = self._jobs[job_id]
72-
rec.status = "failed"
73-
rec.finished_at = time_now()
74-
rec.error = f"{type(exc).__name__}: {exc}"
75-
76-
async def status(self, job_id: uuid.UUID) -> Optional[JobRecord]:
77-
async with self._lock:
78-
return self._jobs.get(job_id)
79-
80-
async def list_jobs(self) -> Dict[uuid.UUID, JobRecord]:
81-
async with self._lock:
82-
return dict(self._jobs)
6+
__all__ = (
7+
'JobTracker',
8+
'RedisJobTracker',
9+
'JobRecord',
10+
'time_now',
11+
)
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
from typing import Dict, Any, Optional
2+
import asyncio
3+
import uuid
4+
from datamodel.exceptions import ValidationError
5+
from .models import JobRecord, time_now
6+
7+
8+
class JobTracker:
    """
    A very small in-memory job store guarded by an ``asyncio.Lock``.

    Records live in a plain dict keyed by ``JobRecord.task_id`` (declared as
    ``str`` on the model), so they are lost when the process exits. The
    package also exports ``RedisJobTracker`` as an alternative backend.
    """
    def __init__(self) -> None:
        self._jobs: Dict[str, "JobRecord"] = {}
        self._lock = asyncio.Lock()

    # -----------------------------------------------------------
    # Public helpers
    # -----------------------------------------------------------
    async def create_job(self, **kwargs) -> "JobRecord":
        """Build a ``JobRecord`` from ``kwargs`` and store it.

        Raises:
            ValueError: when the model rejects ``kwargs`` (wraps the
                underlying ``ValidationError``).
        """
        try:
            record = JobRecord(**kwargs)
        except ValidationError as exc:
            raise ValueError(
                f"Invalid job record data: {exc}, payload: {exc.payload}"
            ) from exc
        async with self._lock:
            self._jobs[record.task_id] = record
        return record

    async def set_running(self, job_id: str) -> None:
        """Mark ``job_id`` as running and stamp ``started_at``.

        Raises:
            KeyError: when ``job_id`` is not tracked.
        """
        async with self._lock:
            rec = self._jobs[job_id]
            rec.status = "running"
            rec.started_at = time_now()

    async def set_done(self, job_id: str, result: Any = None) -> None:
        """Mark ``job_id`` as done, stamp ``finished_at`` and keep ``result``.

        Raises:
            KeyError: when ``job_id`` is not tracked.
        """
        async with self._lock:
            rec = self._jobs[job_id]
            rec.status = "done"
            rec.finished_at = time_now()
            rec.result = result

    async def set_failed(self, job_id: str, exc: Exception) -> None:
        """Mark ``job_id`` as failed and store ``"<ExcType>: <message>"``.

        Raises:
            KeyError: when ``job_id`` is not tracked.
        """
        async with self._lock:
            rec = self._jobs[job_id]
            rec.status = "failed"
            rec.finished_at = time_now()
            rec.error = f"{type(exc).__name__}: {exc}"

    async def status(self, job_id: str) -> Optional["JobRecord"]:
        """Return the record for ``job_id`` or ``None`` when unknown."""
        async with self._lock:
            return self._jobs.get(job_id)

    async def list_jobs(self) -> Dict[str, "JobRecord"]:
        """Return a shallow copy of the id -> record mapping."""
        async with self._lock:
            return dict(self._jobs)
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
from typing import Dict, Any, Optional
2+
import uuid
3+
import time
4+
from datetime import datetime
5+
from datamodel import BaseModel, Field
6+
7+
8+
def time_now() -> int:
    """Return the current Unix time truncated to whole milliseconds."""
    seconds = time.time()
    return int(seconds * 1000)
11+
12+
class JobRecord(BaseModel):
    """JobRecord.

    Job Record for Background Task Execution.

    Holds the identity, status, timestamps and outcome of one background
    job. ``started_at`` and ``finished_at`` are integer timestamps (the
    trackers fill them with ``time_now()``, i.e. milliseconds).
    """
    # Factories are used so every record gets its own id and creation time;
    # a plain ``default=`` expression is evaluated once at import and would
    # be shared by all records.
    task_id: str = Field(default_factory=lambda: uuid.uuid4().hex)
    name: Optional[str] = None
    status: str = 'pending'
    attributes: Dict[str, Any] = Field(default_factory=dict)
    result: Optional[Any] = None
    error: Optional[str] = None
    created_at: datetime = Field(default_factory=datetime.now)
    started_at: Optional[int] = None
    finished_at: Optional[int] = None
    stacktrace: Optional[str] = None

    class Meta:
        strict = True

    def __repr__(self):
        # The identifier field is ``task_id``; the model has no ``id``.
        return f"<JobRecord {self.name} ({self.task_id})>"

0 commit comments

Comments
 (0)