Commit 04694a9

chore: separate feathers-tasks examples and refactor with k8s api and dockerode
1 parent d2ff7ee commit 04694a9

23 files changed

Lines changed: 2072 additions & 332 deletions

Lines changed: 24 additions & 0 deletions
@@ -0,0 +1,24 @@
FROM node:20-alpine

WORKDIR /app

RUN corepack enable && corepack prepare pnpm@10.1.0 --activate

COPY pnpm-workspace.yaml ./
COPY pnpm-lock.yaml ./
COPY package.json ./

COPY packages/feathers-tasks/package.json ./packages/feathers-tasks/
COPY packages/feathers-tasks/src ./packages/feathers-tasks/src/

COPY examples/feathers-bpmn-orchestration/package.json ./examples/feathers-bpmn-orchestration/

RUN pnpm install --filter feathers-bpmn-orchestration-example --prod --ignore-scripts

COPY examples/feathers-bpmn-orchestration/worker ./examples/feathers-bpmn-orchestration/worker

WORKDIR /app/examples/feathers-bpmn-orchestration

ENV NODE_ENV=production

CMD ["node", "worker/run-job.js"]
Lines changed: 212 additions & 0 deletions
@@ -0,0 +1,212 @@
# feathers-bpmn-orchestration

BPMN-driven orchestration on top of [`@kalisio/feathers-tasks`](../../packages/feathers-tasks). Each BPMN `serviceTask` is routed, via the `meta:jobType` extension, to either a **Docker container** (created via [`dockerode`](https://github.com/apocas/dockerode)) or a **Kubernetes pod** (created via [`@kubernetes/client-node`](https://github.com/kubernetes-client/javascript)). One BPMN service task = one BullMQ job = one ephemeral container/pod.

For a simpler version without BPMN, see [`examples/feathers-tasks-orchestration`](../feathers-tasks-orchestration/).

## Architecture

```
┌──────────────────────────────────────────────────────────────────┐
│  Feathers server (this process)                                  │
│                                                                  │
│   POST /workflows                                                │
│         │                                                        │
│         ▼                                                        │
│   WorkflowsService ──► WorkflowEngine (bpmn-moddle)              │
│         │              parses BPMN, walks elements               │
│         ▼                                                        │
│   TaskService ──► BullMQ queue (Redis)                           │
│                        ▲                                         │
│                   QueueEvents                                    │
│                     completed ─► WorkflowEngine._advance()       │
│                     waiting   ─► Dispatcher router               │
│                                     │                            │
│                                     ▼                            │
│                    DockerDispatcher | KubernetesDispatcher       │
│                                     │  creates 1 runner per job  │
└─────────────────────────────────────┼────────────────────────────┘
                                      │
                        ┌─────────────┴─────────────┐
                        ▼                           ▼
                Docker container             Kubernetes pod
               (worker/run-job.js)         (worker/run-job.js)
            - picks one job, exits -    - picks one job, exits -
```

## BPMN workflow

The example [workflows/example.bpmn](workflows/example.bpmn) models a 4-step pipeline with a parallel branch:

```
Start ──► Ingest (docker-job, 3 steps)
                    │
                    ▼
            [parallel split]
             /            \
     Process A            Process B
(k8s-job, 4 steps)    (k8s-job, 2 steps)
             \            /
             [parallel join]  ← waits for both branches
                    │
                    ▼
      Export (docker-job, 2 steps)
                    │
                    ▼
                   End
```

Each `serviceTask` carries two custom extension attributes:

| Attribute | Description |
|-----------|-------------|
| `meta:jobType` | Routes the job to the matching dispatcher (`docker-job` or `k8s-job`) |
| `meta:steps` | Number of simulated work steps (each step ≈ 500 ms) |
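
The routing described by `meta:jobType` can be pictured as a lookup from job type to dispatcher. The sketch below is only an illustration under assumptions: `createRouter`, `routeJob` and the stub dispatchers are hypothetical names, not the code in `server/index.js`, and the stubs return a plain object instead of creating a container or pod.

```javascript
// Hypothetical router: picks the dispatcher registered for a job's type.
// The job type is carried in job.name, as in the dispatcher code of this example.
function createRouter (dispatchers) {
  const byType = new Map(Object.entries(dispatchers))
  return async function routeJob (job) {
    const dispatcher = byType.get(job.name)
    if (!dispatcher) {
      throw new Error(`No dispatcher registered for job type "${job.name}"`)
    }
    return dispatcher.dispatch(job)
  }
}

// Stub dispatchers (assumption): they only report which orchestrator was chosen.
const routeJob = createRouter({
  'docker-job': { dispatch: async job => ({ jobId: job.id, orchestrator: 'docker' }) },
  'k8s-job': { dispatch: async job => ({ jobId: job.id, orchestrator: 'kubernetes' }) }
})
```

An unknown job type rejects instead of silently falling back to a default dispatcher, which keeps a typo in the BPMN file visible.
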

## Key files

| File | Role |
|------|------|
| [server/index.js](server/index.js) | Feathers app, queue wiring, dispatcher router, BPMN hooks |
| [server/workflow-engine.js](server/workflow-engine.js) | BPMN parser + state machine (`WorkflowEngine`, `WorkflowInstance`) |
| [server/workflows.service.js](server/workflows.service.js) | Feathers `workflows` service |
| [server/runners.service.js](server/runners.service.js) | Feathers `runners` service, tracks each container/pod |
| [server/dispatchers/docker.js](server/dispatchers/docker.js) | Docker container dispatcher (dockerode) |
| [server/dispatchers/kubernetes.js](server/dispatchers/kubernetes.js) | Kubernetes Job dispatcher |
| [worker/run-job.js](worker/run-job.js) | Ephemeral BullMQ worker, picks one job then exits |
| [workflows/example.bpmn](workflows/example.bpmn) | BPMN 2.0 process definition |
| [Dockerfile](Dockerfile) | Builds the worker image used by both dispatchers |

## Prerequisites

- **Node 20+**, **pnpm 10+**
- **Redis** on `localhost:6379`
- **Docker** with the daemon socket accessible (`/var/run/docker.sock`)
- **A local Kubernetes cluster** (`kind`, `k3d`, `minikube`, Docker Desktop, …) with a working `kubectl` context
- A common container image visible from both Docker and the K8s cluster

> The K8s dispatcher uses `imagePullPolicy: Never` by default, so the image must already be present on the cluster nodes. For clusters like `kind`, run `kind load docker-image feathers-tasks-worker:latest`. For `minikube`, build the image inside the VM (`eval $(minikube docker-env)` before building).

## Getting started

```bash
# 1. Install dependencies (from the workspace root)
pnpm install

# 2. Start Redis
redis-server

# 3. Build the worker image (from the example folder)
cd examples/feathers-bpmn-orchestration
pnpm build:image

# 4. (kind only) Load the image into the cluster
kind load docker-image feathers-tasks-worker:latest

# 5. Start the server
pnpm dev:server
# or auto-launch the example workflow on boot:
AUTORUN=1 pnpm dev:server
```

## Launching workflows

```bash
curl -X POST http://localhost:3030/workflows \
  -H 'Content-Type: application/json' \
  -d '{"name":"My workflow","bpmnFile":"./workflows/example.bpmn"}'
```

You can also POST the XML inline:

```bash
# jq -Rs reads the whole file as one raw string and emits it as a JSON string
BPMN=$(jq -Rs . < workflows/example.bpmn)
curl -X POST http://localhost:3030/workflows \
  -H 'Content-Type: application/json' \
  -d "{\"name\":\"inline\",\"bpmnXml\":$BPMN}"
```

### Submit a standalone task (no BPMN)

```bash
curl -X POST http://localhost:3030/tasks \
  -H 'Content-Type: application/json' \
  -d '{"type":"docker-job","payload":{"label":"manual","steps":2}}'
```

## Observing state

| Endpoint | What you see |
|----------|--------------|
| `GET /workflows` | Workflow definitions + instance status (`running` / `completed` / `failed`) |
| `GET /tasks` | One record per BullMQ job |
| `GET /runners` | One record per container/pod, with orchestrator, id/name, status, timestamps |
| `http://localhost:3030/admin/tasks` | Bull Board UI |

Live container/pod visibility:

```bash
docker ps --filter label=feathers-tasks.job-id
kubectl get jobs,pods -l app=feathers-tasks-worker
```

## Configuration

Identical to the simpler example; see [`feathers-tasks-orchestration/README.md`](../feathers-tasks-orchestration/README.md#configuration-environment-variables). Additionally:

| Variable | Default | Description |
|----------|---------|-------------|
| `AUTORUN` | _(unset)_ | Set to any value to auto-launch `workflows/example.bpmn` on startup |

## How the BPMN engine works

### Parsing

[`bpmn-moddle`](https://github.com/bpmn-io/bpmn-moddle) converts the BPMN XML into an object graph. `WorkflowEngine.launch()` extracts the `bpmn:Process` root and builds a flat `elementMap` keyed by element id.

### State machine

`WorkflowInstance` walks the graph element by element:

1. **StartEvent**: follows the first outgoing `SequenceFlow`.
2. **ServiceTask**: reads `meta:jobType` / `meta:steps` from `extensionElements` and enqueues a BullMQ job. The BPMN-task → BullMQ-job mapping is stored in `jobToElement`.
3. **ParallelGateway (split)**: iterates over all outgoing flows concurrently.
4. **ParallelGateway (join)**: precomputes the expected incoming count; only advances when every branch has arrived.
5. **EndEvent**: marks the instance as `completed` and emits `instance-completed`.
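
The five rules above can be sketched as a small walker over a flat element map. This is a simplified model under stated assumptions, not the `WorkflowInstance` implementation: `MiniInstance`, `visit`, `advance` and the `{ id, type, outgoing }` element shape are hypothetical, each element lists target ids directly instead of `SequenceFlow` objects, and the walk is synchronous rather than concurrent.

```javascript
// Hypothetical, simplified model of the state machine described above.
class MiniInstance {
  constructor (elements, enqueue) {
    this.elements = new Map(elements.map(el => [el.id, el]))
    this.enqueue = enqueue      // called once per serviceTask
    this.status = 'running'
    this.arrivals = new Map()   // gateway id -> branches arrived so far
    this.expected = new Map()   // element id -> number of incoming flows
    for (const el of elements) {
      for (const target of el.outgoing) {
        this.expected.set(target, (this.expected.get(target) || 0) + 1)
      }
    }
  }

  visit (id) {
    const el = this.elements.get(id)
    switch (el.type) {
      case 'startEvent':
        this.visit(el.outgoing[0])
        break
      case 'serviceTask':
        this.enqueue(el) // the walk resumes in advance() once the job completes
        break
      case 'parallelGateway': {
        const arrived = (this.arrivals.get(id) || 0) + 1
        this.arrivals.set(id, arrived)
        // A split has one incoming flow and passes at once; a join waits for all branches
        if (arrived >= this.expected.get(id)) el.outgoing.forEach(next => this.visit(next))
        break
      }
      case 'endEvent':
        this.status = 'completed'
        break
    }
  }

  // Called when the job of a serviceTask has completed
  advance (id) {
    this.elements.get(id).outgoing.forEach(next => this.visit(next))
  }
}

// The example pipeline: Ingest, then A and B in parallel, then Export
const queued = []
const instance = new MiniInstance([
  { id: 'start', type: 'startEvent', outgoing: ['ingest'] },
  { id: 'ingest', type: 'serviceTask', outgoing: ['split'] },
  { id: 'split', type: 'parallelGateway', outgoing: ['a', 'b'] },
  { id: 'a', type: 'serviceTask', outgoing: ['join'] },
  { id: 'b', type: 'serviceTask', outgoing: ['join'] },
  { id: 'join', type: 'parallelGateway', outgoing: ['export'] },
  { id: 'export', type: 'serviceTask', outgoing: ['end'] },
  { id: 'end', type: 'endEvent', outgoing: [] }
], el => queued.push(el.id))

instance.visit('start')     // enqueues ingest
instance.advance('ingest')  // split: enqueues a and b
instance.advance('a')       // join still waiting for b
instance.advance('b')       // join passes: enqueues export
instance.advance('export')  // reaches the end event
```

Counting incoming flows up front is what lets the same gateway code serve as both split and join: a gateway with a single incoming flow is satisfied by its first arrival.
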

### Job completion loop

```
Worker container / pod completes job
  → BullMQ emits 'completed' on QueueEvents
  → server handler reads workflowInstanceId from the result
  → WorkflowsService.notifyJobCompleted()
  → WorkflowInstance.onJobCompleted() resolves bpmnTaskId via jobToElement
  → WorkflowInstance._advance(bpmnTaskId) walks to the next BPMN element
```
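
As a rough illustration of that chain, the sketch below resolves a completed job back to its BPMN task. It is an assumption-laden stand-in, not the server code: `instances`, `TrackedInstance` and `handleCompleted` are hypothetical, the handler argument mirrors the `{ jobId, returnvalue }` object that BullMQ's `QueueEvents` passes to `'completed'` listeners, and `returnvalue` is assumed to be an already parsed object.

```javascript
// Hypothetical registry of running instances, keyed by workflowInstanceId.
const instances = new Map()

class TrackedInstance {
  constructor () {
    this.jobToElement = new Map() // BullMQ job id -> BPMN task id
    this.advanced = []            // stands in for _advance(bpmnTaskId)
  }

  onJobCompleted (jobId) {
    const key = String(jobId)
    const bpmnTaskId = this.jobToElement.get(key)
    if (bpmnTaskId === undefined) return false // job does not belong to this instance
    this.jobToElement.delete(key)              // a job advances the workflow only once
    this.advanced.push(bpmnTaskId)
    return true
  }
}

// Returns true when the completion advanced a workflow,
// false for standalone tasks or unknown instances.
function handleCompleted ({ jobId, returnvalue }) {
  const instanceId = returnvalue ? returnvalue.workflowInstanceId : undefined
  const instance = instances.get(instanceId)
  return instance ? instance.onJobCompleted(jobId) : false
}
```

Standalone tasks submitted through `POST /tasks` carry no `workflowInstanceId`, so in this sketch they fall through without touching any workflow.
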

### Failure handling

If a job fails, `QueueEvents` emits `'failed'`. The server patches the task record and calls `WorkflowsService.notifyJobFailed()`, which marks the instance as `failed`. No retry is implemented in this example.

## Dispatcher = lifecycle tracking

Because every job spawns a dedicated container/pod, the `runners` service lets you answer questions that a shared worker pool cannot:

- "Which container executed this specific job?" → `GET /runners?jobId=42`
- "How long did that pod live?" → `createdAt` / `startedAt` / `finishedAt`
- "Did the runtime exit cleanly?" → `status` + `exitCode`

This is the main reason the example moved away from long-running Swarm services / static K8s Deployments: **one object per job, directly observable**.

## Feathers services

| Service | Implementation | Purpose |
|---------|---------------|---------|
| `task-store` | `MemoryService` | BullMQ job metadata |
| `workflow-store` | `MemoryService` | Workflow definitions + instances |
| `tasks` | `TaskService` | BullMQ queue interface |
| `workflows` | `WorkflowsService` | BPMN orchestration (`create` parses + runs) |
| `runners` | `RunnersService` | Container/pod lifecycle |

All services publish real-time events on the `anonymous` Socket.IO channel.
Lines changed: 30 additions & 0 deletions
@@ -0,0 +1,30 @@
{
  "name": "feathers-bpmn-orchestration-example",
  "description": "BPMN-driven feathers-tasks orchestration with dockerode and Kubernetes API ephemeral workers",
  "packageManager": "pnpm@10.1.0",
  "type": "module",
  "scripts": {
    "dev:server": "node --watch --conditions development server/index.js",
    "worker": "node worker/run-job.js",
    "build:image": "docker build -t feathers-tasks-worker:latest -f Dockerfile ../..",
    "lint": "standard --fix"
  },
  "author": {
    "name": "KALISIO <contact@kalisio.com>",
    "url": "https://github.com/kalisio"
  },
  "license": "MIT",
  "dependencies": {
    "@feathersjs/errors": "catalog:",
    "@feathersjs/express": "catalog:",
    "@feathersjs/feathers": "catalog:",
    "@feathersjs/memory": "catalog:",
    "@feathersjs/socketio": "catalog:",
    "@kalisio/feathers-tasks": "workspace:*",
    "@kubernetes/client-node": "^1.0.0",
    "bpmn-moddle": "^10.0.0",
    "bullmq": "catalog:",
    "debug": "catalog:",
    "dockerode": "^4.0.2"
  }
}
Lines changed: 112 additions & 0 deletions
@@ -0,0 +1,112 @@
import Docker from 'dockerode'
import debugLib from 'debug'

const debug = debugLib('dispatcher:docker')

export class DockerDispatcher {
  /**
   * @param {object} options
   * @param {string} options.image Docker image implementing worker/run-job.js
   * @param {string} options.queueName BullMQ queue name
   * @param {{host:string, port:number}} options.redis Redis coordinates *as seen from inside the container*
   * @param {string} [options.networkMode='bridge']
   * @param {object} options.runnersService Feathers service for lifecycle tracking
   */
  constructor (options) {
    this.image = options.image
    this.queueName = options.queueName
    this.redis = options.redis
    this.networkMode = options.networkMode || 'bridge'
    this.runnersService = options.runnersService

    this.docker = new Docker(options.docker || {})
  }

  /**
   * Spawn one ephemeral container for a single job.
   *
   * @param {{ id:string, name:string }} job - BullMQ job descriptor
   * @returns {Promise<object>} runner record
   */
  async dispatch (job) {
    const workerId = `docker-${job.id}`
    const name = `tasks-worker-${job.id}-${Date.now()}`

    // Record the runner before touching Docker so that failed starts are tracked too
    const runner = await this.runnersService.create({
      jobId: job.id,
      jobType: job.name,
      orchestrator: 'docker',
      containerId: null,
      name,
      status: 'starting',
      createdAt: new Date().toISOString()
    })

    debug('Creating container %s for job %s (type %s)', name, job.id, job.name)

    let container
    let started = false
    try {
      container = await this.docker.createContainer({
        Image: this.image,
        name,
        Env: [
          `REDIS_HOST=${this.redis.host}`,
          `REDIS_PORT=${this.redis.port}`,
          `QUEUE_NAME=${this.queueName}`,
          `JOB_TYPE=${job.name}`,
          `WORKER_ID=${workerId}`,
          'ORCHESTRATOR=docker'
        ],
        HostConfig: {
          NetworkMode: this.networkMode,
          // Kept after exit so _watch() can read the exit code, then removed explicitly
          AutoRemove: false,
          ExtraHosts: ['host.docker.internal:host-gateway']
        },
        Labels: {
          'feathers-tasks.job-id': String(job.id),
          'feathers-tasks.job-type': job.name
        }
      })

      await container.start()
      started = true
      await this.runnersService.patch(runner.id, {
        containerId: container.id,
        status: 'running',
        startedAt: new Date().toISOString()
      })

      debug('Container %s (%s) started for job %s', name, container.id, job.id)

      // Fire and forget: the watcher updates the runner record when the container exits
      this._watch(runner.id, container, name).catch(err => {
        console.error(`[dispatcher:docker] watch failed for ${name}:`, err.message)
      })

      return { ...runner, containerId: container.id }
    } catch (err) {
      // A container that was created but never started has no watcher to remove it
      if (container && !started) {
        await container.remove({ force: true }).catch(cleanupErr => {
          debug('Cleanup of %s failed: %s', name, cleanupErr.message)
        })
      }
      await this.runnersService.patch(runner.id, {
        status: 'failed',
        error: err.message,
        finishedAt: new Date().toISOString()
      })
      throw err
    }
  }

  // Wait for the container to exit, record the outcome, then remove the container
  async _watch (runnerId, container, name) {
    const { StatusCode } = await container.wait()
    const status = StatusCode === 0 ? 'completed' : 'failed'
    debug('Container %s finished (exit %d)', name, StatusCode)

    await this.runnersService.patch(runnerId, {
      status,
      exitCode: StatusCode,
      finishedAt: new Date().toISOString()
    })

    try {
      await container.remove({ force: true })
    } catch (err) {
      debug('Cleanup of %s failed: %s', name, err.message)
    }
  }
}
