Skip to content

Commit 97e92e0

Browse files
authored
Merge pull request #14 from alphagov/ACW-37/non-blocking
ACW-37 Implement tracking for running and background jobs with resilience
2 parents 519cd28 + ef4b87c commit 97e92e0

5 files changed

Lines changed: 130 additions & 37 deletions

File tree

app.py

Lines changed: 42 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,14 @@
1111
from flask import Flask, request, jsonify, render_template
1212
from src.visualiser_graph_generator import generate_graph, generate_output_path
1313
from src.visualiser_graph_loader import load_json_file, extract_path_parts, visualiser_graph_file_path
14-
from src.utils import update_job_status, read_job_status
14+
from src.utils import (
15+
update_job_status,
16+
read_job_status,
17+
get_job_id_for_path,
18+
get_active_job_status,
19+
background_run_extraction,
20+
resume_interrupted_jobs
21+
)
1522
from werkzeug.exceptions import BadRequest
1623

1724

@@ -73,8 +80,18 @@ async def extract_quotes():
7380
return jsonify({"error": "Missing 'source_path' query parameter"}), 400
7481

7582
input_path, output_path = generate_output_path(source_path)
76-
job_id = str(uuid.uuid4())
83+
job_id = get_job_id_for_path(source_path)
7784

85+
active_status = get_active_job_status(job_id)
86+
if active_status:
87+
logger.info(f"Duplicate request for {source_path}. Job {job_id} is already in progress.")
88+
return jsonify({
89+
'job_id': job_id,
90+
'status': 'already_running',
91+
'message': f'A graph generation job is already in progress for {source_path}',
92+
'output_path': output_path
93+
}), 202
94+
7895
initial_status = {
7996
"job_id": job_id,
8097
"status": "pending",
@@ -83,26 +100,7 @@ async def extract_quotes():
83100
}
84101
update_job_status(job_id, initial_status)
85102

86-
async def run_extraction():
87-
try:
88-
logger.info(f'Starting background graph generation for {input_path} (Job: {job_id})...')
89-
initial_status["status"] = "running"
90-
update_job_status(job_id, initial_status)
91-
92-
await generate_graph(input_path, output_path)
93-
94-
initial_status["status"] = "completed"
95-
initial_status["output_path"] = output_path
96-
initial_status["completed_at"] = time.time()
97-
update_job_status(job_id, initial_status)
98-
logger.info(f'Graph generation completed successfully for {output_path}')
99-
except Exception as e:
100-
logger.error(f"Background graph generation failed for job {job_id}: {str(e)}")
101-
initial_status["status"] = "failed"
102-
initial_status["error"] = str(e)
103-
update_job_status(job_id, initial_status)
104-
105-
asyncio.create_task(run_extraction())
103+
asyncio.create_task(background_run_extraction(job_id, input_path, output_path, initial_status))
106104

107105
return jsonify({
108106
'job_id': job_id,
@@ -129,8 +127,29 @@ def handle_bad_request(e):
129127

130128
return app
131129

130+
class LifespanMiddleware:
    """ASGI middleware to handle startup and shutdown events.

    Lifespan scopes are answered here and are not forwarded to the wrapped
    application; every other scope type is passed straight through to it.
    """

    def __init__(self, app):
        # The wrapped ASGI callable that receives all non-lifespan scopes.
        self.app = app
        # Strong reference to the resumption task. The event loop only keeps
        # weak references to tasks, so a task whose handle is discarded may be
        # garbage-collected before it finishes.
        self._resume_task = None

    async def __call__(self, scope, receive, send):
        """Serve one ASGI scope.

        For a ``lifespan`` scope, loop over incoming messages: acknowledge
        startup (after scheduling the job resumption scan) and acknowledge
        shutdown, then return. Any other scope is delegated to ``self.app``
        and its result is returned unchanged.
        """
        if scope["type"] != "lifespan":
            return await self.app(scope, receive, send)

        while True:
            message = await receive()
            if message["type"] == "lifespan.startup":
                # Trigger resumption when the event loop is officially running
                logger.info("ASGI startup: triggering job resumption scan...")
                self._resume_task = asyncio.create_task(resume_interrupted_jobs())
                await send({"type": "lifespan.startup.complete"})
            elif message["type"] == "lifespan.shutdown":
                await send({"type": "lifespan.shutdown.complete"})
                return
148+
132149
def create_asgi_app():
    """Build the Flask application and wrap it for an ASGI server.

    The Flask app from ``create_app()`` is adapted with ``WsgiToAsgi`` and
    then wrapped in ``LifespanMiddleware``, which answers ASGI lifespan
    events and schedules the interrupted-job resumption scan on startup.

    Returns:
        The ``LifespanMiddleware`` instance to hand to the ASGI server.
    """
    flask_app = create_app()
    asgi_app = WsgiToAsgi(flask_app)
    return LifespanMiddleware(asgi_app)
134153

135154
if __name__ == "__main__":
136155
asgi_app = create_asgi_app()

pyproject.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ dependencies = [
1414
"opensearch-py>=2.8.0",
1515
"requests-aws4auth>=1.3.1",
1616
"flask[async]==3.1.2",
17-
"waitress>=3.0.0",
1817
"uvicorn>=0.30.0",
1918
]
2019

src/utils/__init__.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,9 @@
1-
from .job_tracker import update_job_status, read_job_status, get_status_path
1+
from .job_tracker import (
2+
update_job_status,
3+
read_job_status,
4+
get_status_path,
5+
get_job_id_for_path,
6+
get_active_job_status,
7+
background_run_extraction,
8+
resume_interrupted_jobs
9+
)

src/utils/job_tracker.py

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
import json
22
import logging
33
import fsspec
4+
import hashlib
5+
import time
6+
import asyncio
7+
from typing import Optional
8+
from src.visualiser_graph_generator import generate_graph, generate_output_path
49

510
logger = logging.getLogger(__name__)
611

@@ -21,7 +26,7 @@ def update_job_status(job_id: str, status_data: dict):
2126
except Exception as e:
2227
logger.error(f"Failed to update job status in S3 for job {job_id}: {str(e)}")
2328

24-
def read_job_status(job_id: str) -> dict:
29+
def read_job_status(job_id: str) -> dict| None:
2530
"""Reads the job status from S3."""
2631
path = get_status_path(job_id)
2732
fs = fsspec.filesystem("s3")
@@ -37,3 +42,76 @@ def read_job_status(job_id: str) -> dict:
3742
except Exception as e:
3843
logger.error(f"Failed to read job status from S3 for job {job_id}: {str(e)}")
3944
return None
45+
46+
def get_job_id_for_path(source_path: str) -> str:
    """Generates a predictable job_id based on a hash of the source_path.

    The id is the SHA-256 hex digest of the UTF-8 bytes of ``source_path``,
    so the same path always maps to the same 64-character id. The path is
    hashed exactly as given (no normalisation), so two spellings of the same
    location produce different ids.
    """
    return hashlib.sha256(source_path.encode("utf-8")).hexdigest()
49+
50+
def get_active_job_status(job_id: str, timeout_hours: int = 24) -> Optional[dict]:
    """
    Returns the job status ONLY if it is currently active (pending/running)
    and has not exceeded the timeout. Returns None if the job is stale or not active.

    Args:
        job_id: Identifier whose status record is looked up via ``read_job_status``.
        timeout_hours: Maximum age, measured from the record's ``created_at``
            timestamp, for a pending/running job to still count as active.

    Returns:
        The status dict when the job is pending or running and younger than
        the timeout; otherwise ``None`` (no record, finished/failed job,
        stale job, or a record without a usable ``created_at``).
    """
    status = read_job_status(job_id)
    if not status:
        return None

    if status.get("status") not in ("pending", "running"):
        return None

    # A missing timestamp defaults to 0 (epoch), which always reads as stale.
    created_at = status.get("created_at", 0)
    if not isinstance(created_at, (int, float)):
        # A null or non-numeric timestamp cannot be aged; treat the record as
        # stale instead of raising TypeError in the subtraction below.
        return None

    age_seconds = time.time() - created_at
    if age_seconds > timeout_hours * 3600:
        return None

    return status
67+
68+
async def background_run_extraction(job_id: str, input_path: str, output_path: str, status: dict):
    """Background task for graph generation and status tracking.

    Marks the job as ``running``, awaits ``generate_graph`` and then records
    either ``completed`` (with ``output_path`` and ``completed_at``) or
    ``failed`` (with ``error``). Each transition is persisted through
    ``update_job_status``.

    Args:
        job_id: Identifier under which the status record is stored.
        input_path: Source location handed to ``generate_graph``.
        output_path: Destination handed to ``generate_graph`` and recorded on success.
        status: Status dict for this job. It is mutated in place, so the
            caller's dict reflects the latest state.

    Exceptions raised by graph generation are caught and recorded on the
    status record rather than propagated to the task's awaiter.
    """
    try:
        logger.info(f'Starting background graph generation for {input_path} (Job: {job_id})...')
        status["status"] = "running"
        # NOTE(review): update_job_status is called synchronously from this
        # coroutine; if it performs network I/O it blocks the event loop for
        # the duration of the write.
        update_job_status(job_id, status)

        await generate_graph(input_path, output_path)

        status["status"] = "completed"
        status["output_path"] = output_path
        status["completed_at"] = time.time()
        update_job_status(job_id, status)
        logger.info(f'Graph generation completed successfully for {output_path}')
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception(f"Background graph generation failed for job {job_id}: {str(e)}")
        status["status"] = "failed"
        status["error"] = str(e)
        update_job_status(job_id, status)
87+
88+
# Strong references to resumed extraction tasks. The event loop only holds
# weak references to tasks, so a task whose handle is dropped can be
# garbage-collected before it completes. Entries remove themselves when done.
_RESUMED_TASKS: set = set()


async def resume_interrupted_jobs():
    """Scans for jobs stuck in 'running' state and restarts them if they are fresh (<24h).

    Every ``*.json`` status file under ``STATUS_ROOT`` is inspected. A job is
    resumed when its status is ``running``, its ``created_at`` is less than
    24 hours old and it records a ``source_path``; the extraction is then
    rescheduled as a background task on the running event loop.

    Failures are logged and never raised: a listing error ends the scan, and
    an error on a single job skips only that job.
    """
    logger.info("Scanning for interrupted jobs to resume...")
    fs = fsspec.filesystem("s3")

    try:
        # List all status files
        status_files = fs.glob(f"{STATUS_ROOT}/*.json")
    except Exception as e:
        logger.error(f"Error during job resumption scan: {str(e)}")
        return

    for file_path in status_files:
        # Strip only the trailing extension; replace() would also remove a
        # ".json" occurring elsewhere in the file name.
        job_id = file_path.split("/")[-1].removesuffix(".json")

        try:
            status = read_job_status(job_id)
            if not status or status.get("status") != "running":
                continue

            # A missing, null or non-numeric timestamp is treated as stale so
            # one malformed record cannot abort the scan of the others.
            created_at = status.get("created_at", 0)
            is_fresh = (
                isinstance(created_at, (int, float))
                and (time.time() - created_at) < (24 * 3600)
            )
            if not is_fresh:
                logger.info(f"Skipping stale interrupted job {job_id} (over 24h old)")
                continue

            source_path = status.get("source_path")
            if not source_path:
                logger.warning(f"Cannot resume job {job_id}: status record has no source_path")
                continue

            input_path, output_path = generate_output_path(source_path)
            logger.info(f"Resuming interrupted job {job_id} for {source_path}")
            task = asyncio.create_task(
                background_run_extraction(job_id, input_path, output_path, status)
            )
            _RESUMED_TASKS.add(task)
            task.add_done_callback(_RESUMED_TASKS.discard)
        except Exception as e:
            logger.error(f"Failed to prepare resumption for job {job_id}: {str(e)}")

uv.lock

Lines changed: 0 additions & 11 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)