Updated README. Removed psql example, since currently the postq process will crash if there is bad data in the postq.job table -- data validation during insert is an important part of the process until we have error trapping on select.
seanharrison committed Sep 12, 2020
1 parent 6b15469 commit 755c9b2
Showing 1 changed file (README.md) with 38 additions and 61 deletions.
@@ -48,70 +48,47 @@ PostQ is a job queue system with
docker-compose up
```
The default docker-compose.yml cluster definition uses the docker executor (so tasks must define an image), with a maximum queue sleep time of 5 seconds and the default qname=''. The default cluster doesn't expose any ports to the outside world, but you can, for example, shell into the running cluster from a second terminal and start pushing tasks into the queue. More commonly, your PostgreSQL instance is available inside your application cluster, so you can push jobs into postq directly from your application.
Here is an example in Python using [Databases](https://encode.io/databases), [SQLAlchemy Core](https://docs.sqlalchemy.org/en/13/core/), and data models written in [Pydantic](https://pydantic-docs.helpmanual.io/):
```bash
$ docker-compose exec postq ipython
```
```python
# (Using the ipython shell, which allows async/await without an explicit event loop.)
import os

from databases import Database

from postq import models, tables

database = Database(os.getenv('DATABASE_URL'))
await database.connect()

job = models.Job(
    tasks={'a': {'image': 'debian:buster-slim', 'command': 'ls -laFh'}}
)
record = await database.fetch_one(
    tables.Job.insert().returning(*tables.Job.columns), values=job.dict()
)

# Then, after a few seconds...
joblog = models.Job(
    **await database.fetch_one(
        tables.JobLog.select().where(
            tables.JobLog.columns.id == record['id']
        ).limit(1)
    )
)
print(joblog.tasks[0].results)
# total 4.0K
# drwxr-xr-x 2 root root 64 Sep 11 04:11 ./
# drwxr-xr-x 1 root root 4.0K Sep 11 04:11 ../
```
Now you have a job log entry with the output of your command in the task results. :tada:
<!-- * [TODO] **Can use a message broker as the Job Queue.** Applications that need higher performance and throughput than PostgreSQL can provide must be able to shift up to something more performant. For example, RabbitMQ is a very high-performance message broker written in Erlang.
* [TODO] **Can run (persistent) Task workers.** Some Tasks or Task environments (images) are anticipated as being needed continually. In such job environments, the Task workers can be made persistent services that listen to the Job queue for their own Jobs. (In essence, this allows a Task to be a complete sub-workflow being handled by its own Workflow Job queue workers, in which the Tasks are enabled to run inside the Job worker container as subprocesses.) -->
Similar results can be achieved with SQL directly, or with any other interface. Here's the same example run in the `psql` terminal inside the running cluster:
```bash
$ docker-compose exec postq bash
$ psql $DATABASE_URL
```
```sql
postq=# insert into postq.job (qname, status, workflow) values ('', 'queued', '{"tasks": [{"name": "a", "params": {"image": "debian:buster-slim", "command": "ls -laFh"}}]}') returning id;
-[ RECORD 1 ]----------------------------
id | 17d0a67c-98fb-4f84-913e-2f0532bc069f
INSERT 0 1
postq=# select * from postq.job_log where id = '17d0a67c-98fb-4f84-913e-2f0532bc069f';
-[ RECORD 1 ]---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
id | 17d0a67c-98fb-4f84-913e-2f0532bc069f
qname |
retries | 0
queued | 2020-09-11 04:48:53.897556+00
scheduled | 2020-09-11 04:48:53.897556+00
initialized | 2020-09-11 04:48:54.40734+00
logged | 2020-09-11 04:48:54.400779+00
status | success
workflow | {"tasks": [{"name": "a", "errors": "", "params": {"image": "debian:buster-slim", "command": "ls -laFh"}, "status": "success", "depends": [], "results": "total 4.0K\r\ndrwxr-xr-x 2 root root 64 Sep 11 04:48 ./\r\ndrwxr-xr-x 1 root root 4.0K Sep 11 04:48 ../\r\n"}]}
data | {}
```
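Until there is error trapping on select, bad data in the postq.job table will crash the postq process, so data validation during insert is an important part of the process. Below is a minimal sketch of that guard, assuming the Pydantic models and SQLAlchemy tables used above; the `insert_job` helper itself is hypothetical:
```python
import os

from databases import Database
from pydantic import ValidationError

from postq import models, tables

async def insert_job(database: Database, tasks: dict):
    """Validate the workflow with Pydantic before it ever reaches postq.job."""
    try:
        # Constructing the model raises ValidationError on malformed task data.
        job = models.Job(tasks=tasks)
    except ValidationError as exc:
        raise ValueError(f'refusing to queue a malformed job: {exc}') from exc
    return await database.fetch_one(
        tables.Job.insert().returning(*tables.Job.columns), values=job.dict()
    )
```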
## Usage Examples
Here is an example in Python using the running postq container itself. The Python stack is [Databases](https://encode.io/databases), [SQLAlchemy Core](https://docs.sqlalchemy.org/en/13/core/), and data models written in [Pydantic](https://pydantic-docs.helpmanual.io/):
```bash
$ docker-compose exec postq ipython
```
```python
# (Using the ipython shell, which allows async/await without an explicit event loop.)
import os

from databases import Database

from postq import models, tables

database = Database(os.getenv('DATABASE_URL'))
await database.connect()

job = models.Job(
    tasks={'a': {'params': {'image': 'debian:buster-slim', 'command': 'echo Hey!'}}}
)
record = await database.fetch_one(
    tables.Job.insert().returning(*tables.Job.columns), values=job.dict()
)

# Then, after a few seconds...
joblog = models.Job(
    **await database.fetch_one(
        tables.JobLog.select().where(
            tables.JobLog.columns.id == record['id']
        ).limit(1)
    )
)
print(joblog.tasks['a'].results)
# Hey!
```
Now you have a job log entry with the output of your command in the task results. :tada:
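Rather than waiting "a few seconds" by hand, a small polling loop can fetch the job log as soon as it lands. Here is a sketch under the same assumptions as the example above; the `wait_for_joblog` helper is hypothetical:
```python
import asyncio

from databases import Database

from postq import models, tables

async def wait_for_joblog(database: Database, job_id, timeout=30.0, interval=0.5):
    """Poll postq.job_log until the job's log record appears, or time out."""
    for _ in range(int(timeout / interval)):
        record = await database.fetch_one(
            tables.JobLog.select().where(
                tables.JobLog.columns.id == job_id
            ).limit(1)
        )
        if record is not None:
            return models.Job(**record)
        await asyncio.sleep(interval)
    raise TimeoutError(f'no job_log entry for job {job_id}')

# e.g., in the ipython session above:
# joblog = await wait_for_joblog(database, record['id'])
# print(joblog.tasks['a'].results)
```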
