Skip to content

Commit 44b08f5

Browse files
authored
Merge pull request #78 from Adedayo-Data/main
Rebalance Concurrency Lock & API Fixes
2 parents ce93a4d + f668df5 commit 44b08f5

File tree

3 files changed

+296
-68
lines changed

3 files changed

+296
-68
lines changed

backend/src/api/routes.ts

Lines changed: 112 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -47,20 +47,20 @@ router.get('/rebalance/history', async (req: Request, res: Response) => {
4747
const portfolioId = req.query.portfolioId as string
4848
const limit = parseInt(req.query.limit as string) || 50
4949
const source = parseHistorySource(req.query.source)
50-
const startTimestamp = parseOptionalTimestamp(req.query.startTimestamp)
51-
const endTimestamp = parseOptionalTimestamp(req.query.endTimestamp)
52-
const syncOnChain = parseOptionalBoolean(req.query.syncOnChain) === true
53-
54-
logger.info('Rebalance history request', { portfolioId: portfolioId || 'all' })
55-
if (syncOnChain) {
56-
await contractEventIndexerService.syncOnce()
57-
}
50+
const startTimestamp = parseOptionalTimestamp(req.query.startTimestamp)
51+
const endTimestamp = parseOptionalTimestamp(req.query.endTimestamp)
52+
const syncOnChain = parseOptionalBoolean(req.query.syncOnChain) === true
53+
54+
logger.info('Rebalance history request', { portfolioId: portfolioId || 'all' })
55+
if (syncOnChain) {
56+
await contractEventIndexerService.syncOnce()
57+
}
5858

5959
const history = await rebalanceHistoryService.getRebalanceHistory(
6060
portfolioId || undefined,
6161
limit,
6262
{
63-
eventSource: source,
63+
eventSource: source === 'all' ? undefined : source,
6464
startTimestamp,
6565
endTimestamp
6666
}
@@ -79,12 +79,12 @@ router.get('/rebalance/history', async (req: Request, res: Response) => {
7979
})
8080

8181
} catch (error) {
82-
logger.error('[ERROR] Rebalance history failed', { error: getErrorObject(error) })
83-
res.json({
84-
success: false,
85-
error: getErrorMessage(error),
86-
history: []
87-
})
82+
logger.error('[ERROR] Rebalance history failed', { error: getErrorObject(error) })
83+
res.json({
84+
success: false,
85+
error: getErrorMessage(error),
86+
history: []
87+
})
8888
}
8989
})
9090

@@ -93,7 +93,7 @@ router.post('/rebalance/history', idempotencyMiddleware, async (req: Request, re
9393
try {
9494
const eventData = req.body
9595

96-
logger.info('Recording new rebalance event', { eventData })
96+
logger.info('Recording new rebalance event', { eventData })
9797

9898
const event = await rebalanceHistoryService.recordRebalanceEvent({
9999
...eventData,
@@ -106,13 +106,13 @@ router.post('/rebalance/history', idempotencyMiddleware, async (req: Request, re
106106
timestamp: new Date().toISOString()
107107
})
108108
} catch (error) {
109-
logger.error('[ERROR] Failed to record rebalance event', { error: getErrorObject(error) })
110-
res.status(500).json({
111-
success: false,
112-
error: getErrorMessage(error)
113-
})
114-
}
115-
})
109+
logger.error('[ERROR] Failed to record rebalance event', { error: getErrorObject(error) })
110+
res.status(500).json({
111+
success: false,
112+
error: getErrorMessage(error)
113+
})
114+
}
115+
})
116116

117117
router.post('/rebalance/history/sync-onchain', requireAdmin, async (req: Request, res: Response) => {
118118
try {
@@ -131,6 +131,55 @@ router.post('/rebalance/history/sync-onchain', requireAdmin, async (req: Request
131131
}
132132
})
133133

134+
// Manual portfolio rebalance
135+
router.post('/portfolio/:id/rebalance', writeRateLimiter, idempotencyMiddleware, async (req: Request, res: Response) => {
136+
try {
137+
const portfolioId = req.params.id;
138+
139+
console.log(`[INFO] Attempting manual rebalance for portfolio: ${portfolioId}`);
140+
141+
// Try to acquire lock
142+
const lockAcquired = await rebalanceLockService.acquireLock(portfolioId);
143+
if (!lockAcquired) {
144+
console.log(`[WARNING] Rebalance already in progress for portfolio: ${portfolioId}`);
145+
return res.status(409).json({
146+
success: false,
147+
error: 'Rebalance already in progress for this portfolio'
148+
});
149+
}
150+
151+
try {
152+
const portfolio = await stellarService.getPortfolio(portfolioId);
153+
const prices = await reflectorService.getCurrentPrices();
154+
const riskCheck = riskManagementService.shouldAllowRebalance(portfolio as unknown as Portfolio, prices);
155+
156+
if (!riskCheck.allowed) {
157+
return res.status(400).json({
158+
success: false,
159+
error: riskCheck.reason,
160+
alerts: riskCheck.alerts
161+
});
162+
}
163+
164+
const result = await stellarService.executeRebalance(portfolioId);
165+
166+
res.json({
167+
success: true,
168+
result,
169+
timestamp: new Date().toISOString()
170+
});
171+
} finally {
172+
await rebalanceLockService.releaseLock(portfolioId);
173+
}
174+
} catch (error) {
175+
console.error('[ERROR] Manual rebalance failed:', error);
176+
res.status(500).json({
177+
success: false,
178+
error: getErrorMessage(error)
179+
});
180+
}
181+
});
182+
134183
// ================================
135184
// RISK MANAGEMENT ROUTES
136185
// ================================
@@ -140,7 +189,7 @@ router.get('/risk/metrics/:portfolioId', async (req: Request, res: Response) =>
140189
try {
141190
const { portfolioId } = req.params
142191

143-
logger.info('Calculating risk metrics for portfolio', { portfolioId })
192+
logger.info('Calculating risk metrics for portfolio', { portfolioId })
144193

145194
const portfolio = await stellarService.getPortfolio(portfolioId)
146195
const prices = await reflectorService.getCurrentPrices()
@@ -167,11 +216,11 @@ router.get('/risk/metrics/:portfolioId', async (req: Request, res: Response) =>
167216
timestamp: new Date().toISOString()
168217
})
169218
} catch (error) {
170-
logger.error('[ERROR] Failed to get risk metrics', { error: getErrorObject(error) })
171-
res.status(500).json({
172-
success: false,
173-
error: getErrorMessage(error),
174-
riskMetrics: {
219+
logger.error('[ERROR] Failed to get risk metrics', { error: getErrorObject(error) })
220+
res.status(500).json({
221+
success: false,
222+
error: getErrorMessage(error),
223+
riskMetrics: {
175224
volatility: 0,
176225
concentrationRisk: 0,
177226
liquidityRisk: 0,
@@ -187,12 +236,12 @@ router.get('/risk/check/:portfolioId', async (req: Request, res: Response) => {
187236
try {
188237
const { portfolioId } = req.params
189238

190-
logger.info('Checking risk conditions for portfolio', { portfolioId })
239+
logger.info('Checking risk conditions for portfolio', { portfolioId })
191240

192241
const portfolio = await stellarService.getPortfolio(portfolioId)
193242
const prices = await reflectorService.getCurrentPrices()
194243

195-
const riskCheck = riskManagementService.shouldAllowRebalance(portfolio, prices)
244+
const riskCheck = riskManagementService.shouldAllowRebalance(portfolio as unknown as Portfolio, prices)
196245

197246
res.json({
198247
success: true,
@@ -201,10 +250,10 @@ router.get('/risk/check/:portfolioId', async (req: Request, res: Response) => {
201250
timestamp: new Date().toISOString()
202251
})
203252
} catch (error) {
204-
logger.error('[ERROR] Failed to check risk conditions', { error: getErrorObject(error) })
205-
res.status(500).json({
206-
success: false,
207-
allowed: false,
253+
logger.error('[ERROR] Failed to check risk conditions', { error: getErrorObject(error) })
254+
res.status(500).json({
255+
success: false,
256+
allowed: false,
208257
reason: 'Failed to assess risk conditions',
209258
alerts: [],
210259
error: getErrorMessage(error)
@@ -219,16 +268,16 @@ router.get('/risk/check/:portfolioId', async (req: Request, res: Response) => {
219268
// Get current prices - FIXED to return direct format for frontend
220269
router.get('/prices', async (req: Request, res: Response) => {
221270
try {
222-
logger.info('[DEBUG] Fetching prices for frontend...')
223-
const prices = await reflectorService.getCurrentPrices()
224-
225-
logger.info('[DEBUG] Raw prices from service', { prices })
271+
logger.info('[DEBUG] Fetching prices for frontend...')
272+
const prices = await reflectorService.getCurrentPrices()
273+
274+
logger.info('[DEBUG] Raw prices from service', { prices })
226275

227276
// Return prices directly in the format frontend expects
228277
res.json(prices)
229278

230279
} catch (error) {
231-
logger.error('[ERROR] Prices endpoint failed', { error: getErrorObject(error) })
280+
logger.error('[ERROR] Prices endpoint failed', { error: getErrorObject(error) })
232281

233282
if (!featureFlags.allowFallbackPrices) {
234283
return res.status(503).json({
@@ -245,15 +294,15 @@ router.get('/prices', async (req: Request, res: Response) => {
245294
USDC: { price: 0.999781, change: -0.002, timestamp: Date.now() / 1000, source: 'fallback' }
246295
}
247296

248-
logger.info('[DEBUG] Sending fallback prices', { fallbackPrices })
249-
res.json(fallbackPrices)
250-
}
251-
})
297+
logger.info('[DEBUG] Sending fallback prices', { fallbackPrices })
298+
res.json(fallbackPrices)
299+
}
300+
})
252301

253302
// Enhanced prices endpoint with risk analysis
254303
router.get('/prices/enhanced', async (req: Request, res: Response) => {
255304
try {
256-
logger.info('[INFO] Fetching enhanced prices with risk analysis')
305+
logger.info('[INFO] Fetching enhanced prices with risk analysis')
257306

258307
const prices = await reflectorService.getCurrentPrices()
259308

@@ -282,9 +331,9 @@ router.get('/prices/enhanced', async (req: Request, res: Response) => {
282331
timestamp: new Date().toISOString()
283332
})
284333
} catch (error) {
285-
logger.error('[ERROR] Failed to fetch enhanced prices', { error: getErrorObject(error) })
286-
res.status(500).json({
287-
success: false,
334+
logger.error('[ERROR] Failed to fetch enhanced prices', { error: getErrorObject(error) })
335+
res.status(500).json({
336+
success: false,
288337
error: getErrorMessage(error),
289338
prices: {},
290339
riskAlerts: [],
@@ -493,7 +542,7 @@ router.get('/system/status', async (req: Request, res: Response) => {
493542
success: true,
494543
system: {
495544
status: priceSourcesHealthy ? 'operational' : 'degraded',
496-
uptime: process.uptime(),
545+
uptime: global.process.uptime(),
497546
timestamp: new Date().toISOString(),
498547
version: '1.0.0'
499548
},
@@ -524,9 +573,9 @@ router.get('/system/status', async (req: Request, res: Response) => {
524573
featureFlags: publicFeatureFlags
525574
})
526575
} catch (error) {
527-
logger.error('[ERROR] Failed to get system status', { error: getErrorObject(error) })
528-
res.status(500).json({
529-
success: false,
576+
logger.error('[ERROR] Failed to get system status', { error: getErrorObject(error) })
577+
res.status(500).json({
578+
success: false,
530579
error: getErrorMessage(error),
531580
system: { status: 'error' }
532581
})
@@ -952,8 +1001,7 @@ router.delete('/notifications/unsubscribe', async (req: Request, res: Response)
9521001

9531002
router.get('/debug/coingecko-test', blockDebugInProduction, async (req: Request, res: Response) => {
9541003
try {
955-
const apiKey = process.env.COINGECKO_API_KEY
956-
logger.info('[DEBUG] API Key exists', { apiKeySet: !!apiKey })
1004+
9571005

9581006
// Test direct API call
9591007
const testUrl = apiKey ?
@@ -969,7 +1017,7 @@ router.get('/debug/coingecko-test', blockDebugInProduction, async (req: Request,
9691017
headers['x-cg-pro-api-key'] = apiKey
9701018
}
9711019

972-
logger.info('[DEBUG] Test URL', { testUrl })
1020+
logger.info('[DEBUG] Test URL', { testUrl })
9731021

9741022
const response = await fetch(testUrl, { headers })
9751023
const data = await response.json()
@@ -992,7 +1040,7 @@ router.get('/debug/coingecko-test', blockDebugInProduction, async (req: Request,
9921040

9931041
router.get('/debug/force-fresh-prices', blockDebugInProduction, async (req: Request, res: Response) => {
9941042
try {
995-
logger.info('[DEBUG] Clearing cache and forcing fresh prices...')
1043+
logger.info('[DEBUG] Clearing cache and forcing fresh prices...')
9961044

9971045
// Clear cache first
9981046
reflectorService.clearCache()
@@ -1021,7 +1069,7 @@ router.get('/debug/force-fresh-prices', blockDebugInProduction, async (req: Requ
10211069

10221070
router.get('/debug/reflector-test', blockDebugInProduction, async (req: Request, res: Response) => {
10231071
try {
1024-
logger.info('[DEBUG] Testing reflector service...')
1072+
logger.info('[DEBUG] Testing reflector service...')
10251073

10261074
const testResult = await reflectorService.testApiConnectivity()
10271075
const cacheStatus = reflectorService.getCacheStatus()
@@ -1031,9 +1079,9 @@ router.get('/debug/reflector-test', blockDebugInProduction, async (req: Request,
10311079
apiConnectivityTest: testResult,
10321080
cacheStatus,
10331081
environment: {
1034-
nodeEnv: process.env.NODE_ENV,
1035-
apiKeySet: !!process.env.COINGECKO_API_KEY,
1036-
apiKeyLength: process.env.COINGECKO_API_KEY?.length || 0
1082+
nodeEnv: global.process.env.NODE_ENV,
1083+
apiKeySet: !!global.process.env.COINGECKO_API_KEY,
1084+
apiKeyLength: global.process.env.COINGECKO_API_KEY?.length || 0
10371085
},
10381086
timestamp: new Date().toISOString()
10391087
})
@@ -1049,12 +1097,12 @@ router.get('/debug/reflector-test', blockDebugInProduction, async (req: Request,
10491097
router.get('/debug/env', blockDebugInProduction, async (req: Request, res: Response) => {
10501098
try {
10511099
res.json({
1052-
environment: process.env.NODE_ENV,
1053-
apiKeySet: !!process.env.COINGECKO_API_KEY,
1100+
environment: global.process.env.NODE_ENV,
1101+
apiKeySet: !!global.process.env.COINGECKO_API_KEY,
10541102
autoRebalancerEnabled: !!autoRebalancer,
10551103
autoRebalancerRunning: autoRebalancer ? autoRebalancer.getStatus().isRunning : false,
1056-
enableAutoRebalancer: process.env.ENABLE_AUTO_REBALANCER,
1057-
port: process.env.PORT,
1104+
enableAutoRebalancer: global.process.env.ENABLE_AUTO_REBALANCER,
1105+
port: global.process.env.PORT,
10581106
timestamp: new Date().toISOString()
10591107
})
10601108
} catch (error) {
@@ -1064,7 +1112,6 @@ router.get('/debug/env', blockDebugInProduction, async (req: Request, res: Respo
10641112
})
10651113
}
10661114
})
1067-
10681115
router.get('/debug/auto-rebalancer-test', blockDebugInProduction, async (req: Request, res: Response) => {
10691116
try {
10701117
if (!autoRebalancer) {

backend/src/queue/workers/rebalanceWorker.ts

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { getConnectionOptions } from '../connection.js'
33
import { StellarService } from '../../services/stellar.js'
44
import { rebalanceHistoryService } from '../../services/serviceContainer.js'
55
import { notificationService } from '../../services/notificationService.js'
6-
import { logger, logAudit } from '../../utils/logger.js'
6+
77
import type { RebalanceJobData } from '../queues.js'
88

99
let worker: Worker | null = null
@@ -29,6 +29,14 @@ export async function processRebalanceJob(
2929
})
3030
}
3131

32+
// Try to acquire the concurrency lock
33+
const lockAcquired = await rebalanceLockService.acquireLock(portfolioId)
34+
35+
if (!lockAcquired) {
36+
logger.info('[WORKER:rebalance] Rebalance already in progress. Aborting.', { portfolioId })
37+
return // Gracefully skip execution
38+
}
39+
3240
const stellarService = new StellarService()
3341
try {
3442
const portfolio = await stellarService.getPortfolio(portfolioId)
@@ -112,6 +120,9 @@ export async function processRebalanceJob(
112120

113121
// Re-throw so BullMQ can retry with backoff
114122
throw err
123+
} finally {
124+
// Always release the lock to prevent deadlocks
125+
await rebalanceLockService.releaseLock(portfolioId)
115126
}
116127
}
117128

@@ -137,14 +148,14 @@ export function startRebalanceWorker(): Worker | null {
137148
return null
138149
}
139150

140-
worker.on('completed', (job) => {
151+
worker.on('completed', (job: Job) => {
141152
logger.info('[WORKER:rebalance] Job completed', {
142153
jobId: job.id,
143154
portfolioId: job.data.portfolioId,
144155
})
145156
})
146157

147-
worker.on('failed', (job, err) => {
158+
worker.on('failed', (job: Job | undefined, err: Error) => {
148159
logger.error('[WORKER:rebalance] Job failed', {
149160
jobId: job?.id,
150161
portfolioId: job?.data.portfolioId,

0 commit comments

Comments
 (0)