Skip to content

Commit e6dbedc

Browse files
JamesVictor-Oclaude
andcommitted
feat(concurrency): add optimistic concurrency control to portfolio updates
Add a version field to portfolio records and implement compare-and-set semantics for all writes to prevent silent lost updates during concurrent rebalance/create/update flows. Changes: - Add `version INTEGER NOT NULL DEFAULT 1` to portfolios table (SQLite schema, PostgreSQL schema.sql, and auto-migration for existing DBs) - Export `ConflictError` from types/index.ts with `currentVersion` payload - `DatabaseService.updatePortfolio` accepts optional `expectedVersion`; uses `WHERE id = ? AND version = ?` + `version = version + 1` atomically, throws `ConflictError` on mismatch - PostgreSQL `dbUpdatePortfolio` mirrors the same compare-and-set logic - `StellarService.executeRebalance` passes `portfolio.version` as expectedVersion and re-throws `ConflictError` unwrapped - Rebalance route catches `ConflictError` and returns 409 Conflict with `currentVersion` in the response body - All portfolio GET responses include `version` transparently - 7 new unit tests: version init, increment, CAS success, stale-version rejection, conflict payload, lost-update simulation, not-found path Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 211ac96 commit e6dbedc

File tree

7 files changed

+268
-32
lines changed

7 files changed

+268
-32
lines changed

backend/src/api/routes.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import { logger } from '../utils/logger.js'
1111
import { requireAdmin } from '../middleware/auth.js'
1212
import { blockDebugInProduction } from '../middleware/debugGate.js'
1313
import { writeRateLimiter } from '../middleware/rateLimit.js'
14+
import { ConflictError } from '../types/index.js'
1415

1516
const router = Router()
1617
const stellarService = new StellarService()
@@ -282,6 +283,16 @@ router.post('/portfolio/:id/rebalance', writeRateLimiter, async (req, res) => {
282283
riskAlerts: riskCheck.alerts
283284
})
284285
} catch (error) {
286+
if (error instanceof ConflictError) {
287+
logger.warn('Rebalance rejected — concurrent modification detected', {
288+
portfolioId: req.params.id,
289+
currentVersion: error.currentVersion
290+
})
291+
return res.status(409).json({
292+
error: 'Portfolio was modified by a concurrent request. Fetch the latest version and retry.',
293+
currentVersion: error.currentVersion
294+
})
295+
}
285296
logger.error('Rebalance failed', { error: getErrorObject(error), portfolioId: req.params.id })
286297
res.status(500).json({
287298
error: getErrorMessage(error),

backend/src/db/portfolioDb.ts

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { query } from './client.js'
2+
import { ConflictError } from '../types/index.js'
23

34
export interface PortfolioRow {
45
id: string
@@ -9,6 +10,7 @@ export interface PortfolioRow {
910
total_value: number
1011
created_at: Date
1112
last_rebalance: Date
13+
version: number
1214
}
1315

1416
function rowToPortfolio(r: PortfolioRow) {
@@ -20,7 +22,8 @@ function rowToPortfolio(r: PortfolioRow) {
2022
balances: r.balances || {},
2123
totalValue: Number(r.total_value),
2224
createdAt: r.created_at.toISOString(),
23-
lastRebalance: r.last_rebalance.toISOString()
25+
lastRebalance: r.last_rebalance.toISOString(),
26+
version: r.version ?? 1
2427
}
2528
}
2629

@@ -33,8 +36,8 @@ export async function dbCreatePortfolio(
3336
totalValue: number
3437
) {
3538
await query(
36-
`INSERT INTO portfolios (id, user_address, allocations, threshold, balances, total_value, created_at, last_rebalance)
37-
VALUES ($1, $2, $3, $4, $5, $6, NOW(), NOW())`,
39+
`INSERT INTO portfolios (id, user_address, allocations, threshold, balances, total_value, created_at, last_rebalance, version)
40+
VALUES ($1, $2, $3, $4, $5, $6, NOW(), NOW(), 1)`,
3841
[id, userAddress, JSON.stringify(allocations), threshold, JSON.stringify(balances), totalValue]
3942
)
4043
}
@@ -61,9 +64,23 @@ export async function dbGetAllPortfolios() {
6164
return result.rows.map(rowToPortfolio)
6265
}
6366

67+
/**
68+
* Update a portfolio record.
69+
*
70+
* When `expectedVersion` is provided the update uses compare-and-set semantics:
71+
* the row is only modified when its current version matches `expectedVersion`,
72+
* and the version counter is incremented atomically. A `ConflictError` is
73+
* thrown when the match fails, signalling that a concurrent write has already
74+
* advanced the version.
75+
*
76+
* Omitting `expectedVersion` performs an unchecked update (backward-compat)
77+
* while still incrementing the version so that subsequent versioned callers
78+
* detect the change.
79+
*/
6480
export async function dbUpdatePortfolio(
6581
id: string,
66-
updates: { balances?: Record<string, number>; totalValue?: number; lastRebalance?: string }
82+
updates: { balances?: Record<string, number>; totalValue?: number; lastRebalance?: string },
83+
expectedVersion?: number
6784
) {
6885
const sets: string[] = []
6986
const values: unknown[] = []
@@ -81,6 +98,36 @@ export async function dbUpdatePortfolio(
8198
values.push(updates.lastRebalance)
8299
}
83100
if (sets.length === 0) return false
101+
102+
// Always increment the version counter on every write
103+
sets.push(`version = version + 1`)
104+
105+
if (expectedVersion !== undefined) {
106+
// Compare-and-set: WHERE id = $n AND version = $m
107+
values.push(id)
108+
values.push(expectedVersion)
109+
const result = await query(
110+
`UPDATE portfolios SET ${sets.join(', ')} WHERE id = $${i} AND version = $${i + 1}`,
111+
values
112+
)
113+
if ((result.rowCount ?? 0) === 0) {
114+
// Distinguish not-found from conflict
115+
const check = await query<{ id: string }>(
116+
'SELECT id FROM portfolios WHERE id = $1',
117+
[id]
118+
)
119+
if (check.rows.length === 0) return false
120+
// Row exists but version didn't match — concurrent write detected
121+
const current = await query<{ version: number }>(
122+
'SELECT version FROM portfolios WHERE id = $1',
123+
[id]
124+
)
125+
throw new ConflictError(current.rows[0]?.version ?? -1)
126+
}
127+
return true
128+
}
129+
130+
// Unchecked update
84131
values.push(id)
85132
const result = await query(
86133
`UPDATE portfolios SET ${sets.join(', ')} WHERE id = $${i}`,

backend/src/db/schema.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ CREATE TABLE IF NOT EXISTS portfolios (
66
balances JSONB NOT NULL DEFAULT '{}',
77
total_value NUMERIC NOT NULL DEFAULT 0,
88
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
9-
last_rebalance TIMESTAMPTZ NOT NULL DEFAULT NOW()
9+
last_rebalance TIMESTAMPTZ NOT NULL DEFAULT NOW(),
10+
version INTEGER NOT NULL DEFAULT 1
1011
);
1112

1213
CREATE INDEX IF NOT EXISTS idx_portfolios_user ON portfolios(user_address);

backend/src/services/databaseService.ts

Lines changed: 80 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import Database from 'better-sqlite3'
22
import { mkdirSync } from 'node:fs'
33
import { dirname } from 'node:path'
44
import type { RebalanceEvent } from './rebalanceHistory.js'
5+
import { ConflictError } from '../types/index.js'
56

67
// ─────────────────────────────────────────────
78
// Types (mirrored from portfolioStorage.ts)
@@ -16,6 +17,7 @@ export interface Portfolio {
1617
totalValue: number
1718
createdAt: string
1819
lastRebalance: string
20+
version: number
1921
}
2022

2123
// Raw row shape as stored in SQLite
@@ -28,6 +30,7 @@ interface PortfolioRow {
2830
total_value: number
2931
created_at: string
3032
last_rebalance: string
33+
version: number
3134
}
3235

3336
interface RebalanceHistoryRow {
@@ -60,7 +63,8 @@ CREATE TABLE IF NOT EXISTS portfolios (
6063
balances TEXT NOT NULL,
6164
total_value REAL NOT NULL DEFAULT 0,
6265
created_at TEXT NOT NULL,
63-
last_rebalance TEXT NOT NULL
66+
last_rebalance TEXT NOT NULL,
67+
version INTEGER NOT NULL DEFAULT 1
6468
);
6569
6670
CREATE TABLE IF NOT EXISTS rebalance_history (
@@ -113,8 +117,8 @@ function seedDemoData(db: Database.Database): void {
113117
const balances = { XLM: 11173.18, BTC: 0.02697, ETH: 0.68257, USDC: 1000 }
114118

115119
db.prepare(`
116-
INSERT INTO portfolios (id, user_address, allocations, threshold, balances, total_value, created_at, last_rebalance)
117-
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
120+
INSERT INTO portfolios (id, user_address, allocations, threshold, balances, total_value, created_at, last_rebalance, version)
121+
VALUES (?, ?, ?, ?, ?, ?, ?, ?, 1)
118122
`).run(
119123
DEMO_PORTFOLIO_ID,
120124
'DEMO-USER',
@@ -221,7 +225,8 @@ function rowToPortfolio(row: PortfolioRow): Portfolio {
221225
balances: safeJsonParse(row.balances, {}, `portfolio(${row.id}).balances`),
222226
totalValue: row.total_value,
223227
createdAt: row.created_at,
224-
lastRebalance: row.last_rebalance
228+
lastRebalance: row.last_rebalance,
229+
version: row.version ?? 1
225230
}
226231
}
227232

@@ -257,6 +262,7 @@ export class DatabaseService {
257262
mkdirSync(dirname(dbPath), { recursive: true })
258263
this.db = new Database(dbPath)
259264
this.db.exec(SCHEMA_SQL)
265+
this._migrateSchema()
260266

261267
// Seed demo data on first run (empty portfolios table)
262268
const count = (this.db.prepare('SELECT COUNT(*) as cnt FROM portfolios').get() as { cnt: number }).cnt
@@ -267,6 +273,14 @@ export class DatabaseService {
267273
console.log(`[DB] SQLite database ready at: ${dbPath}`)
268274
}
269275

276+
private _migrateSchema(): void {
277+
const cols = this.db.prepare("PRAGMA table_info(portfolios)").all() as Array<{ name: string }>
278+
if (!cols.some(c => c.name === 'version')) {
279+
this.db.exec("ALTER TABLE portfolios ADD COLUMN version INTEGER NOT NULL DEFAULT 1")
280+
console.log('[DB] Migration: added version column to portfolios')
281+
}
282+
}
283+
270284
// ── Public accessor for backward-compat (routes use portfolioStorage.portfolios.size) ──
271285
get portfolios(): { size: number } {
272286
return { size: this.getPortfolioCount() }
@@ -285,8 +299,8 @@ export class DatabaseService {
285299
const id = generateId()
286300
const now = new Date().toISOString()
287301
this.db.prepare(`
288-
INSERT INTO portfolios (id, user_address, allocations, threshold, balances, total_value, created_at, last_rebalance)
289-
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
302+
INSERT INTO portfolios (id, user_address, allocations, threshold, balances, total_value, created_at, last_rebalance, version)
303+
VALUES (?, ?, ?, ?, ?, ?, ?, ?, 1)
290304
`).run(id, userAddress, JSON.stringify(allocations), threshold, JSON.stringify({}), 0, now, now)
291305
return id
292306
} catch (err) {
@@ -305,8 +319,8 @@ export class DatabaseService {
305319
const now = new Date().toISOString()
306320
const totalValue = Object.values(currentBalances).reduce((sum, bal) => sum + bal, 0)
307321
this.db.prepare(`
308-
INSERT INTO portfolios (id, user_address, allocations, threshold, balances, total_value, created_at, last_rebalance)
309-
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
322+
INSERT INTO portfolios (id, user_address, allocations, threshold, balances, total_value, created_at, last_rebalance, version)
323+
VALUES (?, ?, ?, ?, ?, ?, ?, ?, 1)
310324
`).run(id, userAddress, JSON.stringify(allocations), threshold, JSON.stringify(currentBalances), totalValue, now, now)
311325
return id
312326
} catch (err) {
@@ -332,30 +346,73 @@ export class DatabaseService {
332346
}
333347
}
334348

335-
updatePortfolio(id: string, updates: Partial<Portfolio>): boolean {
349+
/**
350+
* Update a portfolio record.
351+
*
352+
* When `expectedVersion` is supplied the update uses compare-and-set
353+
* semantics: the row is only written when its stored version matches
354+
* `expectedVersion`, and the version counter is incremented atomically.
355+
* A `ConflictError` is thrown when the match fails, signalling that a
356+
* concurrent write has already advanced the version ahead of the caller.
357+
*
358+
* Omitting `expectedVersion` performs an unchecked update (backward
359+
* compatible) while still incrementing the version so that any subsequent
360+
* versioned callers detect the change.
361+
*/
362+
updatePortfolio(id: string, updates: Partial<Portfolio>, expectedVersion?: number): boolean {
336363
try {
337364
const row = this.db.prepare<[string], PortfolioRow>('SELECT * FROM portfolios WHERE id = ?').get(id)
338365
if (!row) return false
339366

340367
const current = rowToPortfolio(row)
341368
const merged = { ...current, ...updates }
342369

343-
this.db.prepare(`
344-
UPDATE portfolios
345-
SET user_address = ?, allocations = ?, threshold = ?, balances = ?,
346-
total_value = ?, last_rebalance = ?
347-
WHERE id = ?
348-
`).run(
349-
merged.userAddress,
350-
JSON.stringify(merged.allocations),
351-
merged.threshold,
352-
JSON.stringify(merged.balances),
353-
merged.totalValue,
354-
merged.lastRebalance,
355-
id
356-
)
370+
if (expectedVersion !== undefined) {
371+
// Compare-and-set: only update when version matches
372+
const result = this.db.prepare(`
373+
UPDATE portfolios
374+
SET user_address = ?, allocations = ?, threshold = ?, balances = ?,
375+
total_value = ?, last_rebalance = ?, version = version + 1
376+
WHERE id = ? AND version = ?
377+
`).run(
378+
merged.userAddress,
379+
JSON.stringify(merged.allocations),
380+
merged.threshold,
381+
JSON.stringify(merged.balances),
382+
merged.totalValue,
383+
merged.lastRebalance,
384+
id,
385+
expectedVersion
386+
)
387+
388+
if (result.changes === 0) {
389+
// Row exists but version didn't match — concurrent write detected
390+
const currentRow = this.db.prepare<[string], { version: number }>(
391+
'SELECT version FROM portfolios WHERE id = ?'
392+
).get(id)
393+
throw new ConflictError(currentRow?.version ?? -1)
394+
}
395+
} else {
396+
// Unchecked update — still increment version for future versioned callers
397+
this.db.prepare(`
398+
UPDATE portfolios
399+
SET user_address = ?, allocations = ?, threshold = ?, balances = ?,
400+
total_value = ?, last_rebalance = ?, version = version + 1
401+
WHERE id = ?
402+
`).run(
403+
merged.userAddress,
404+
JSON.stringify(merged.allocations),
405+
merged.threshold,
406+
JSON.stringify(merged.balances),
407+
merged.totalValue,
408+
merged.lastRebalance,
409+
id
410+
)
411+
}
412+
357413
return true
358414
} catch (err) {
415+
if (err instanceof ConflictError) throw err
359416
throw new Error(`Failed to update portfolio '${id}': ${err}`)
360417
}
361418
}

backend/src/services/stellar.ts

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { Horizon } from '@stellar/stellar-sdk'
22
import type { PricesMap } from '../types/index.js'
3+
import { ConflictError } from '../types/index.js'
34

45
export class StellarService {
56
private server: Horizon.Server
@@ -240,10 +241,12 @@ export class StellarService {
240241
// Calculate new balances
241242
const updatedBalances = await this.calculateRebalancedBalances(portfolio, prices)
242243

243-
await portfolioStorage.updatePortfolio(portfolioId, {
244-
lastRebalance: new Date().toISOString(),
245-
balances: updatedBalances
246-
})
244+
// Compare-and-set: only commit if no concurrent write advanced the version
245+
portfolioStorage.updatePortfolio(
246+
portfolioId,
247+
{ lastRebalance: new Date().toISOString(), balances: updatedBalances },
248+
portfolio.version
249+
)
247250

248251
// Record successful rebalance
249252
const event = await rebalanceHistory.recordRebalanceEvent({
@@ -269,6 +272,10 @@ export class StellarService {
269272
eventId: event.id
270273
}
271274
} catch (error) {
275+
// Bubble up concurrency conflicts without wrapping so callers can
276+
// distinguish a 409 Conflict from a generic 500 failure.
277+
if (error instanceof ConflictError) throw error
278+
272279
const { RebalanceHistoryService } = await import('./rebalanceHistory.js')
273280
const rebalanceHistory = new RebalanceHistoryService()
274281

0 commit comments

Comments
 (0)