Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
1168b45
Added ORM for Jobs and adjusted logging, reordered columns
bakspace-itk Oct 28, 2025
8180032
Functionally complete
bakspace-itk Oct 29, 2025
a6c9f54
Added tests and changed passed objects to full Job, instead of ID. Fi…
bakspace-itk Oct 29, 2025
be96906
Formatted alembic revision, expanded scheduler tests and fixed JobSta…
bakspace-itk Oct 30, 2025
cf1231b
first stab from claude on testing job ui
bakspace-itk Oct 30, 2025
9850375
Working tests and removed warnings from browser
bakspace-itk Oct 30, 2025
399bee6
Linting and changelog
bakspace-itk Oct 30, 2025
48c4019
Fixed job ID being set as string instead of UUID
bakspace-itk Oct 30, 2025
80f9e9b
Fixed errors with UUID being set as string
bakspace-itk Oct 30, 2025
15713ef
Fixed alembic test and added full drop to db_test_util
bakspace-itk Oct 30, 2025
8b2f00c
Changed line order by which an imported module (sqlchemy) was causing…
bakspace-itk Oct 30, 2025
5f36e18
Fix attempt of a null thing with sql stuff
bakspace-itk Oct 30, 2025
d303fdd
fixed tests, reordered tabs a bit, removed redundant code, made a col…
bakspace-itk Dec 5, 2025
503a295
changed constant to uppercase
bakspace-itk Dec 5, 2025
72363f3
fixed comment starting with "#" and not "# "
bakspace-itk Dec 5, 2025
ee0cc92
changed "constants" back to not-constants. And added version numbers …
bakspace-itk Dec 5, 2025
e5502e6
Removed string conversion to avoid hex error on github actions
bakspace-itk Dec 5, 2025
962278c
Merged develop into branch
bakspace-itk Dec 10, 2025
17d73c4
Added 1 second sleep to test browser opening, to make tests more reli…
bakspace-itk Dec 10, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 115 additions & 4 deletions OpenOrchestrator/database/db_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from OpenOrchestrator.common import crypto_util
from OpenOrchestrator.database.logs import Log, LogLevel
from OpenOrchestrator.database.jobs import Job, JobStatus
from OpenOrchestrator.database.constants import Constant, Credential
from OpenOrchestrator.database.triggers import Trigger, SingleTrigger, ScheduledTrigger, QueueTrigger, TriggerStatus
from OpenOrchestrator.database.queues import QueueElement, QueueStatus
Expand Down Expand Up @@ -62,7 +63,7 @@ def check_database_revision() -> bool:
except alc_exc.ProgrammingError:
return False

return version == "9698388a0709"
return version == "1c87ed320c78"


def _get_session() -> Session:
Expand Down Expand Up @@ -201,7 +202,7 @@ def delete_trigger(trigger_id: UUID | str) -> None:

def get_logs(offset: int, limit: int,
from_date: datetime | None = None, to_date: datetime | None = None,
process_name: str | None = None, log_level: LogLevel | None = None) -> tuple[Log, ...]:
process_name: str | None = None, log_level: LogLevel | None = None, job_id: str | UUID | None = None) -> tuple[Log, ...]:
"""Get the logs from the database using filters and pagination.

Args:
Expand All @@ -211,10 +212,14 @@ def get_logs(offset: int, limit: int,
to_date: The datetime where the log time must be at or earlier. If none the filter is disabled.
process_name: The process name to filter on. If none the filter is disabled.
log_level: The log level to filter on. If none the filter is disabled.
job_id: The job ID to filter on. If none the filter is disabled.

Returns:
A list of logs matching the given filters.
"""
if isinstance(job_id, str):
job_id = UUID(job_id)

query = (
select(Log)
.order_by(desc(Log.log_time))
Expand All @@ -234,36 +239,142 @@ def get_logs(offset: int, limit: int,
if log_level:
query = query.where(Log.log_level == log_level)

if job_id:
query = query.where(Log.job_id == job_id)

with _get_session() as session:
result = session.scalars(query).all()
return tuple(result)


def create_log(process_name: str, level: LogLevel, message: str) -> None:
def create_log(process_name: str, level: LogLevel, job_id: str | UUID | None, message: str) -> None:
"""Create a log in the logs table in the database.

Args:
process_name: The name of the process generating the log.
level: The level of the log.
message: The message of the log.
"""
if isinstance(job_id, str):
job_id = UUID(job_id)

with _get_session() as session:
log = Log(
log_level = level,
process_name = process_name,
job_id = job_id,
log_message = truncate_message(message)
)
session.add(log)
session.commit()


def get_jobs(status: JobStatus | None = None, process_name: str | None = None) -> tuple[Job]:
"""Get jobs matching the requested status or process name.

Args:
status: Status of jobs, RUNNING, DONE, FAILED or KILLED. Defaults to None.
process_name: Process name matching the jobs. Defaults to None.

Returns:
Tuple containing jobs matching the filters.
"""
query = (
select(Job)
.order_by(desc(Job.start_time))
)

if status:
query = query.where(Job.status == status)

if process_name:
query = query.where(Job.process_name == process_name)

with _get_session() as session:
result = session.scalars(query).all()
return tuple(result)


def start_job(process_name: str, scheduler_name: str) -> Job:
"""Create a new job, using count of previous jobs and process name as ID.

Args:
process_name: Process name starting this job.

Returns:
Job id of newly created job.
"""
with _get_session() as session:
job = Job(
process_name=process_name,
scheduler_name=scheduler_name,
status=JobStatus.RUNNING
)
session.add(job)
session.commit()
session.refresh(job)
session.expunge(job)
return job


def set_job_status(job_id: UUID | str, status: JobStatus):
"""Set status of job and update end_time, based on status.

Args:
job_id: Job ID to set status on.
status: Status to set. Either RUNNING, DONE, FAILED or KILLED.
Will set end_time to null if RUNNING or current time if not.

Raises:
ValueError: If no job is found, will raise error.
"""
if isinstance(job_id, str):
job_id = UUID(job_id)

with _get_session() as session:
job = session.get(Job, job_id)

if not job:
raise ValueError("No job with the given id was found.")

job.status = status
if status == JobStatus.RUNNING:
job.end_time = None
else:
job.end_time = datetime.now()
session.commit()


def get_job(job_id: UUID | str) -> Job:
"""Get a job by ID.

Args:
job_id: The ID of the job to get.

Returns:
The Job object.

Raises:
ValueError: If no job is found.
"""
if isinstance(job_id, str):
job_id = UUID(job_id)
with _get_session() as session:
job = session.get(Job, job_id)

if not job:
raise ValueError("No job with the given id was found.")

session.expunge(job)
return job


def get_unique_log_process_names() -> tuple[str, ...]:
"""Get a list of unique process names in the logs database.

Returns:
A list of unique process names.
"""

query = (
select(Log.process_name)
.distinct()
Expand Down
46 changes: 46 additions & 0 deletions OpenOrchestrator/database/jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
"""This module defines ORM classes for jobs"""

from datetime import datetime
import enum
import uuid

from sqlalchemy import String
from sqlalchemy.orm import Mapped, mapped_column

from OpenOrchestrator.common import datetime_util
from OpenOrchestrator.database.base import Base

# All classes in this module are effectively dataclasses without methods.
# pylint: disable=too-few-public-methods


class JobStatus(enum.Enum):
"""An enum representing the level of logs."""
RUNNING = "Running"
DONE = "Done"
FAILED = "Failed"
KILLED = "Killed"


class Job(Base):
"""A class representing job objects in the ORM."""

__tablename__ = "Jobs"

id: Mapped[uuid.UUID] = mapped_column(primary_key=True, default=uuid.uuid4)
process_name: Mapped[str] = mapped_column(String(100))
scheduler_name: Mapped[str] = mapped_column(String(100))
status: Mapped[JobStatus] = mapped_column(default=JobStatus.RUNNING)
start_time: Mapped[datetime] = mapped_column(default=datetime.now)
end_time: Mapped[datetime] = mapped_column(default=None, nullable=True)

def to_row_dict(self) -> dict[str, str]:
"""Convert log to a row dictionary for display in a table."""
return {
"ID": str(self.id),
"Process Name": self.process_name,
"Scheduler": self.scheduler_name,
"Start Time": datetime_util.format_datetime(self.start_time),
"End Time": datetime_util.format_datetime(self.end_time),
"Status": self.status.value
}
3 changes: 3 additions & 0 deletions OpenOrchestrator/database/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class Log(Base):
log_time: Mapped[datetime] = mapped_column(default=datetime.now)
log_level: Mapped[LogLevel]
process_name: Mapped[str] = mapped_column(String(100))
job_id: Mapped[uuid.UUID | None] = mapped_column(nullable=True)
log_message: Mapped[str] = mapped_column(String(8000))

def to_row_dict(self) -> dict[str, str]:
Expand All @@ -38,5 +39,7 @@ def to_row_dict(self) -> dict[str, str]:
"Level": self.log_level.value,
"Process Name": self.process_name,
"Message": self.log_message,
"Short Job ID": str(self.job_id)[:8],
"Full Job ID": self.job_id,
"ID": str(self.id)
}
30 changes: 23 additions & 7 deletions OpenOrchestrator/orchestrator/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@
from OpenOrchestrator.orchestrator.tabs.trigger_tab import TriggerTab
from OpenOrchestrator.orchestrator.tabs.settings_tab import SettingsTab
from OpenOrchestrator.orchestrator.tabs.logging_tab import LoggingTab
from OpenOrchestrator.orchestrator.tabs.jobs_tab import JobsTab
from OpenOrchestrator.orchestrator.tabs.constants_tab import ConstantTab
from OpenOrchestrator.orchestrator.tabs.queue_tab import QueueTab
from OpenOrchestrator.orchestrator.tabs.schedulers_tab import SchedulerTab


# pylint: disable=too-many-instance-attributes
class Application():
"""The main application of Orchestrator.
It contains a header and the four tabs of the application.
Expand All @@ -21,10 +23,11 @@ def __init__(self, port: int | None = None, show: bool = True) -> None:
with ui.header():
with ui.tabs() as self.tabs:
ui.tab('Triggers').props("auto-id=trigger_tab")
ui.tab('Queues').props("auto-id=queues_tab")
ui.tab('Schedulers').props("auto-id=schedulers_tab")
ui.tab('Jobs').props("auto-id=jobs_tab")
ui.tab('Logs').props("auto-id=logs_tab")
ui.tab('Constants').props("auto-id=constants_tab")
ui.tab('Schedulers').props("auto-id=schedulers_tab")
ui.tab('Queues').props("auto-id=queues_tab")
ui.tab('Settings').props("auto-id=settings_tab")

ui.space()
Expand All @@ -33,10 +36,11 @@ def __init__(self, port: int | None = None, show: bool = True) -> None:

with ui.tab_panels(self.tabs, value='Settings', on_change=self.update_tab).classes('w-full') as self.tab_panels:
self.t_tab = TriggerTab('Triggers')
self.j_tab = JobsTab("Jobs", self.on_job_click)
self.l_tab = LoggingTab("Logs")
self.q_tab = QueueTab("Queues")
self.c_tab = ConstantTab("Constants")
self.s_tab = SchedulerTab("Schedulers")
self.q_tab = QueueTab("Queues")
SettingsTab('Settings')

self._define_on_close()
Expand All @@ -50,14 +54,16 @@ def update_tab(self):
match self.tab_panels.value:
case 'Triggers':
self.t_tab.update()
case 'Queues':
self.q_tab.update()
case 'Jobs':
self.j_tab.update()
case 'Logs':
self.l_tab.update()
case 'Constants':
self.c_tab.update()
case 'Schedulers':
self.s_tab.update()
case 'Queues':
self.q_tab.update()
case 'Constants':
self.c_tab.update()

def update_loop(self):
"""Update the selected tab on a timer."""
Expand All @@ -71,6 +77,16 @@ def _define_on_close(self) -> None:
</script>
''')

def on_job_click(self, job_id: str):
"""Used by JobTab to update and switch to logging tab.

Args:
job_id: Job selected when switching to Logging tab.
"""
self.l_tab.current_job_id = job_id
self.l_tab.update()
self.tabs.set_value('Logs')


def get_free_port():
"""Get a free port by creating a new socket and bind it
Expand Down
58 changes: 58 additions & 0 deletions OpenOrchestrator/orchestrator/tabs/jobs_tab.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
"""This module is responsible for the layout and functionality of the Schedulers tab
in Orchestrator."""
from typing import Callable

from nicegui import ui

from OpenOrchestrator.database import db_util
from OpenOrchestrator.orchestrator import test_helper

COLUMNS = [
{'name': "job_id", 'label': "ID", 'field': "ID", 'headerClasses': 'hidden', 'classes': 'hidden'},
{'name': "process_name", 'label': "Process Name", 'field': "Process Name", 'align': 'left', 'sortable': True},
{'name': "scheduler_name", 'label': "Scheduler", 'field': "Scheduler", 'align': 'left', 'sortable': True},
{'name': "start_time", 'label': "Start Time", 'field': "Start Time", 'align': 'left', 'sortable': True},
{'name': "end_time", 'label': "End Time", 'field': "End Time", 'align': 'left', 'sortable': True},
{'name': "status", 'label': "Status", 'field': "Status", 'align': 'left', 'sortable': True}
]


# pylint: disable-next=too-few-public-methods
class JobsTab:
"""A class for the jobs tab."""
def __init__(self, tab_name: str, on_job_click: Callable[[str], None]) -> None:
with ui.tab_panel(tab_name):
self.jobs_table = ui.table(title="Jobs", columns=COLUMNS, rows=[], row_key='job_id', pagination=50).classes("w-full")
self.jobs_table.on("rowClick", self._row_click)
self._add_column_colors()
test_helper.set_automation_ids(self, "jobs_tab")
self.on_job_click = on_job_click

def update(self):
"""Updates the tables on the tab."""
jobs = db_util.get_jobs()
self.jobs_table.rows = [s.to_row_dict() for s in jobs]

def _row_click(self, event):
row = event.args[1]
job_id = row["ID"]
self.on_job_click(job_id)

def _add_column_colors(self):
"""Add custom coloring to the jobs table."""
# Add coloring to the status column
color_dict = "{Running: 'blue', Done: 'green', Failed: 'red', Killed: 'grey-9'}"

self.jobs_table.add_slot(
"body-cell-status",
f'''
<q-td key="status" :props="props">
<q-badge v-if="{color_dict}[props.value]" :color="{color_dict}[props.value]">
{{{{props.value}}}}
</q-badge>
<p v-else>
{{{{props.value}}}}
</p>
</q-td>
'''
)
Loading