diff --git a/README.md b/README.md
index ee23896..43f4963 100644
--- a/README.md
+++ b/README.md
@@ -62,14 +62,7 @@ PostQ is a job queue system with
     database = Database(os.getenv('DATABASE_URL'))
     await database.connect()
     job = models.Job(
-        workflow={
-            'tasks': [
-                {
-                    'name': 'a',
-                    'params': {'image': 'debian:buster-slim', 'command': 'ls -laFh'}
-                }
-            ]
-        }
+        tasks={'a': {'image': 'debian:buster-slim', 'command': 'ls -laFh'}}
     )
     record = await database.fetch_one(
         tables.Job.insert().returning(*tables.Job.columns), values=job.dict()
@@ -77,7 +70,7 @@ PostQ is a job queue system with
 
     # Then, after a few seconds...
 
-    joblog = models.JobLog(
+    joblog = models.Job(
         **await database.fetch_one(
             tables.JobLog.select().where(
                 tables.JobLog.columns.id==record['id']
@@ -85,7 +78,7 @@ PostQ is a job queue system with
             )
         )
     )
 
-    print(joblog.workflow.tasks[0].results)
+    print(joblog.tasks['a'].results)
 
     # total 4.0K
     # drwxr-xr-x 2 root root 64 Sep 11 04:11 ./
diff --git a/alembic/versions/20200905194820_b76f56db845a_job_log.py b/alembic/versions/20200912045539_8c4306d66631_job_log.py
similarity index 84%
rename from alembic/versions/20200905194820_b76f56db845a_job_log.py
rename to alembic/versions/20200912045539_8c4306d66631_job_log.py
index 5c62cbf..40a8495 100644
--- a/alembic/versions/20200905194820_b76f56db845a_job_log.py
+++ b/alembic/versions/20200912045539_8c4306d66631_job_log.py
@@ -1,8 +1,8 @@
-"""job_log
+"""job log
 
-Revision ID: b76f56db845a
+Revision ID: 8c4306d66631
 Revises: 7e8956a72cf2
-Create Date: 2020-09-05 19:48:20.702598+00:00
+Create Date: 2020-09-12 04:55:39.362899+00:00
 
 """
 import sqlalchemy as sa
@@ -11,7 +11,7 @@
 from alembic import op
 
 # revision identifiers, used by Alembic.
-revision = 'b76f56db845a'
+revision = '8c4306d66631'
 down_revision = '7e8956a72cf2'
 branch_labels = None
 depends_on = None
@@ -28,9 +28,7 @@ def upgrade():
             nullable=False,
         ),
         sa.Column('qname', sa.String(), nullable=False),
-        sa.Column(
-            'retries', sa.SmallInteger(), server_default=sa.text('1'), nullable=True
-        ),
+        sa.Column('status', sa.String(), nullable=False),
         sa.Column(
             'queued',
             sa.DateTime(timezone=True),
@@ -43,9 +41,10 @@ def upgrade():
             server_default=sa.text('current_timestamp'),
             nullable=False,
        ),
-        sa.Column('status', sa.String(), nullable=False),
+        sa.Column('initialized', sa.DateTime(timezone=True), nullable=True),
+        sa.Column('logged', sa.DateTime(timezone=True), nullable=True),
         sa.Column(
-            'workflow',
+            'tasks',
             postgresql.JSONB(astext_type=sa.Text()),
             server_default=sa.text("'{}'::jsonb"),
             nullable=True,
@@ -65,14 +64,10 @@ def upgrade():
     op.create_index(
         op.f('ix_postq_job_queued'), 'job', ['queued'], unique=False, schema='postq'
     )
-    op.create_index(
-        op.f('ix_postq_job_retries'), 'job', ['retries'], unique=False, schema='postq'
-    )
     op.create_table(
         'job_log',
         sa.Column('id', postgresql.UUID(), nullable=False),
         sa.Column('qname', sa.String(), nullable=False),
-        sa.Column('retries', sa.SmallInteger(), nullable=True),
         sa.Column('queued', sa.DateTime(timezone=True), nullable=False),
         sa.Column('scheduled', sa.DateTime(timezone=True), nullable=False),
         sa.Column('initialized', sa.DateTime(timezone=True), nullable=True),
@@ -84,7 +79,7 @@ def upgrade():
         ),
         sa.Column('status', sa.String(), nullable=False),
         sa.Column(
-            'workflow',
+            'tasks',
             postgresql.JSONB(astext_type=sa.Text()),
             server_default=sa.text("'{}'::jsonb"),
             nullable=True,
@@ -103,7 +98,6 @@ def upgrade():
 def downgrade():
     # ### commands auto generated by Alembic - please adjust! ###
     op.drop_table('job_log', schema='postq')
-    op.drop_index(op.f('ix_postq_job_retries'), table_name='job', schema='postq')
     op.drop_index(op.f('ix_postq_job_queued'), table_name='job', schema='postq')
     op.drop_index(op.f('ix_postq_job_qname'), table_name='job', schema='postq')
     op.drop_table('job', schema='postq')
diff --git a/postq/executors.py b/postq/executors.py
index ee0c513..93fd52c 100644
--- a/postq/executors.py
+++ b/postq/executors.py
@@ -31,11 +31,11 @@ def executor(
     task.results = process.stdout.decode()  # TODO: I'm not happy with forcing UTF-8
     task.errors = process.stderr.decode()
     if process.returncode > 0:
-        task.status = Status.error.name
+        task.status = str(Status.error)
     elif task.errors:
-        task.status = Status.warning.name
+        task.status = str(Status.warning)
     else:
-        task.status = Status.success.name
+        task.status = str(Status.success)
 
     send_data(address, task.dict())
diff --git a/postq/models.py b/postq/models.py
index f1e9f91..a6a2376 100644
--- a/postq/models.py
+++ b/postq/models.py
@@ -1,12 +1,12 @@
 import json
 from datetime import datetime, timezone
-from typing import Any, List
+from typing import Any, Dict, List
 from uuid import UUID
 
 import networkx as nx
 from pydantic import BaseModel, Field, validator
 
-from . import enums
+from postq.enums import Status
 
 
 class Model(BaseModel):
@@ -26,46 +26,107 @@ class Task(Model):
     * params = other parameters, such as executor-specific parameters
     """
 
+    # fields
     name: str
     depends: List[str] = Field(default_factory=list)
     params: dict = Field(default_factory=dict)
-    status: str = Field(default=enums.Status.initialized.name)
+    status: str = Field(default=str(Status.initialized))
     results: str = Field(default_factory=str)
     errors: str = Field(default_factory=str)
 
 
-class Workflow(Model):
+class Job(Model):
     """
-    A workflow defines what tasks are included in a Job. The tasks define their own
-    dependencies. The resulting workflow must form a directed acyclic graph -- i.e.,
-    there cannot be a dependency loop.
-
-    * tasks - the tasks that are included in the workflow. key = name, value = Task
+    A single job in the Job queue and then in the log.
+
+    * id - Unique identifier for this job.
+    * qname - Name of the queue that this job will be in. Different postq workers
+      listen to different queues.
+    * status - the current status of the job, one of the values in postq.Status.
+    * queued - timestamp when the job was queued.
+    * scheduled - timestamp when the job is scheduled (= queued by default).
+    * initialized - timestamp when the job was initialized.
+    * logged - timestamp when the job was logged.
+    * tasks - define the job's workflow. key = name, value = Task. Each task defines its
+      own dependencies, command, and image. The resulting workflow must form a directed
+      acyclic graph -- i.e., there cannot be a dependency loop.
+        * depends - the list of names of the other tasks that this task depends on.
+        * command - the command that the task runs in the image container.
+        * image - {docker, kubernetes} the image that is used to run the task.
     * graph - (generated from tasks) the DAG (directed acyclic graph) of tasks (a
-      networkx.DiGraph), which is validated as acyclic
+      networkx.DiGraph), which is validated as acyclic.
+    * data - extra data that is needed by the job or its tasks to complete the workflow.
""" - tasks: List[Task] = Field(default_factory=list) + # fields + id: UUID = Field(default=None) + qname: str = Field(default='') + status: str = Field(default=str(Status.queued)) + queued: datetime = Field(default=None) + scheduled: datetime = Field(default=None) + initialized: datetime = Field(default_factory=lambda: datetime.now(tz=timezone.utc)) + logged: datetime = Field(default=None) + tasks: Dict[str, Task] = Field(default_factory=dict) graph: Any = Field(default_factory=nx.DiGraph) + data: dict = Field(default_factory=dict) def dict(self, *args, **kwargs): - # don't include graph in output, because it's not serializable, and it's built from tasks + # don't include graph in output, because it's not serializable, and it's + # generated automatically from tasks. + data = { + key: val + for key, val in super().dict(*args, **kwargs).items() + if key not in ['graph'] + } + # filter 'name' out of tasks + data['tasks'] = { + name: {key: val for key, val in task.items() if key not in ['name']} + for name, task in data['tasks'].items() + } + return data + + # field converters + + @validator('status') + def validate_job_status(cls, value, values, **kwargs): + if value not in Status.__members__.keys(): + raise ValueError(f'value must be one of {list(Status.__members__.keys())}') + return value + + @validator('tasks', pre=True) + def convert_job_tasks(cls, value, values, **kwargs): + # convert a string to a dict + if isinstance(value, str): + value = json.loads(value) + # convert values that are dicts to Task instances return { - k: v for k, v in super().dict(*args, **kwargs).items() if k not in ['graph'] + key: ( + Task(name=key, **{k: v for k, v in val.items() if k != 'name'}) + if isinstance(val, dict) + else val + ) + for key, val in value.items() } + @validator('data', pre=True) + def convert_job_data(cls, value, values, **kwargs): + # convert a string to a dict + if isinstance(value, str): + value = json.loads(value) + return value + @validator('tasks') def validate_tasks(cls, value, values, **kwargs): """ Ensure that all Task.depends are defined as Tasks. """ errors = [] - task_names = [task.name for task in value] - for task in value: + task_names = list(value.keys()) + for task_name, task in value.items(): for depend_name in task.depends: if depend_name not in task_names: errors.append( - f"Task '{task.name}' depends on undefined Task '{depend_name}'." + f"Task '{task_name}' depends on undefined Task '{depend_name}'." ) if errors: raise ValueError(' '.join(errors)) @@ -74,14 +135,14 @@ def validate_tasks(cls, value, values, **kwargs): @validator('graph', always=True) def validate_graph(cls, value, values, **kwargs): """ - Build the Workflow.graph from the Workflow.tasks, and ensure that the graph is + Build the job.graph from the job.tasks, and ensure that the graph is acyclic (a directed acyclic graph). """ graph = nx.DiGraph() - for task in values.get('tasks') or []: - graph.add_node(task.name) # make sure every task is added + for task_name, task in (values.get('tasks') or {}).items(): + graph.add_node(task_name) # make sure every task is added for depend_name in task.depends: - graph.add_edge(depend_name, task.name) + graph.add_edge(depend_name, task_name) if not nx.is_directed_acyclic_graph(graph): raise ValueError( 'The tasks graph must be acyclic, but it currently includes cycles.' @@ -89,94 +150,85 @@ def validate_graph(cls, value, values, **kwargs): # the transitive reduction ensures the shortest version of the workflow. 
         return nx.transitive_reduction(graph)
 
-    @property
-    def tasks_dict(self):
-        return {task.name: task for task in self.tasks}
+    # ancestors and descendants of a given task
 
     @property
-    def ancestors(self):
+    def task_ancestors(self):
         """
         graph ancestors, with the keys in lexicographical topological sort order.
         """
         return {
-            name: list(nx.ancestors(self.graph, name))
-            for name in nx.lexicographical_topological_sort(self.graph)
-        }
-
-    @property
-    def tasks_ancestors(self):
-        return {
-            task.name: [self.tasks_dict[name] for name in self.ancestors[task.name]]
-            for task in self.tasks
+            task_name: [
+                self.tasks[name] for name in nx.ancestors(self.graph, task_name)
+            ]
+            for task_name in nx.lexicographical_topological_sort(self.graph)
         }
 
     @property
-    def tasks_descendants(self):
+    def task_descendants(self):
         return {
-            task.name: [
-                self.tasks_dict[name] for name in nx.descendants(self.graph, task.name)
+            task_name: [
+                self.tasks[name] for name in nx.descendants(self.graph, task_name)
             ]
-            for task in self.tasks
+            for task_name in self.tasks
         }
 
+    # lists of tasks with various statuses
+
     @property
     def started_tasks(self):
         """
-        Started tasks are those with a Status value greater than or equal to
-        Status.processing
+        Tasks with a Status value greater than or equal to Status.processing
         """
         return list(
             filter(
-                lambda task: (enums.Status[task.status] >= enums.Status.processing),
-                self.tasks,
+                lambda task: (Status[task.status] >= Status.processing),
+                self.tasks.values(),
             )
         )
 
     @property
     def completed_tasks(self):
         """
-        Completed tasks are those with a Status value greater than or equal to
-        Status.completed
+        Tasks with a Status value greater than or equal to Status.completed
         """
         return list(
             filter(
-                lambda task: (enums.Status[task.status] >= enums.Status.completed),
-                self.tasks,
+                lambda task: (Status[task.status] >= Status.completed),
+                self.tasks.values(),
            )
         )
 
     @property
     def failed_tasks(self):
         """
-        Failed tasks are those with a Status value greater than or equal to
-        Status.cancelled
+        Tasks with a Status value greater than or equal to Status.cancelled
         """
         return list(
             filter(
-                lambda task: (enums.Status[task.status] >= enums.Status.cancelled),
-                self.tasks,
+                lambda task: (Status[task.status] >= Status.cancelled),
+                self.tasks.values(),
             )
         )
 
     @property
     def successful_tasks(self):
         """
-        Successful tasks are those that have completed and not failed.
+        Tasks that have completed and not failed.
         """
         completed = self.completed_tasks
         failed = self.failed_tasks
         return list(
             filter(
                 lambda task: (task in completed and task not in failed),
-                self.tasks,
+                self.tasks.values(),
             )
         )
 
     @property
     def ready_tasks(self):
         """
-        Ready tasks are those that are not started, and for which all ancestors are
-        successful.
+        Tasks that are not started, and for which all ancestors are successful.
         """
         started = self.started_tasks
         successful = self.successful_tasks
@@ -187,82 +239,10 @@ def ready_tasks(self):
                     and all(
                         map(
                             lambda task: task in successful,
-                            self.tasks_ancestors[task.name],
+                            self.task_ancestors[task.name],
                         )
                     )
                 ),
-                self.tasks,
+                self.tasks.values(),
             )
         )
-
-
-class Job(Model):
-    """
-    A single job in the Job queue.
- """ - - id: UUID = Field(default=None) - qname: str = Field(default='') - retries: int = Field(default=1) - status: str = Field(default=enums.Status.queued.name) - queued: datetime = Field(default=None) - scheduled: datetime = Field(default=None) - workflow: Workflow = Field(default_factory=dict) - data: dict = Field(default_factory=dict) - - @validator('status') - def validate_job_status(cls, val, values, **kwargs): - if val not in enums.Status.__members__.keys(): - raise ValueError( - f'value must be one of {list(enums.Status.__members__.keys())}' - ) - return val - - @validator('workflow', pre=True) - def convert_job_workflow(cls, val, values, **kwargs): - if isinstance(val, str): - val = json.loads(val) - return val - - @validator('data', pre=True) - def convert_job_data(cls, val, values, **kwargs): - if isinstance(val, str): - val = json.loads(val) - return val - - -class JobLog(Model): - """ - A logged Job to maintain a historical record of Jobs that have been completed. - """ - - id: UUID - qname: str - retries: int - status: str - queued: datetime = Field(default=None) - scheduled: datetime = Field(default=None) - initialized: datetime = Field(default_factory=lambda: datetime.now(tz=timezone.utc)) - logged: datetime = Field(default=None) - workflow: Workflow - data: dict - - @validator('status') - def validate_joblog_status(cls, val, values, **kwargs): - if val not in enums.Status.__members__.keys(): - raise ValueError( - f'value must be one of {list(enums.Status.__members__.keys())}' - ) - return val - - @validator('workflow', pre=True) - def convert_joblog_workflow(cls, val, values, **kwargs): - if isinstance(val, str): - val = json.loads(val) - return val - - @validator('data', pre=True) - def convert_joblog_data(cls, val, values, **kwargs): - if isinstance(val, str): - val = json.loads(val) - return val diff --git a/postq/q.py b/postq/q.py index 41c65bf..639dbb1 100644 --- a/postq/q.py +++ b/postq/q.py @@ -3,6 +3,7 @@ import logging import os import tempfile +from copy import deepcopy from pathlib import Path from threading import Thread from typing import Callable @@ -13,7 +14,7 @@ from postq import tables from postq.enums import Status -from postq.models import Job, JobLog, Task +from postq.models import Job, Task log = logging.getLogger(__name__) @@ -90,7 +91,7 @@ async def process_job( number: int, job: Job, executor: Callable[[str, dict, str], None], -) -> JobLog: +) -> Job: """ Process the given Job and return a JobLog with the results. @@ -116,14 +117,13 @@ async def process_job( log.info( "[%s %02d] processing Job: %s [queued=%s]", qname, number, job.id, job.queued ) + job = deepcopy(job) # bind PULL socket (task sink) socket_file = Path(os.getenv('TMPDIR', '')) / f'.postq-{qname}-{number:02d}.ipc' address = f"ipc://{socket_file}" task_sink = bind_pull_socket(address) - joblog = JobLog(**job.dict()) - with tempfile.TemporaryDirectory() as jobdir: # The jobdir directory will be available to all tasks in the job workflow, to # use for temporary files in the processing of tasks. 
@@ -134,10 +134,10 @@ async def process_job(
 
         # loop until all the tasks are finished (either completed or failed)
         while (
-            min([Status[task.status] for task in job.workflow.tasks]) < Status.completed
+            min([Status[task.status] for task in job.tasks.values()]) < Status.completed
         ):
             # do all the ready tasks (ancestors are completed and not failed)
-            for task in job.workflow.ready_tasks:
+            for task in job.ready_tasks:
                 log.debug('[%s] %s ready = %r', address, task.name, task)
 
                 # start an executor thread for each task. give it the address to send a
@@ -145,7 +145,7 @@ async def process_job(
                 # the task definition as a copy via `.dict()`)
                 thread = Thread(target=executor, args=(address, jobdir, task.dict()))
                 thread.start()
-                task.status = Status.processing.name
+                task.status = str(Status.processing)
 
             # wait for any task to complete. (all tasks send a message to the task_sink.
             # the task_result is the task definition itself as a `.dict()`).
@@ -154,27 +154,25 @@ async def process_job(
             log.debug("[%s] %s result = %r", address, task.name, result_task)
 
             # when a task completes, update the task definition with its status and errors.
-            task = job.workflow.tasks_dict[result_task.name]
+            task = job.tasks[result_task.name]
             task.update(**result_task.dict())
 
             # if it failed, mark all descendants as cancelled
             if Status[task.status] >= Status.cancelled:
-                for descendant_task in job.workflow.tasks_descendants[task.name]:
+                for descendant_task in job.task_descendants[task.name]:
                     log.debug(
                         "[%s] %s cancel = %r", address, task.name, descendant_task
                     )
-                    descendant_task.status = Status.cancelled.name
+                    descendant_task.status = str(Status.cancelled)
 
     # all the tasks have now either succeeded, failed, or been cancelled. The Job status
     # is the maximum (worst) status of any task.
-    job.status = max([Status[task.status] for task in job.workflow.tasks]).name
-    joblog.update(**job.dict())
+    job.status = str(max([Status[task.status] for task in job.tasks.values()]))
 
     log.info(
         "[%s %02d] completed Job: %s [status=%s]", qname, number, job.id, job.status
     )
-
-    return joblog
+    return job
 
 
 def bind_pull_socket(address: str) -> zmq.Socket:
diff --git a/postq/tables.py b/postq/tables.py
index f02fe16..ee84fd9 100644
--- a/postq/tables.py
+++ b/postq/tables.py
@@ -1,8 +1,6 @@
-from sqlalchemy import Column, DateTime, MetaData, SmallInteger, String, Table, text
+from sqlalchemy import Column, DateTime, MetaData, String, Table, text
 from sqlalchemy.dialects.postgresql import JSONB, UUID
 
-from . import enums
-
 metadata = MetaData(schema='postq')
 
 Job = Table(
@@ -10,7 +8,7 @@
     metadata,
     Column('id', UUID, primary_key=True, server_default=text("gen_random_uuid()")),
     Column('qname', String, nullable=False, index=True),
-    Column('retries', SmallInteger, server_default=text("1"), index=True),
+    Column('status', String, nullable=False),
     Column(
         'queued',
         DateTime(timezone=True),
@@ -24,18 +22,23 @@
         nullable=False,
         server_default=text('current_timestamp'),
     ),
-    Column('status', String, nullable=False),
-    Column('workflow', JSONB, server_default=text("'{}'::jsonb")),
+    Column('initialized', DateTime(timezone=True), nullable=True),
+    Column(
+        'logged',
+        DateTime(timezone=True),
+        nullable=True,
+    ),
+    Column('tasks', JSONB, server_default=text("'{}'::jsonb")),
     Column('data', JSONB, server_default=text("'{}'::jsonb")),
 )
 
 Job.get = (
     lambda: """
-    UPDATE postq.job job1 SET retries = retries - 1
+    UPDATE postq.job job1 SET status = 'processing'
    WHERE job1.id = (
         SELECT job2.id FROM postq.job job2
         WHERE job2.qname = :qname
-        AND job2.retries > 0
+        AND job2.status = 'queued'
         AND job2.scheduled <= now()
         ORDER BY job2.queued
         FOR UPDATE SKIP LOCKED LIMIT 1
@@ -50,7 +53,6 @@
     metadata,
     Column('id', UUID, nullable=False),
     Column('qname', String, nullable=False),
-    Column('retries', SmallInteger),
     Column('queued', DateTime(timezone=True), nullable=False),
     Column('scheduled', DateTime(timezone=True), nullable=False),
     Column('initialized', DateTime(timezone=True), nullable=True),
@@ -60,7 +62,7 @@
         nullable=False,
         server_default=text('current_timestamp'),
     ),
-    Column('status', String, nullable=False, default=enums.Status.queued.name),
-    Column('workflow', JSONB, server_default=text("'{}'::jsonb")),
+    Column('status', String, nullable=False),
+    Column('tasks', JSONB, server_default=text("'{}'::jsonb")),
     Column('data', JSONB, server_default=text("'{}'::jsonb")),
 )
diff --git a/postq/tests/mocks.py b/postq/tests/mocks.py
index 02086d1..c13758e 100644
--- a/postq/tests/mocks.py
+++ b/postq/tests/mocks.py
@@ -9,5 +9,5 @@ def mock_executor(address, jobdir, task_def):
     message back to address.
     """
     task = models.Task(**task_def)
-    task.status = task.params.get('status') or enums.Status.completed.name
+    task.status = task.params.get('status') or str(enums.Status.completed)
     executors.send_data(address, task.dict())
diff --git a/postq/tests/test_executors.py b/postq/tests/test_executors.py
index 24517ce..fa39c64 100644
--- a/postq/tests/test_executors.py
+++ b/postq/tests/test_executors.py
@@ -51,7 +51,7 @@ async def test_shell_executor():
 @pytest.mark.asyncio
 async def test_docker_executor():
     """
-    Live-test the docker_executor.
+    Live-test the docker_executor. (NOTE: Running commands in a docker container takes time as compared with the shell. So the number of docker commands we test is limited.)
diff --git a/postq/tests/test_models.py b/postq/tests/test_models.py
index b0f510d..5924634 100644
--- a/postq/tests/test_models.py
+++ b/postq/tests/test_models.py
@@ -5,73 +5,54 @@
 import pytest
 from pydantic import ValidationError
 
-from postq.models import Job, JobLog
+from postq.models import Job
 
 
-def test_job_joblog_invalid():
-    """
-    Invalid Job/Log definitions raise ValidationErrors
-    """
-    for tasks in [
+@pytest.mark.parametrize(
+    'tasks',
+    [
         # 'b' depends on 'c', which doesn't exist
-        [{'name': 'a'}, {'name': 'b', 'depends': ['c']}],
+        {'a': {}, 'b': {'depends': ['c']}},
         # 'a' and 'b' depend each other, so the graph is not acyclic, as required
-        [{'name': 'a', 'depends': ['b']}, {'name': 'b', 'depends': ['a']}],
-    ]:
-        with pytest.raises(ValidationError):
-            Job(qname='test', workflow={'tasks': tasks})
-        with pytest.raises(ValidationError):
-            JobLog(
-                id=uuid4(),
-                qname='test',
-                status='queued',
-                retries=0,
-                workflow={'tasks': tasks},
-                data={},
-            )
+        {'a': {'depends': ['b']}, 'b': {'depends': ['a']}},
+    ],
+)
+def test_job_joblog_invalid(tasks):
+    """
+    Invalid Job definitions raise ValidationErrors
+    """
+    with pytest.raises(ValidationError):
+        Job(qname='test', tasks=tasks)
+
 
+def test_job_log_invalid_status():
     # Job/Log with invalid status
     with pytest.raises(ValidationError):
         Job(qname='test', status='bad')
-    with pytest.raises(ValidationError):
-        JobLog(
-            id=uuid4(),
-            qname='test',
-            status='bad',
-            retries=0,
-            workflow={'tasks': tasks},
-            data={},
-        )
 
 
 def test_job_log_workflow_data_str():
     """
     Job/Log.workflow and Job/Log.data can be converted from a str
     """
-    tasks = [{'name': 'a'}, {'name': 'b', 'depends': ['a']}]
-    workflow = {'tasks': tasks}
+    tasks = {'a': {}, 'b': {'depends': ['a']}}
     data = {'some': 'info', 'that': 'this', 'job': 'uses'}
-    assert Job(qname='test', workflow=workflow, data=data) == Job(
-        qname='test', workflow=json.dumps(workflow), data=json.dumps(data)
-    )
-    joblog_id = uuid4()
+    job_id = uuid4()
     timestamp = datetime.now(tz=timezone.utc)
-    jl1 = JobLog(
-        id=joblog_id,
+    jl1 = Job(
+        id=job_id,
         qname='test',
         initialized=timestamp,
         status='queued',
-        retries=0,
-        workflow=workflow,
+        tasks=tasks,
         data=data,
     )
-    jl2 = JobLog(
-        id=joblog_id,
+    jl2 = Job(
+        id=job_id,
         qname='test',
         initialized=timestamp,
         status='queued',
-        retries=0,
-        workflow=json.dumps(workflow),
+        tasks=json.dumps(tasks),
         data=json.dumps(data),
     )
     assert jl1.dict() == jl2.dict()
diff --git a/postq/tests/test_q.py b/postq/tests/test_q.py
index 04eb6c8..e468f2d 100644
--- a/postq/tests/test_q.py
+++ b/postq/tests/test_q.py
@@ -5,7 +5,7 @@
 from postq import q, tables
 from postq.enums import Status
 from postq.executors import shell_executor
-from postq.models import Job, Workflow
+from postq.models import Job
 from postq.tests.mocks import mock_executor
 
 
@@ -15,34 +15,34 @@
     [
         # the simplest workflow: one task, no dependencies.
         {
-            'tasks': [{'name': 'a', 'depends': [], 'params': {'status': 'success'}}],
-            'status': {'a': 'success'},
+            'tasks': {'a': {'depends': [], 'params': {'status': 'success'}}},
+            'results': {'a': {'status': 'success'}},
         },
         # if the first task fails, the dependent one is cancelled.
         {
-            'tasks': [
-                {'name': 'a', 'depends': [], 'params': {'status': 'failure'}},
-                {'name': 'b', 'depends': ['a'], 'params': {'status': 'success'}},
-            ],
-            'status': {'a': 'failure', 'b': 'cancelled'},
+            'tasks': {
+                'a': {'depends': [], 'params': {'status': 'failure'}},
+                'b': {'depends': ['a'], 'params': {'status': 'success'}},
+            },
+            'results': {'a': {'status': 'failure'}, 'b': {'status': 'cancelled'}},
         },
         # if a task fails, all dependent tasks are cancelled, but others run
         {
-            'tasks': [
-                {'name': 'a', 'depends': [], 'params': {'status': 'success'}},
-                {'name': 'b', 'depends': ['a'], 'params': {'status': 'failure'}},
-                {'name': 'c', 'depends': ['a'], 'params': {'status': 'success'}},
-                {'name': 'd', 'depends': ['b'], 'params': {'status': 'success'}},
-                {'name': 'e', 'depends': ['c'], 'params': {'status': 'success'}},
-                {'name': 'f', 'depends': ['d', 'e'], 'params': {'status': 'success'}},
-            ],
-            'status': {
-                'a': 'success',
-                'b': 'failure',
-                'c': 'success',
-                'd': 'cancelled',
-                'e': 'success',
-                'f': 'cancelled',
+            'tasks': {
+                'a': {'depends': [], 'params': {'status': 'success'}},
+                'b': {'depends': ['a'], 'params': {'status': 'failure'}},
+                'c': {'depends': ['a'], 'params': {'status': 'success'}},
+                'd': {'depends': ['b'], 'params': {'status': 'success'}},
+                'e': {'depends': ['c'], 'params': {'status': 'success'}},
+                'f': {'depends': ['d', 'e'], 'params': {'status': 'success'}},
+            },
+            'results': {
+                'a': {'status': 'success'},
+                'b': {'status': 'failure'},
+                'c': {'status': 'success'},
+                'd': {'status': 'cancelled'},  # depends on 'b'
+                'e': {'status': 'success'},
+                'f': {'status': 'cancelled'},  # depends on 'd' -> 'b'
             },
         },
     ],
@@ -53,14 +53,13 @@ async def test_process_job_task_result_status(item):
     the job as a whole is as expected.
     """
     qname, number = 'test', 1
-    job = Job(
-        id=uuid4(), status='queued', qname=qname, workflow={'tasks': item['tasks']}
-    )
+    job = Job(id=uuid4(), status='queued', qname=qname, tasks=item['tasks'])
     joblog = await q.process_job(qname, number, job, mock_executor)
-    joblog.workflow = Workflow(**joblog.workflow)
-    for task in joblog.workflow.tasks:
-        assert task.status == item['status'][task.name]
-    assert joblog.status == str(max(Status[task.status] for task in job.workflow.tasks))
+    for task_name, task in joblog.tasks.items():
+        assert task.status == item['results'][task_name]['status']
+    assert joblog.status == str(
+        max(Status[task.status] for task in joblog.tasks.values())
+    )
 
 
 @pytest.mark.asyncio
@@ -71,21 +70,19 @@ async def test_transact_job(database):
     """
     # queue the job
     qname = 'test'
-    job = Job(
-        qname=qname, workflow={'tasks': [{'name': 'a', 'params': {'command': "ls"}}]}
-    )
+    job = Job(qname=qname, tasks={'a': {'command': 'ls'}})
     record = await database.fetch_one(
-        tables.Job.insert().returning(*tables.Job.c), values=job.dict()
+        tables.Job.insert().returning(*tables.Job.columns), values=job.dict()
     )
     job.update(**record)
 
     # process one job from the queue
     result = await q.transact_one_job(database, qname, 1, shell_executor)
     job_record = await database.fetch_one(
-        tables.Job.select().where(tables.Job.c.id == job.id).limit(1)
+        tables.Job.select().where(tables.Job.columns.id == job.id).limit(1)
     )
     joblog_record = await database.fetch_one(
-        tables.JobLog.select().where(tables.JobLog.c.id == job.id).limit(1)
+        tables.JobLog.select().where(tables.JobLog.columns.id == job.id).limit(1)
     )
     assert result is True
    assert job_record is None  # already deleted
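
A minimal usage sketch of the reshaped Job model after this change, based on the README and test updates above; it is not part of the patch, and the queue name and task parameters are illustrative only:

    from postq.models import Job

    # tasks is now a mapping of task name -> task definition; each value mirrors the
    # fields on models.Task ('depends', 'params', ...), and 'name' is taken from the key.
    job = Job(
        qname='test',
        tasks={
            'a': {'params': {'command': 'ls -laFh'}},
            'b': {'depends': ['a']},
        },
    )

    assert list(job.tasks) == ['a', 'b']
    assert [task.name for task in job.ready_tasks] == ['a']  # 'b' waits on 'a'
    assert 'name' not in job.dict()['tasks']['a']  # Job.dict() drops the redundant key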