Skip to content

Commit a5d55ab

Browse files
committed
feat: add bpmn implementation to feathers tasks orchestration example
1 parent 0f80c54 commit a5d55ab

7 files changed

Lines changed: 594 additions & 16 deletions

File tree

examples/feathers-tasks-orchestration/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
"@feathersjs/memory": "catalog:",
2020
"@feathersjs/socketio": "catalog:",
2121
"@kalisio/feathers-tasks": "workspace:*",
22+
"bpmn-moddle": "^10.0.0",
2223
"bullmq": "catalog:",
2324
"debug": "catalog:"
2425
}

examples/feathers-tasks-orchestration/server/index.js

Lines changed: 125 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,13 @@ import { feathers } from '@feathersjs/feathers'
22
import express from '@feathersjs/express'
33
import socketio from '@feathersjs/socketio'
44
import { MemoryService } from '@feathersjs/memory'
5-
import { TaskService, createQueue, setupQueueEvents, setupDashboard } from '@kalisio/feathers-tasks'
5+
import { TaskService, createQueue, setupDashboard } from '@kalisio/feathers-tasks'
6+
import { WorkflowsService } from './workflows.service.js'
7+
import { readFile } from 'fs/promises'
8+
import { fileURLToPath } from 'url'
9+
import { dirname, join } from 'path'
10+
11+
const __dirname = dirname(fileURLToPath(import.meta.url))
612

713
const port = Number(process.env.SERVER_PORT) || 3030
814
const redis = {
@@ -13,37 +19,146 @@ const queueName = process.env.QUEUE_NAME || 'orchestration-tasks'
1319

1420
const app = express(feathers())
1521

16-
app.use(express.json())
22+
app.use(express.json({ limit: '2mb' }))
1723
app.use(express.urlencoded({ extended: true }))
1824
app.configure(express.rest())
1925
app.configure(socketio({ cors: { origin: '*' } }))
2026

2127
app.use('task-store', new MemoryService())
28+
app.use('workflow-store', new MemoryService())
2229

2330
const queue = createQueue(queueName, redis)
2431

25-
setupQueueEvents(queueName, redis, app, 'task-store')
32+
const { QueueEvents } = await import('bullmq')
33+
const queueEvents = new QueueEvents(queueName, { connection: redis })
34+
35+
queueEvents.on('completed', async ({ jobId, returnvalue }) => {
36+
const result = returnvalue ?? null
37+
38+
// 1. Update the task-store (standard feathers-tasks behavior)
39+
try {
40+
const taskStore = app.service('task-store')
41+
const items = await taskStore.find({ query: { id: jobId } })
42+
const records = items.data || items
43+
if (records.length) {
44+
await taskStore.patch(records[0]._id || records[0].id, {
45+
status: 'completed',
46+
result,
47+
completedAt: new Date().toISOString()
48+
})
49+
}
50+
} catch (err) {
51+
console.error(`[queue-events] Failed to patch task-store for job ${jobId}:`, err.message)
52+
}
53+
54+
// 2. Notify the workflow engine so it advances to the next BPMN element
55+
const workflowInstanceId = result?.workflowInstanceId
56+
if (workflowInstanceId) {
57+
try {
58+
await app.service('workflows').notifyJobCompleted(jobId, result)
59+
} catch (err) {
60+
console.error(`[queue-events] Failed to notify workflow engine for job ${jobId}:`, err.message)
61+
}
62+
}
63+
})
64+
65+
queueEvents.on('failed', async ({ jobId, failedReason }) => {
66+
// Update task-store
67+
try {
68+
const taskStore = app.service('task-store')
69+
const items = await taskStore.find({ query: { id: jobId } })
70+
const records = items.data || items
71+
if (records.length) {
72+
const record = records[0]
73+
const workflowInstanceId = record.payload?.workflowInstanceId
74+
await taskStore.patch(record._id || record.id, {
75+
status: 'failed',
76+
error: failedReason,
77+
failedAt: new Date().toISOString()
78+
})
79+
// Notify workflow engine of failure
80+
if (workflowInstanceId) {
81+
await app.service('workflows').notifyJobFailed(jobId, failedReason, workflowInstanceId)
82+
}
83+
}
84+
} catch (err) {
85+
console.error(`[queue-events] Failed to patch task-store on failure for job ${jobId}:`, err.message)
86+
}
87+
})
88+
89+
queueEvents.on('active', async ({ jobId }) => {
90+
try {
91+
const taskStore = app.service('task-store')
92+
const items = await taskStore.find({ query: { id: jobId } })
93+
const records = items.data || items
94+
if (records.length) {
95+
await taskStore.patch(records[0]._id || records[0].id, {
96+
status: 'active',
97+
startedAt: new Date().toISOString()
98+
})
99+
}
100+
} catch { /* non-critical */ }
101+
})
102+
103+
queueEvents.on('progress', async ({ jobId, data }) => {
104+
try {
105+
const taskStore = app.service('task-store')
106+
const items = await taskStore.find({ query: { id: jobId } })
107+
const records = items.data || items
108+
if (records.length) {
109+
await taskStore.patch(records[0]._id || records[0].id, { progress: data })
110+
}
111+
} catch { /* non-critical */ }
112+
})
26113

27114
setupDashboard(app, queue, '/admin/tasks')
28115

29116
app.use('tasks', new TaskService({ queue, persistenceService: 'task-store' }))
117+
app.use('workflows', new WorkflowsService({ store: app.service('workflow-store') }))
30118

31119
app.on('connection', connection => app.channel('anonymous').join(connection))
32120
app.publish(() => app.channel('anonymous'))
33121

34122
await app.setup()
35123

36-
app.listen(port).then(() => {
124+
await Promise.all([
125+
queue.waitUntilReady(),
126+
queueEvents.waitUntilReady()
127+
])
128+
129+
app.listen(port).then(async () => {
37130
console.log(`Server listening on http://localhost:${port}`)
38131
console.log(`Bull Board: http://localhost:${port}/admin/tasks`)
39132
console.log(`Redis: ${redis.host}:${redis.port}`)
40133
console.log(`Queue: ${queueName}`)
41134
console.log()
42-
console.log('Submit a swarm job:')
43-
console.log(` curl -X POST http://localhost:${port}/tasks -H 'Content-Type: application/json' \\`)
44-
console.log(' -d \'{"type":"swarm-job","payload":{"label":"hello from swarm","steps":4}}\'')
135+
console.log('── API ──────────────────────────────────────────────────────')
136+
console.log()
137+
console.log('Launch the example BPMN workflow (auto-detects example.bpmn):')
138+
console.log(` curl -X POST http://localhost:${port}/workflows \\`)
139+
console.log(' -H \'Content-Type: application/json\' \\')
140+
console.log(' -d \'{"name":"Example workflow","bpmnFile":"./workflows/example.bpmn"}\'')
45141
console.log()
46-
console.log('Submit a k8s job:')
47-
console.log(` curl -X POST http://localhost:${port}/tasks -H 'Content-Type: application/json' \\`)
48-
console.log(' -d \'{"type":"k8s-job","payload":{"label":"hello from k8s","steps":3}}\'')
142+
console.log('Or submit a standalone job (no BPMN):')
143+
console.log(` curl -X POST http://localhost:${port}/tasks \\`)
144+
console.log(' -H \'Content-Type: application/json\' \\')
145+
console.log(' -d \'{"type":"swarm-job","payload":{"label":"manual job","steps":2}}\'')
146+
console.log()
147+
console.log('List workflow instances:')
148+
console.log(` curl http://localhost:${port}/workflows`)
149+
console.log()
150+
console.log('List all tasks:')
151+
console.log(` curl http://localhost:${port}/tasks`)
152+
console.log()
153+
154+
if (process.env.AUTORUN) {
155+
console.log('── AUTORUN — launching example workflow ──────────────────────')
156+
const bpmnFile = join(__dirname, '..', 'workflows', 'example.bpmn')
157+
const bpmnXml = await readFile(bpmnFile, 'utf-8')
158+
const wf = await app.service('workflows').create({
159+
name: 'Example workflow (auto)',
160+
bpmnXml
161+
})
162+
console.log(`Workflow launched — id: ${wf.id}, instance: ${wf.instances[0]?.instanceId}`)
163+
}
49164
})

0 commit comments

Comments
 (0)