Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions packages/backend/app/db/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,31 @@ CREATE TABLE IF NOT EXISTS user_subscriptions (
started_at TIMESTAMP NOT NULL DEFAULT NOW()
);

CREATE TABLE IF NOT EXISTS background_jobs (
id SERIAL PRIMARY KEY,
name VARCHAR(200) NOT NULL,
status VARCHAR(20) NOT NULL DEFAULT 'PENDING',
attempts INT NOT NULL DEFAULT 0,
max_retries INT NOT NULL DEFAULT 3,
last_error TEXT,
result TEXT,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
started_at TIMESTAMP,
completed_at TIMESTAMP,
next_retry_at TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_background_jobs_status ON background_jobs(status);

CREATE TABLE IF NOT EXISTS job_history (
id SERIAL PRIMARY KEY,
job_id INT NOT NULL REFERENCES background_jobs(id) ON DELETE CASCADE,
attempt INT NOT NULL,
status VARCHAR(20) NOT NULL,
error TEXT,
created_at TIMESTAMP NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_job_history_job ON job_history(job_id, attempt);

CREATE TABLE IF NOT EXISTS audit_logs (
id SERIAL PRIMARY KEY,
user_id INT REFERENCES users(id) ON DELETE SET NULL,
Expand Down
36 changes: 36 additions & 0 deletions packages/backend/app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,42 @@ class UserSubscription(db.Model):
started_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)


class JobStatus(str, Enum):
PENDING = "PENDING"
RUNNING = "RUNNING"
SUCCESS = "SUCCESS"
FAILED = "FAILED"
RETRYING = "RETRYING"
DEAD = "DEAD"


class BackgroundJob(db.Model):
__tablename__ = "background_jobs"
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(200), nullable=False)
status = db.Column(db.String(20), default=JobStatus.PENDING.value, nullable=False)
attempts = db.Column(db.Integer, default=0, nullable=False)
max_retries = db.Column(db.Integer, default=3, nullable=False)
last_error = db.Column(db.Text, nullable=True)
result = db.Column(db.Text, nullable=True)
created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
started_at = db.Column(db.DateTime, nullable=True)
completed_at = db.Column(db.DateTime, nullable=True)
next_retry_at = db.Column(db.DateTime, nullable=True)


class JobHistory(db.Model):
__tablename__ = "job_history"
id = db.Column(db.Integer, primary_key=True)
job_id = db.Column(
db.Integer, db.ForeignKey("background_jobs.id"), nullable=False
)
attempt = db.Column(db.Integer, nullable=False)
status = db.Column(db.String(20), nullable=False)
error = db.Column(db.Text, nullable=True)
created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)


class AuditLog(db.Model):
__tablename__ = "audit_logs"
id = db.Column(db.Integer, primary_key=True)
Expand Down
2 changes: 2 additions & 0 deletions packages/backend/app/routes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from .categories import bp as categories_bp
from .docs import bp as docs_bp
from .dashboard import bp as dashboard_bp
from .jobs import bp as jobs_bp


def register_routes(app: Flask):
Expand All @@ -18,3 +19,4 @@ def register_routes(app: Flask):
app.register_blueprint(categories_bp, url_prefix="/categories")
app.register_blueprint(docs_bp, url_prefix="/docs")
app.register_blueprint(dashboard_bp, url_prefix="/dashboard")
app.register_blueprint(jobs_bp, url_prefix="/jobs")
111 changes: 111 additions & 0 deletions packages/backend/app/routes/jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
from flask import Blueprint, jsonify, request
from flask_jwt_extended import jwt_required, get_jwt_identity
from ..extensions import db
from ..models import BackgroundJob, JobHistory, JobStatus
from ..services.job_retry import (
get_job_stats,
get_dead_letter_jobs,
get_job_history,
job_to_dict,
)
import logging

bp = Blueprint("jobs", __name__)
logger = logging.getLogger("finmind.jobs")

# Valid status values for filtering
_VALID_STATUSES = frozenset(s.value for s in JobStatus)


@bp.get("/stats")
@jwt_required()
def job_stats():
stats = get_job_stats()
return jsonify(stats)


@bp.get("")
@jwt_required()
def list_jobs():
status = request.args.get("status")
try:
page = max(1, int(request.args.get("page", "1")))
page_size = min(100, max(1, int(request.args.get("page_size", "20"))))
except ValueError:
return jsonify(error="invalid pagination"), 400

q = db.session.query(BackgroundJob)
if status:
normalised = status.upper().strip()
if normalised not in _VALID_STATUSES:
return jsonify(error=f"invalid status, must be one of: {sorted(_VALID_STATUSES)}"), 400
q = q.filter(BackgroundJob.status == normalised)
items = (
q.order_by(BackgroundJob.created_at.desc())
.offset((page - 1) * page_size)
.limit(page_size)
.all()
)
return jsonify([job_to_dict(j) for j in items])


@bp.get("/<int:job_id>")
@jwt_required()
def get_job(job_id: int):
job = db.session.get(BackgroundJob, job_id)
if not job:
return jsonify(error="not found"), 404
data = job_to_dict(job)
data["history"] = get_job_history(job_id)
return jsonify(data)


@bp.get("/dead-letter")
@jwt_required()
def dead_letter_queue():
try:
limit = min(100, max(1, int(request.args.get("limit", "50"))))
except ValueError:
return jsonify(error="invalid limit"), 400
jobs = get_dead_letter_jobs(limit=limit)
return jsonify(jobs)


@bp.post("/<int:job_id>/retry")
@jwt_required()
def manual_retry(job_id: int):
job = db.session.get(BackgroundJob, job_id)
if not job:
return jsonify(error="not found"), 404
if job.status not in (JobStatus.DEAD.value, JobStatus.FAILED.value):
return jsonify(error="job is not in a failed state"), 400

# Record the manual-retry event in history before resetting counters
history_entry = JobHistory(
job_id=job.id,
attempt=job.attempts,
status="MANUAL_RETRY",
error=None,
)
db.session.add(history_entry)

job.status = JobStatus.PENDING.value
job.attempts = 0
job.last_error = None
job.completed_at = None
job.next_retry_at = None
db.session.commit()
logger.info("Manual retry job id=%s", job.id)
return jsonify(job_to_dict(job))


@bp.delete("/<int:job_id>")
@jwt_required()
def delete_job(job_id: int):
job = db.session.get(BackgroundJob, job_id)
if not job:
return jsonify(error="not found"), 404
db.session.delete(job)
db.session.commit()
logger.info("Deleted job id=%s", job.id)
return jsonify(message="deleted")
Loading