Skip to content

Commit 24c6d4e

Browse files
committed
Add status logging and endpoint to access status
1 parent 8d2d506 commit 24c6d4e

File tree

9 files changed

+279
-2
lines changed

9 files changed

+279
-2
lines changed

gefapi/celery.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,16 @@ def make_celery(app):
2222
broker=app.config["broker_url"],
2323
)
2424
celery.conf.update(app.config)
25+
26+
# Configure periodic tasks
27+
celery.conf.beat_schedule = {
28+
"collect-system-status": {
29+
"task": "gefapi.tasks.status_monitoring.collect_system_status",
30+
"schedule": 120.0, # Every 2 minutes (120 seconds)
31+
},
32+
}
33+
celery.conf.timezone = "UTC"
34+
2535
TaskBase = celery.Task
2636

2737
class ContextTask(TaskBase):

gefapi/models/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ def process_result_value(self, value, dialect):
5454
from gefapi.models.execution_log import ExecutionLog # noqa: E402
5555
from gefapi.models.script import Script # noqa: E402
5656
from gefapi.models.script_log import ScriptLog # noqa: E402
57+
from gefapi.models.status_log import StatusLog # noqa: E402
5758
from gefapi.models.user import User # noqa: E402
5859

59-
__all__ = ["Execution", "ExecutionLog", "Script", "ScriptLog", "User"]
60+
__all__ = ["Execution", "ExecutionLog", "Script", "ScriptLog", "StatusLog", "User"]

gefapi/models/status_log.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
"""STATUS LOG MODEL"""
2+
3+
from __future__ import absolute_import, division, print_function
4+
5+
import datetime
6+
7+
from gefapi import db
8+
from gefapi.models import GUID
9+
10+
db.GUID = GUID
11+
12+
13+
class StatusLog(db.Model):
14+
"""StatusLog Model"""
15+
16+
__tablename__ = "status_log"
17+
id = db.Column(db.Integer(), primary_key=True)
18+
timestamp = db.Column(db.DateTime(), default=datetime.datetime.utcnow)
19+
20+
# Execution counts
21+
executions_active = db.Column(db.Integer(), default=0)
22+
executions_ready = db.Column(db.Integer(), default=0)
23+
executions_running = db.Column(db.Integer(), default=0)
24+
executions_finished = db.Column(db.Integer(), default=0)
25+
26+
# Other counts
27+
users_count = db.Column(db.Integer(), default=0)
28+
scripts_count = db.Column(db.Integer(), default=0)
29+
30+
# System metrics
31+
memory_available_percent = db.Column(db.Float(), default=0.0)
32+
cpu_usage_percent = db.Column(db.Float(), default=0.0)
33+
34+
def __init__(
35+
self,
36+
executions_active=0,
37+
executions_ready=0,
38+
executions_running=0,
39+
executions_finished=0,
40+
users_count=0,
41+
scripts_count=0,
42+
memory_available_percent=0.0,
43+
cpu_usage_percent=0.0,
44+
):
45+
self.executions_active = executions_active
46+
self.executions_ready = executions_ready
47+
self.executions_running = executions_running
48+
self.executions_finished = executions_finished
49+
self.users_count = users_count
50+
self.scripts_count = scripts_count
51+
self.memory_available_percent = memory_available_percent
52+
self.cpu_usage_percent = cpu_usage_percent
53+
54+
def __repr__(self):
55+
return "<StatusLog %r>" % self.id
56+
57+
def serialize(self):
58+
"""Return object data in easily serializeable format"""
59+
return {
60+
"id": self.id,
61+
"timestamp": self.timestamp.isoformat(),
62+
"executions_active": self.executions_active,
63+
"executions_ready": self.executions_ready,
64+
"executions_running": self.executions_running,
65+
"executions_finished": self.executions_finished,
66+
"users_count": self.users_count,
67+
"scripts_count": self.scripts_count,
68+
"memory_available_percent": self.memory_available_percent,
69+
"cpu_usage_percent": self.cpu_usage_percent,
70+
}

gefapi/routes/api/v1/gef_api_router.py

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
)
2222
from gefapi.routes.api.v1 import endpoints, error
2323
from gefapi.s3 import get_script_from_s3
24-
from gefapi.services import ExecutionService, ScriptService, UserService
24+
from gefapi.services import ExecutionService, ScriptService, StatusService, UserService
2525
from gefapi.validators import (
2626
validate_execution_log_creation,
2727
validate_execution_update,
@@ -602,3 +602,58 @@ def delete_user(user):
602602
logger.error("[ROUTER]: " + str(e))
603603
return error(status=500, detail="Generic Error")
604604
return jsonify(data=user.serialize()), 200
605+
606+
607+
@endpoints.route("/status", strict_slashes=False, methods=["GET"])
608+
@jwt_required()
609+
def get_status_logs():
610+
"""Get system status logs (Admin only)"""
611+
logger.info("[ROUTER]: Getting status logs")
612+
613+
# Check if user is admin
614+
identity = current_user
615+
if identity.role != "ADMIN" and identity.email != "gef@gef.com":
616+
return error(status=403, detail="Forbidden")
617+
618+
# Parse date filters
619+
start_date = request.args.get("start_date", None)
620+
if start_date:
621+
start_date = dateutil.parser.parse(start_date)
622+
623+
end_date = request.args.get("end_date", None)
624+
if end_date:
625+
end_date = dateutil.parser.parse(end_date)
626+
627+
# Parse sorting
628+
sort = request.args.get("sort", None)
629+
630+
# Parse pagination
631+
try:
632+
page = int(request.args.get("page", 1))
633+
per_page = int(request.args.get("per_page", 100))
634+
page = max(page, 1)
635+
per_page = min(max(per_page, 1), 1000)
636+
except Exception:
637+
page, per_page = 1, 100
638+
639+
try:
640+
status_logs, total = StatusService.get_status_logs(
641+
start_date=start_date,
642+
end_date=end_date,
643+
sort=sort,
644+
page=page,
645+
per_page=per_page,
646+
)
647+
except Exception as e:
648+
logger.error("[ROUTER]: " + str(e))
649+
return error(status=500, detail="Generic Error")
650+
651+
return (
652+
jsonify(
653+
data=[status_log.serialize() for status_log in status_logs],
654+
page=page,
655+
per_page=per_page,
656+
total=total,
657+
),
658+
200,
659+
)

gefapi/services/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ def handle_exception(exc_type, exc_value, exc_traceback):
2424
)
2525
from gefapi.services.email_service import EmailService # noqa:E402
2626
from gefapi.services.script_service import ScriptService # noqa:E402
27+
from gefapi.services.status_service import StatusService # noqa:E402
2728
from gefapi.services.user_service import UserService # noqa:E402
2829

2930
# Import last to avoid circular dependency
@@ -35,6 +36,7 @@ def handle_exception(exc_type, exc_value, exc_traceback):
3536
"docker_run",
3637
"EmailService",
3738
"ScriptService",
39+
"StatusService",
3840
"UserService",
3941
"ExecutionService",
4042
]

gefapi/services/status_service.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
"""STATUS SERVICE"""
2+
3+
from __future__ import absolute_import, division, print_function
4+
5+
import logging
6+
7+
import rollbar
8+
9+
from gefapi import db
10+
from gefapi.models import StatusLog
11+
12+
logger = logging.getLogger()
13+
14+
15+
class StatusService(object):
16+
"""Status Service Class"""
17+
18+
@staticmethod
19+
def get_status_logs(
20+
start_date=None,
21+
end_date=None,
22+
sort=None,
23+
page=1,
24+
per_page=100,
25+
):
26+
"""Get status logs with optional filtering and pagination"""
27+
logger.info("[SERVICE]: Getting status logs")
28+
logger.info("[DB]: QUERY")
29+
30+
if page < 1:
31+
page = 1
32+
if per_page < 1:
33+
per_page = 100
34+
35+
query = db.session.query(StatusLog)
36+
37+
# Apply date filters
38+
if start_date:
39+
query = query.filter(StatusLog.timestamp >= start_date)
40+
if end_date:
41+
query = query.filter(StatusLog.timestamp <= end_date)
42+
43+
# Apply sorting
44+
if sort:
45+
sort_field = sort[1:] if sort.startswith("-") else sort
46+
sort_direction = "desc" if sort.startswith("-") else "asc"
47+
if hasattr(StatusLog, sort_field):
48+
query = query.order_by(
49+
getattr(getattr(StatusLog, sort_field), sort_direction)()
50+
)
51+
else:
52+
# Default to sorting by timestamp descending
53+
query = query.order_by(StatusLog.timestamp.desc())
54+
55+
total = query.count()
56+
status_logs = query.offset((page - 1) * per_page).limit(per_page).all()
57+
58+
return status_logs, total

gefapi/tasks/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
"""TASKS MODULE"""
2+
3+
from __future__ import absolute_import, division, print_function

gefapi/tasks/status_monitoring.py

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
"""STATUS MONITORING TASKS"""
2+
3+
from __future__ import absolute_import, division, print_function
4+
5+
import logging
6+
7+
import psutil
8+
import rollbar
9+
from celery import Task
10+
from sqlalchemy import func
11+
12+
from gefapi import celery, db
13+
from gefapi.models import Execution, Script, StatusLog, User
14+
15+
logger = logging.getLogger()
16+
17+
18+
class StatusMonitoringTask(Task):
19+
"""Base task for status monitoring"""
20+
21+
def on_failure(self, exc, task_id, args, kwargs, einfo):
22+
logger.error(f"Status monitoring task failed: {exc}")
23+
rollbar.report_exc_info()
24+
25+
26+
@celery.task(base=StatusMonitoringTask, bind=True)
27+
def collect_system_status(self):
28+
"""Collect system status and save to status_log table"""
29+
logger.info("[TASK]: Collecting system status")
30+
31+
try:
32+
# Count executions by status
33+
execution_counts = (
34+
db.session.query(Execution.status, func.count(Execution.id))
35+
.group_by(Execution.status)
36+
.all()
37+
)
38+
39+
execution_status_map = dict(execution_counts)
40+
executions_active = execution_status_map.get("ACTIVE", 0)
41+
executions_ready = execution_status_map.get("READY", 0)
42+
executions_running = execution_status_map.get("RUNNING", 0)
43+
executions_finished = execution_status_map.get("FINISHED", 0)
44+
45+
# Count users and scripts
46+
users_count = db.session.query(func.count(User.id)).scalar()
47+
scripts_count = db.session.query(func.count(Script.id)).scalar()
48+
49+
# Get system metrics
50+
memory = psutil.virtual_memory()
51+
memory_available_percent = memory.available / memory.total * 100
52+
cpu_usage_percent = psutil.cpu_percent(interval=1)
53+
54+
# Create status log entry
55+
status_log = StatusLog(
56+
executions_active=executions_active,
57+
executions_ready=executions_ready,
58+
executions_running=executions_running,
59+
executions_finished=executions_finished,
60+
users_count=users_count,
61+
scripts_count=scripts_count,
62+
memory_available_percent=memory_available_percent,
63+
cpu_usage_percent=cpu_usage_percent,
64+
)
65+
66+
logger.info("[DB]: ADD")
67+
db.session.add(status_log)
68+
db.session.commit()
69+
70+
logger.info(f"[TASK]: Status log created with ID {status_log.id}")
71+
return status_log.serialize()
72+
73+
except Exception as error:
74+
logger.error(f"[TASK]: Error collecting system status: {str(error)}")
75+
rollbar.report_exc_info()
76+
db.session.rollback()
77+
raise error

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,4 @@ redis==5.0.8
1717
sparkpost==1.3.10
1818
rollbar==1.1.2
1919
blinker==1.8.2
20+
psutil>=5.8.0

0 commit comments

Comments
 (0)