Skip to content

Commit 75a0c11

Browse files
authored
Merge pull request #59 from Mrwicks00/feature/durable-job-queue-bullmq
feat: migrate auto-rebalancer and analytics to durable job queue (fix…
2 parents b1cd223 + 9b6eda6 commit 75a0c11

File tree

16 files changed

+1486
-544
lines changed

16 files changed

+1486
-544
lines changed

backend/.env.example

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,16 @@ MIN_REBALANCE_INTERVAL=86400000
6868
# Maximum auto-rebalances per day per portfolio
6969
MAX_AUTO_REBALANCES_PER_DAY=3
7070

71+
# ============================================
72+
# JOB QUEUE CONFIGURATION (BullMQ + Redis)
73+
# ============================================
74+
75+
# Redis connection URL for BullMQ durable job queue (issue #38)
76+
# Format: redis://[user:password@]host[:port][/db]
77+
# Defaults to redis://localhost:6379 if not set
78+
# Required for production – when unset, queue-backed scheduling is skipped gracefully
79+
REDIS_URL=redis://localhost:6379
80+
7181
# ============================================
7282
# CIRCUIT BREAKER CONFIGURATION
7383
# ============================================

backend/package-lock.json

Lines changed: 315 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/package.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,12 @@
1414
"dependencies": {
1515
"@stellar/stellar-sdk": "^12.0.1",
1616
"better-sqlite3": "^12.6.2",
17+
"bullmq": "^5.69.3",
1718
"cors": "^2.8.5",
1819
"dotenv": "^16.3.1",
1920
"express": "^4.18.2",
2021
"express-rate-limit": "^7.4.1",
22+
"ioredis": "^5.9.3",
2123
"node-cron": "^3.0.3",
2224
"nodemailer": "^8.0.1",
2325
"pg": "^8.11.3",
@@ -31,9 +33,9 @@
3133
"@types/node-cron": "^3.0.11",
3234
"@types/nodemailer": "^7.0.10",
3335
"@types/pg": "^8.10.9",
36+
"@types/supertest": "^2.0.12",
3437
"@types/ws": "^8.18.1",
3538
"supertest": "^6.3.3",
36-
"@types/supertest": "^2.0.12",
3739
"tsx": "^4.1.4",
3840
"typescript": "^5.2.2",
3941
"vitest": "^4.0.18"

backend/src/api/routes.ts

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import { logger } from '../utils/logger.js'
1111
import { requireAdmin } from '../middleware/auth.js'
1212
import { blockDebugInProduction } from '../middleware/debugGate.js'
1313
import { writeRateLimiter } from '../middleware/rateLimit.js'
14+
import { getQueueMetrics } from '../queue/queueMetrics.js'
1415

1516
const router = Router()
1617
const stellarService = new StellarService()
@@ -1416,4 +1417,32 @@ router.get('/debug/auto-rebalancer-test', blockDebugInProduction, async (req, re
14161417
}
14171418
})
14181419

1420+
// ================================
1421+
// QUEUE HEALTH ROUTE
1422+
// ================================
1423+
1424+
/**
1425+
* GET /api/queue/health
1426+
* Returns BullMQ queue depths and Redis connectivity status.
1427+
* Used for worker health monitoring and alerting (issue #38).
1428+
*/
1429+
router.get('/queue/health', async (req, res) => {
1430+
try {
1431+
const metrics = await getQueueMetrics()
1432+
const httpStatus = metrics.redisConnected ? 200 : 503
1433+
res.status(httpStatus).json({
1434+
success: metrics.redisConnected,
1435+
...metrics,
1436+
timestamp: new Date().toISOString(),
1437+
})
1438+
} catch (error) {
1439+
res.status(500).json({
1440+
success: false,
1441+
error: getErrorMessage(error),
1442+
redisConnected: false,
1443+
timestamp: new Date().toISOString(),
1444+
})
1445+
}
1446+
})
1447+
14191448
export { router as portfolioRouter }

backend/src/index.ts

Lines changed: 60 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,12 @@ import { RebalancingService } from './monitoring/rebalancer.js'
1010
import { AutoRebalancerService } from './services/autoRebalancer.js'
1111
import { logger } from './utils/logger.js'
1212
import { databaseService } from './services/databaseService.js'
13+
import { isRedisAvailable, logQueueStartup } from './queue/connection.js'
14+
import { startQueueScheduler } from './queue/scheduler.js'
15+
import { startPortfolioCheckWorker, stopPortfolioCheckWorker } from './queue/workers/portfolioCheckWorker.js'
16+
import { startRebalanceWorker, stopRebalanceWorker } from './queue/workers/rebalanceWorker.js'
17+
import { startAnalyticsSnapshotWorker, stopAnalyticsSnapshotWorker } from './queue/workers/analyticsSnapshotWorker.js'
18+
import { closeAllQueues } from './queue/queues.js'
1319

1420
const app = express()
1521
const port = process.env.PORT || 3001
@@ -62,6 +68,9 @@ app.use((req, res, next) => {
6268

6369
app.use(globalRateLimiter)
6470

71+
// Create auto-rebalancer instance
72+
const autoRebalancer = new AutoRebalancerService()
73+
6574
// Health check endpoint
6675
app.get('/health', (req, res) => {
6776
res.json({
@@ -139,14 +148,12 @@ app.get('/', (req, res) => {
139148
health: '/health',
140149
corsTest: '/test/cors',
141150
coinGeckoTest: '/test/coingecko',
142-
autoRebalancerStatus: '/api/auto-rebalancer/status'
151+
autoRebalancerStatus: '/api/auto-rebalancer/status',
152+
queueHealth: '/api/queue/health'
143153
}
144154
})
145155
})
146156

147-
// Create auto-rebalancer instance
148-
const autoRebalancer = new AutoRebalancerService()
149-
150157
// Mount API routes
151158
app.use('/api', portfolioRouter)
152159
app.use('/', portfolioRouter)
@@ -161,7 +168,8 @@ app.use((req, res) => {
161168
availableEndpoints: {
162169
health: '/health',
163170
api: '/api/*',
164-
autoRebalancer: '/api/auto-rebalancer/*'
171+
autoRebalancer: '/api/auto-rebalancer/*',
172+
queueHealth: '/api/queue/health'
165173
}
166174
})
167175
})
@@ -194,30 +202,49 @@ wss.on('connection', (ws) => {
194202
})
195203
})
196204

197-
// Start existing rebalancing service
205+
// Start existing rebalancing service (now queue-backed, no cron)
198206
try {
199207
const rebalancingService = new RebalancingService(wss)
200208
rebalancingService.start()
201-
console.log('[LEGACY-REBALANCER] Legacy rebalancing service started')
209+
console.log('[REBALANCING-SERVICE] Monitoring service started (queue-backed)')
202210
} catch (error) {
203-
console.error('Failed to start legacy rebalancing service:', error)
211+
console.error('Failed to start rebalancing service:', error)
204212
}
205213

206214
// Start server
207-
server.listen(port, () => {
215+
server.listen(port, async () => {
208216
console.log(`🚀 Server running on port ${port}`)
209217
console.log(`Environment: ${process.env.NODE_ENV || 'development'}`)
210218
console.log(`CoinGecko API Key: ${!!process.env.COINGECKO_API_KEY ? 'SET' : 'NOT SET'}`)
211219

212-
// Start automatic rebalancing service
220+
// ── BullMQ / Redis setup ────────────────────────────────────────────────
221+
const redisAvailable = await isRedisAvailable()
222+
logQueueStartup(redisAvailable)
223+
224+
if (redisAvailable) {
225+
// Start all three workers
226+
startPortfolioCheckWorker()
227+
startRebalanceWorker()
228+
startAnalyticsSnapshotWorker()
229+
230+
// Register repeatable jobs (scheduler)
231+
try {
232+
await startQueueScheduler()
233+
console.log('[SCHEDULER] ✅ Queue scheduler registered')
234+
} catch (err) {
235+
console.error('[SCHEDULER] ❌ Failed to register scheduler:', err)
236+
}
237+
}
238+
239+
// ── Auto-rebalancer (queue-backed) ──────────────────────────────────────
213240
const shouldStartAutoRebalancer =
214241
process.env.NODE_ENV === 'production' ||
215242
process.env.ENABLE_AUTO_REBALANCER === 'true'
216243

217244
if (shouldStartAutoRebalancer) {
218245
try {
219246
console.log('[AUTO-REBALANCER] Starting automatic rebalancing service...')
220-
autoRebalancer.start()
247+
await autoRebalancer.start()
221248
console.log('[AUTO-REBALANCER] ✅ Automatic rebalancing service started successfully')
222249

223250
// Broadcast to WebSocket clients
@@ -243,10 +270,11 @@ server.listen(port, () => {
243270
console.log(` CORS Test: http://localhost:${port}/test/cors`)
244271
console.log(` CoinGecko Test: http://localhost:${port}/test/coingecko`)
245272
console.log(` Auto-Rebalancer Status: http://localhost:${port}/api/auto-rebalancer/status`)
273+
console.log(` Queue Health: http://localhost:${port}/api/queue/health`)
246274
})
247275

248276
// Graceful shutdown
249-
const gracefulShutdown = (signal: string) => {
277+
const gracefulShutdown = async (signal: string) => {
250278
console.log(`\n[SHUTDOWN] ${signal} received, shutting down gracefully...`)
251279

252280
// Stop auto-rebalancer
@@ -257,6 +285,26 @@ const gracefulShutdown = (signal: string) => {
257285
console.error('[SHUTDOWN] Error stopping auto-rebalancer:', error)
258286
}
259287

288+
// Stop BullMQ workers
289+
try {
290+
await Promise.all([
291+
stopPortfolioCheckWorker(),
292+
stopRebalanceWorker(),
293+
stopAnalyticsSnapshotWorker(),
294+
])
295+
console.log('[SHUTDOWN] BullMQ workers stopped')
296+
} catch (error) {
297+
console.error('[SHUTDOWN] Error stopping BullMQ workers:', error)
298+
}
299+
300+
// Close BullMQ queues
301+
try {
302+
await closeAllQueues()
303+
console.log('[SHUTDOWN] BullMQ queues closed')
304+
} catch (error) {
305+
console.error('[SHUTDOWN] Error closing queues:', error)
306+
}
307+
260308
// Close database connection
261309
try {
262310
databaseService.close()

0 commit comments

Comments
 (0)