Skip to content

Commit

Permalink
Job.tasks are now a dict of (name, Task), not a list. This is simpler…
Browse files Browse the repository at this point in the history
… to write, and a better representation of the data. (However, it did require a rather deep revision of everything.)
  • Loading branch information
seanharrison committed Sep 12, 2020
1 parent 424da34 commit 6b15469
Show file tree
Hide file tree
Showing 10 changed files with 206 additions and 261 deletions.
13 changes: 3 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,30 +62,23 @@ PostQ is a job queue system with
database = Database(os.getenv('DATABASE_URL'))
await database.connect()
job = models.Job(
workflow={
'tasks': [
{
'name': 'a',
'params': {'image': 'debian:buster-slim', 'command': 'ls -laFh'}
}
]
}
tasks= {'a': {'image': 'debian:buster-slim', 'command': 'ls -laFh'}}
)
record = await database.fetch_one(
tables.Job.insert().returning(*tables.Job.columns), values=job.dict()
)
# Then, after a few seconds...
joblog = models.JobLog(
joblog = models.Job(
**await database.fetch_one(
tables.JobLog.select().where(
tables.JobLog.columns.id==record['id']
).limit(1)
)
)
print(joblog.workflow.tasks[0].results)
print(joblog.tasks[0].results)
# total 4.0K
# drwxr-xr-x 2 root root 64 Sep 11 04:11 ./
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
"""job_log
"""job log
Revision ID: b76f56db845a
Revision ID: 8c4306d66631
Revises: 7e8956a72cf2
Create Date: 2020-09-05 19:48:20.702598+00:00
Create Date: 2020-09-12 04:55:39.362899+00:00
"""
import sqlalchemy as sa
Expand All @@ -11,7 +11,7 @@
from alembic import op

# revision identifiers, used by Alembic.
revision = 'b76f56db845a'
revision = '8c4306d66631'
down_revision = '7e8956a72cf2'
branch_labels = None
depends_on = None
Expand All @@ -28,9 +28,7 @@ def upgrade():
nullable=False,
),
sa.Column('qname', sa.String(), nullable=False),
sa.Column(
'retries', sa.SmallInteger(), server_default=sa.text('1'), nullable=True
),
sa.Column('status', sa.String(), nullable=False),
sa.Column(
'queued',
sa.DateTime(timezone=True),
Expand All @@ -43,9 +41,10 @@ def upgrade():
server_default=sa.text('current_timestamp'),
nullable=False,
),
sa.Column('status', sa.String(), nullable=False),
sa.Column('initialized', sa.DateTime(timezone=True), nullable=True),
sa.Column('logged', sa.DateTime(timezone=True), nullable=True),
sa.Column(
'workflow',
'tasks',
postgresql.JSONB(astext_type=sa.Text()),
server_default=sa.text("'{}'::jsonb"),
nullable=True,
Expand All @@ -65,14 +64,10 @@ def upgrade():
op.create_index(
op.f('ix_postq_job_queued'), 'job', ['queued'], unique=False, schema='postq'
)
op.create_index(
op.f('ix_postq_job_retries'), 'job', ['retries'], unique=False, schema='postq'
)
op.create_table(
'job_log',
sa.Column('id', postgresql.UUID(), nullable=False),
sa.Column('qname', sa.String(), nullable=False),
sa.Column('retries', sa.SmallInteger(), nullable=True),
sa.Column('queued', sa.DateTime(timezone=True), nullable=False),
sa.Column('scheduled', sa.DateTime(timezone=True), nullable=False),
sa.Column('initialized', sa.DateTime(timezone=True), nullable=True),
Expand All @@ -84,7 +79,7 @@ def upgrade():
),
sa.Column('status', sa.String(), nullable=False),
sa.Column(
'workflow',
'tasks',
postgresql.JSONB(astext_type=sa.Text()),
server_default=sa.text("'{}'::jsonb"),
nullable=True,
Expand All @@ -103,7 +98,6 @@ def upgrade():
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table('job_log', schema='postq')
op.drop_index(op.f('ix_postq_job_retries'), table_name='job', schema='postq')
op.drop_index(op.f('ix_postq_job_queued'), table_name='job', schema='postq')
op.drop_index(op.f('ix_postq_job_qname'), table_name='job', schema='postq')
op.drop_table('job', schema='postq')
Expand Down
6 changes: 3 additions & 3 deletions postq/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ def executor(
task.results = process.stdout.decode() # TODO: I'm not happy with forcing UTF-8
task.errors = process.stderr.decode()
if process.returncode > 0:
task.status = Status.error.name
task.status = str(Status.error)
elif task.errors:
task.status = Status.warning.name
task.status = str(Status.warning)
else:
task.status = Status.success.name
task.status = str(Status.success)

send_data(address, task.dict())

Expand Down
Loading

0 comments on commit 6b15469

Please sign in to comment.