
Commit a9dae10

Antoine de Chevigné and claude committed
Address code review feedback for sync auto-disable
- Fix race condition: use atomic increment for syncFailedAttempts
- Add null check for explorer.workspace in updateExplorerSyncingProcess
- Simplify backoff calculation using recoveryAttempts counter
- Add max recovery attempts (10) after which manual intervention required
- Add batching (50 explorers) to syncRecoveryCheck job
- Add random jitter (up to 2 min) to stagger recovery checks
- Remove dead code (resetSyncState method)
- Add recoveryAttempts column to migration

Co-Authored-By: Claude Opus 4.5 <[email protected]>
1 parent 1b99d3d commit a9dae10
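
For context on the "fix race condition" bullet: the old counter was read in JavaScript and written back, so two overlapping job runs could read the same value and one failure would be lost. A minimal sketch of the two patterns (assuming a loaded Sequelize Explorer instance; the helper functions are illustrative, only the field name comes from the diff below):

// Old pattern: read-modify-write, last writer wins under concurrency.
async function incrementNonAtomic(explorer) {
    const newCount = (explorer.syncFailedAttempts || 0) + 1; // both runs may read the same value
    await explorer.update({ syncFailedAttempts: newCount }); // one increment is silently lost
    return newCount;
}

// New pattern: Sequelize's instance.increment() pushes the "+ 1" into the UPDATE statement
// itself, so concurrent runs are each counted; reload() refreshes the in-memory copy before
// the threshold comparison.
async function incrementAtomic(explorer) {
    await explorer.increment('syncFailedAttempts');
    await explorer.reload();
    return explorer.syncFailedAttempts;
}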

6 files changed: +171 -58 lines changed


run/jobs/syncRecoveryCheck.js

Lines changed: 47 additions & 8 deletions
@@ -4,6 +4,7 @@
  * Re-enables sync when RPC becomes reachable, using exponential backoff for retries.
  *
  * Backoff schedule: 5m -> 15m -> 1h -> 6h (max)
+ * Max recovery attempts: 10 (after which manual intervention is required)
  *
  * @module jobs/syncRecoveryCheck
  */
@@ -14,8 +15,12 @@ const { withTimeout } = require('../lib/utils');
 const { Op } = require('sequelize');
 const logger = require('../lib/logger');
 
+// Process explorers in batches to avoid long-running jobs
+const BATCH_SIZE = 50;
+
 module.exports = async () => {
-    // Find explorers that are due for a recovery check
+    // Find explorers that are due for a recovery check (with batch limit)
+    // Excludes explorers that have reached max recovery attempts (nextRecoveryCheckAt is null)
     const explorers = await Explorer.findAll({
         where: {
             syncDisabledReason: { [Op.ne]: null },
@@ -25,7 +30,9 @@ module.exports = async () => {
             model: Workspace,
             as: 'workspace',
             attributes: ['id', 'rpcServer']
-        }]
+        }],
+        limit: BATCH_SIZE,
+        order: [['nextRecoveryCheckAt', 'ASC']] // Process oldest first
     });
 
     if (explorers.length === 0) {
@@ -34,9 +41,20 @@ module.exports = async () => {
 
     let recovered = 0;
     let stillUnreachable = 0;
+    let maxAttemptsReached = 0;
 
     for (const explorer of explorers) {
         try {
+            // Skip if workspace is missing
+            if (!explorer.workspace || !explorer.workspace.rpcServer) {
+                logger.warn({
+                    message: 'Explorer recovery check skipped: no workspace or RPC server',
+                    explorerId: explorer.id,
+                    explorerSlug: explorer.slug
+                });
+                continue;
+            }
+
             const provider = new ProviderConnector(explorer.workspace.rpcServer);
             const block = await withTimeout(provider.fetchLatestBlock());
 
@@ -49,17 +67,38 @@ module.exports = async () => {
                     message: 'Explorer re-enabled after RPC recovery',
                     explorerId: explorer.id,
                     explorerSlug: explorer.slug,
-                    disabledReason: explorer.syncDisabledReason
+                    disabledReason: explorer.syncDisabledReason,
+                    recoveryAttempts: explorer.recoveryAttempts
                 });
             } else {
                 // Block fetch returned null - still unreachable
-                await explorer.scheduleNextRecoveryCheck();
-                stillUnreachable++;
+                const result = await explorer.scheduleNextRecoveryCheck();
+                if (result.maxReached) {
+                    maxAttemptsReached++;
+                    logger.warn({
+                        message: 'Explorer reached max recovery attempts - manual intervention required',
+                        explorerId: explorer.id,
+                        explorerSlug: explorer.slug,
+                        attempts: result.attempts
+                    });
+                } else {
+                    stillUnreachable++;
+                }
             }
         } catch (error) {
             // RPC check failed - schedule next check with backoff
-            await explorer.scheduleNextRecoveryCheck();
-            stillUnreachable++;
+            const result = await explorer.scheduleNextRecoveryCheck();
+            if (result.maxReached) {
+                maxAttemptsReached++;
+                logger.warn({
+                    message: 'Explorer reached max recovery attempts - manual intervention required',
+                    explorerId: explorer.id,
+                    explorerSlug: explorer.slug,
+                    attempts: result.attempts
+                });
+            } else {
+                stillUnreachable++;
+            }
 
             logger.debug({
                 message: 'Explorer recovery check failed',
@@ -70,5 +109,5 @@ module.exports = async () => {
         }
     }
 
-    return `Checked ${explorers.length} explorers: ${recovered} recovered, ${stillUnreachable} still unreachable`;
+    return `Checked ${explorers.length} explorers: ${recovered} recovered, ${stillUnreachable} still unreachable, ${maxAttemptsReached} max attempts reached`;
 };

run/jobs/updateExplorerSyncingProcess.js

Lines changed: 12 additions & 3 deletions
@@ -9,6 +9,7 @@ const { Explorer, Workspace, RpcHealthCheck, StripeSubscription, StripePlan } =
 const PM2 = require('../lib/pm2');
 const logger = require('../lib/logger');
 
+// Must match SYNC_FAILURE_THRESHOLD in explorer.js
 const SYNC_FAILURE_THRESHOLD = 3;
 
 module.exports = async job => {
@@ -23,17 +24,21 @@ module.exports = async job => {
             {
                 model: Workspace,
                 as: 'workspace',
+                required: false,
                 include: {
                     model: RpcHealthCheck,
-                    as: 'rpcHealthCheck'
+                    as: 'rpcHealthCheck',
+                    required: false
                 }
             },
             {
                 model: StripeSubscription,
                 as: 'stripeSubscription',
+                required: false,
                 include: {
                     model: StripePlan,
-                    as: 'stripePlan'
+                    as: 'stripePlan',
+                    required: false
                 }
             }
         ]
@@ -59,7 +64,11 @@ module.exports = async job => {
         await pm2.delete(explorer.slug);
         return 'Process deleted: no subscription.';
     }
-    else if (explorer.workspace.rpcHealthCheck && !explorer.workspace.rpcHealthCheck.isReachable && existingProcess) {
+    else if (explorer && !explorer.workspace) {
+        await pm2.delete(explorer.slug);
+        return 'Process deleted: no workspace.';
+    }
+    else if (explorer.workspace && explorer.workspace.rpcHealthCheck && !explorer.workspace.rpcHealthCheck.isReachable && existingProcess) {
         await pm2.delete(explorer.slug);
         // Track RPC failure and potentially auto-disable
         const result = await explorer.incrementSyncFailures('rpc_unreachable');
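
The "null check for explorer.workspace" bullet pairs with the required: false flags above: in Sequelize, required: false keeps an include as a LEFT OUTER JOIN, so an explorer whose workspace row is missing is still returned with workspace set to null instead of being filtered out by an inner join. A small sketch of the case the new branch handles (illustrative query with a hypothetical id; model names follow the diff):

const explorer = await Explorer.findOne({
    where: { id: 123 }, // hypothetical id
    include: [
        { model: Workspace, as: 'workspace', required: false },
        { model: StripeSubscription, as: 'stripeSubscription', required: false }
    ]
});

if (explorer && !explorer.workspace) {
    // Explorer exists but has nothing to sync against: this is the record the new
    // "Process deleted: no workspace." branch cleans up.
}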

run/migrations/20260109161142-add-sync-failure-tracking.js

Lines changed: 7 additions & 0 deletions
@@ -20,6 +20,12 @@ module.exports = {
                 allowNull: true
             }, { transaction });
 
+            await queryInterface.addColumn('explorers', 'recoveryAttempts', {
+                type: Sequelize.DataTypes.INTEGER,
+                allowNull: false,
+                defaultValue: 0
+            }, { transaction });
+
             await queryInterface.addColumn('explorers', 'nextRecoveryCheckAt', {
                 type: Sequelize.DataTypes.DATE,
                 allowNull: true
@@ -47,6 +53,7 @@ module.exports = {
             await queryInterface.removeColumn('explorers', 'syncFailedAttempts', { transaction });
             await queryInterface.removeColumn('explorers', 'syncDisabledAt', { transaction });
             await queryInterface.removeColumn('explorers', 'syncDisabledReason', { transaction });
+            await queryInterface.removeColumn('explorers', 'recoveryAttempts', { transaction });
             await queryInterface.removeColumn('explorers', 'nextRecoveryCheckAt', { transaction });
 
             await transaction.commit();

run/models/explorer.js

Lines changed: 40 additions & 40 deletions
@@ -33,12 +33,15 @@ const MAX_RPC_ATTEMPTS = 3;
 
 // Sync failure auto-disable configuration
 const SYNC_FAILURE_THRESHOLD = 3;
+const MAX_RECOVERY_ATTEMPTS = 10;
 const RECOVERY_BACKOFF_SCHEDULE = [
     5 * 60 * 1000,      // 5 minutes
     15 * 60 * 1000,     // 15 minutes
     60 * 60 * 1000,     // 1 hour
     6 * 60 * 60 * 1000  // 6 hours (max)
 ];
+// Stagger recovery checks to avoid thundering herd (random jitter up to 2 minutes)
+const RECOVERY_JITTER_MAX = 2 * 60 * 1000;
 
 module.exports = (sequelize, DataTypes) => {
     class Explorer extends Model {
@@ -317,6 +320,7 @@ module.exports = (sequelize, DataTypes) => {
                 syncFailedAttempts: 0,
                 syncDisabledAt: null,
                 syncDisabledReason: null,
+                recoveryAttempts: 0,
                 nextRecoveryCheckAt: null
             });
             return this;
@@ -332,82 +336,76 @@ module.exports = (sequelize, DataTypes) => {
 
         /**
          * Increments the sync failure counter and auto-disables if threshold reached.
+         * Uses atomic increment to avoid race conditions.
          * @param {string} [reason='rpc_unreachable'] - Reason for the failure
         * @returns {Promise<{disabled: boolean, attempts: number}>} Result with disable status
          */
         async incrementSyncFailures(reason = 'rpc_unreachable') {
-            const newCount = (this.syncFailedAttempts || 0) + 1;
-            await this.update({ syncFailedAttempts: newCount });
+            // Use atomic increment to avoid race conditions
+            await this.increment('syncFailedAttempts');
+            await this.reload();
 
-            if (newCount >= SYNC_FAILURE_THRESHOLD) {
+            if (this.syncFailedAttempts >= SYNC_FAILURE_THRESHOLD) {
                 await this.autoDisableSync(reason);
-                return { disabled: true, attempts: newCount };
+                return { disabled: true, attempts: this.syncFailedAttempts };
             }
-            return { disabled: false, attempts: newCount };
+            return { disabled: false, attempts: this.syncFailedAttempts };
         }
 
         /**
          * Auto-disables sync and schedules first recovery check.
+         * Adds random jitter to avoid thundering herd when many explorers are disabled at once.
          * @param {string} reason - Reason for disabling (e.g., 'rpc_unreachable')
          * @returns {Promise<Explorer>} Updated explorer
          */
         async autoDisableSync(reason) {
-            const nextCheck = new Date(Date.now() + RECOVERY_BACKOFF_SCHEDULE[0]);
+            // Add random jitter to stagger recovery checks
+            const jitter = Math.floor(Math.random() * RECOVERY_JITTER_MAX);
+            const nextCheck = new Date(Date.now() + RECOVERY_BACKOFF_SCHEDULE[0] + jitter);
             await this.update({
                 shouldSync: false,
                 syncDisabledAt: new Date(),
                 syncDisabledReason: reason,
+                recoveryAttempts: 0,
                 nextRecoveryCheckAt: nextCheck
             });
             return this;
         }
 
-        /**
-         * Resets all sync failure tracking state.
-         * @returns {Promise<Explorer>} Updated explorer
-         */
-        async resetSyncState() {
-            await this.update({
-                syncFailedAttempts: 0,
-                syncDisabledAt: null,
-                syncDisabledReason: null,
-                nextRecoveryCheckAt: null
-            });
-            return this;
-        }
-
         /**
          * Schedules the next recovery check using exponential backoff.
+         * Increments recovery attempts and returns null if max attempts reached.
          * Backoff schedule: 5m -> 15m -> 1h -> 6h (max)
-         * @returns {Promise<Explorer>} Updated explorer
+         * @returns {Promise<{scheduled: boolean, attempts: number, maxReached: boolean}>} Result
         */
         async scheduleNextRecoveryCheck() {
             if (!this.syncDisabledAt) {
-                return this;
+                return { scheduled: false, attempts: 0, maxReached: false };
             }
 
-            const timeSinceDisabled = Date.now() - new Date(this.syncDisabledAt).getTime();
-            let cumulativeTime = 0;
-            let backoffIndex = 0;
+            const newAttempts = (this.recoveryAttempts || 0) + 1;
 
-            // Find which backoff interval we should use based on time since disabled
-            for (let i = 0; i < RECOVERY_BACKOFF_SCHEDULE.length; i++) {
-                cumulativeTime += RECOVERY_BACKOFF_SCHEDULE[i];
-                if (timeSinceDisabled < cumulativeTime) {
-                    backoffIndex = i;
-                    break;
-                }
-                backoffIndex = i;
+            // Check if max recovery attempts reached
+            if (newAttempts >= MAX_RECOVERY_ATTEMPTS) {
+                await this.update({
+                    recoveryAttempts: newAttempts,
+                    nextRecoveryCheckAt: null,
+                    syncDisabledReason: 'max_recovery_attempts_reached'
+                });
+                return { scheduled: false, attempts: newAttempts, maxReached: true };
             }
 
-            // Cap at max backoff (last element)
-            if (backoffIndex >= RECOVERY_BACKOFF_SCHEDULE.length) {
-                backoffIndex = RECOVERY_BACKOFF_SCHEDULE.length - 1;
-            }
+            // Use recovery attempts as index, capped at max backoff
+            const backoffIndex = Math.min(newAttempts - 1, RECOVERY_BACKOFF_SCHEDULE.length - 1);
+            // Add random jitter to stagger recovery checks
+            const jitter = Math.floor(Math.random() * RECOVERY_JITTER_MAX);
+            const nextCheck = new Date(Date.now() + RECOVERY_BACKOFF_SCHEDULE[backoffIndex] + jitter);
 
-            const nextCheck = new Date(Date.now() + RECOVERY_BACKOFF_SCHEDULE[backoffIndex]);
-            await this.update({ nextRecoveryCheckAt: nextCheck });
-            return this;
+            await this.update({
+                recoveryAttempts: newAttempts,
+                nextRecoveryCheckAt: nextCheck
+            });
+            return { scheduled: true, attempts: newAttempts, maxReached: false };
         }
 
         /**
@@ -420,6 +418,7 @@ module.exports = (sequelize, DataTypes) => {
                 syncFailedAttempts: 0,
                 syncDisabledAt: null,
                 syncDisabledReason: null,
+                recoveryAttempts: 0,
                 nextRecoveryCheckAt: null
             });
             return this;
@@ -806,6 +805,7 @@ module.exports = (sequelize, DataTypes) => {
         syncFailedAttempts: DataTypes.INTEGER,
         syncDisabledAt: DataTypes.DATE,
         syncDisabledReason: DataTypes.STRING,
+        recoveryAttempts: DataTypes.INTEGER,
         nextRecoveryCheckAt: DataTypes.DATE
     }, {
         hooks: {
0 commit comments
