Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions run/api/explorers.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ const { getSupportedOpNetworks, isOpNetworkSupported } = require('../lib/opNetwo
const authMiddleware = require('../middlewares/auth');
const stripeMiddleware = require('../middlewares/stripe');
const secretMiddleware = require('../middlewares/secret');
const { SYNC_FAILURE_THRESHOLD } = require('../lib/syncHelpers');

/**
* Get orbit config for a given explorer
Expand Down Expand Up @@ -481,6 +482,48 @@ router.post('/syncExplorers', secretMiddleware, async (req, res, next) => {
}
});

/**
 * Record one sync failure against an explorer.
 * Guarded by secretMiddleware; intended for the PM2 server and the sync jobs
 * to report RPC failures.
 * @route POST /api/explorers/:slug/syncFailure
 * @param {string} slug - Explorer slug (URL parameter)
 * @param {string} [reason] - Body field; failure reason (e.g., 'rpc_error', 'rpc_unreachable'). Falls back to 'rpc_error' when falsy
 * @param {string} [source] - Body field; who reported the failure (e.g., 'pm2', 'blockSync', 'receiptSync'). Only logged; falls back to 'unknown' when falsy
 * @returns {object} - 200 with { disabled, attempts, message }, or 404 with { error } when no explorer has this slug
 */
router.post('/:slug/syncFailure', secretMiddleware, async (req, res, next) => {
    try {
        const { slug: explorerSlug } = req.params;
        const { reason, source } = req.body;

        // Resolve the defaults once so the model call and the log entry agree.
        const failureReason = reason || 'rpc_error';
        const failureSource = source || 'unknown';

        const explorer = await Explorer.findOne({ where: { slug: explorerSlug } });
        if (!explorer) {
            return res.status(404).json({ error: 'Explorer not found' });
        }

        // The model owns the counter; it reports back the new count and
        // whether this failure caused sync to be disabled.
        const { attempts, disabled } = await explorer.incrementSyncFailures(failureReason);

        logger.info({
            message: 'Sync failure reported',
            explorerSlug: explorerSlug,
            reason: failureReason,
            source: failureSource,
            attempts: attempts,
            disabled: disabled
        });

        let message;
        if (disabled) {
            message = `Sync auto-disabled after ${attempts} failures`;
        } else {
            message = `Failure recorded (attempt ${attempts}/${SYNC_FAILURE_THRESHOLD})`;
        }

        res.status(200).json({ disabled: disabled, attempts: attempts, message: message });
    } catch(error) {
        unmanagedError(error, req, next);
    }
});

router.put('/:id/stopSync', [authMiddleware], async (req, res, next) => {
try {
if (!req.params.id)
Expand Down
8 changes: 8 additions & 0 deletions run/jobs/blockSync.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const { enqueue, bulkEnqueue } = require('../lib/queue');
const RateLimiter = require('../lib/rateLimiter');
const constants = require('../constants/orbit');
const { isBatchTransaction, getBatchInfo } = require('../lib/opBatches');
const { reportRpcFailure } = require('../lib/syncHelpers');

module.exports = async job => {
const data = job.data;
Expand Down Expand Up @@ -91,6 +92,13 @@ module.exports = async job => {
block = await providerConnector.fetchRawBlockWithTransactions(data.blockNumber);
} catch(error) {
const priority = job.opts.priority || (data.source == 'cli-light' ? 1 : 10);

// Report RPC failure to explorer (excludes rate limiting and timeouts)
const failureResult = await reportRpcFailure(error, workspace.explorer, 'blockSync', workspace.id);
if (failureResult.shouldStop) {
return failureResult.message;
}

if (error.message == 'Rate limited') {
return enqueue('blockSync', `blockSync-${workspace.id}-${data.blockNumber}-${Date.now()}`, {
userId: workspace.user.firebaseUserId,
Expand Down
8 changes: 8 additions & 0 deletions run/jobs/receiptSync.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const { processRawRpcObject } = require('../lib/utils');
const { enqueue } = require('../lib/queue');
const RateLimiter = require('../lib/rateLimiter');
const logger = require('../lib/logger');
const { reportRpcFailure } = require('../lib/syncHelpers');
const {
isTransactionDepositedEvent,
isDisputeGameCreatedEvent,
Expand Down Expand Up @@ -119,6 +120,13 @@ module.exports = async job => {
receipt = await providerConnector.fetchTransactionReceipt(transaction.hash);
} catch(error) {
const priority = job.opts.priority || (data.source == 'cli-light' ? 1 : 10);

// Report RPC failure to explorer (excludes rate limiting and timeouts)
const failureResult = await reportRpcFailure(error, workspace.explorer, 'receiptSync', workspace.id);
if (failureResult.shouldStop) {
return failureResult.message;
}

if (error.message == 'Rate limited') {
return enqueue('receiptSync', `receiptSync-${workspace.id}-${transaction.hash}-${Date.now()}`, {
transactionId: transaction.id,
Expand Down
61 changes: 61 additions & 0 deletions run/lib/syncHelpers.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/**
* @fileoverview Shared helper functions for sync jobs.
* @module lib/syncHelpers
*/

const logger = require('./logger');

// Sync failure threshold, exported so API responses can display "attempt N/threshold".
// NOTE(review): this value is defined here, not re-exported from another module —
// confirm it matches the threshold applied by explorer.incrementSyncFailures.
const SYNC_FAILURE_THRESHOLD = 3;

/**
 * Report an RPC failure to the explorer and handle auto-disable logic.
 * Rate-limited and timeout errors are not counted, since they are
 * expected/transient conditions rather than actual RPC failures.
 *
 * This helper is called from inside the sync jobs' catch blocks, so it never
 * throws: a caught value without a usable message is treated as a plain
 * failure, and errors raised while recording the failure are logged and ignored.
 *
 * @param {Error} error - The error that occurred (any thrown value is tolerated)
 * @param {Object} explorer - The explorer model instance (may be null/undefined)
 * @param {string} jobName - Name of the job reporting the failure (e.g., 'blockSync', 'receiptSync')
 * @param {number} workspaceId - ID of the workspace
 * @returns {Promise<{shouldStop: boolean, message: string|null}>} - shouldStop is true only when
 *   this failure caused the explorer to be auto-disabled; message is then a human-readable reason
 */
async function reportRpcFailure(error, explorer, jobName, workspaceId) {
    // A caught value is not guaranteed to be an Error with a string message
    // (it can be null, a thrown string, a plain object...). Normalize it so the
    // checks below cannot throw and mask the original failure.
    const errorMessage = (error && typeof error.message === 'string') ? error.message : '';

    // Don't count rate limiting or timeouts as failures - they are expected/transient
    if (errorMessage === 'Rate limited' || errorMessage.startsWith('Timed out after')) {
        return { shouldStop: false, message: null };
    }

    // Only report if explorer exists and sync is enabled
    if (!explorer || !explorer.shouldSync) {
        return { shouldStop: false, message: null };
    }

    try {
        const result = await explorer.incrementSyncFailures('rpc_error');
        if (result.disabled) {
            logger.info({
                message: `Explorer auto-disabled due to RPC failures in ${jobName}`,
                explorerId: explorer.id,
                workspaceId: workspaceId,
                attempts: result.attempts
            });
            return {
                shouldStop: true,
                message: 'Sync disabled due to repeated RPC failures'
            };
        }
    } catch (reportError) {
        // Best effort: failing to record the failure must not break the job.
        logger.warn({
            message: 'Failed to report sync failure',
            error: reportError.message,
            workspaceId: workspaceId
        });
    }

    return { shouldStop: false, message: null };
}

// Public surface of the sync helpers module.
module.exports = { reportRpcFailure, SYNC_FAILURE_THRESHOLD };
84 changes: 84 additions & 0 deletions run/tests/api/explorers.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,90 @@ describe(`POST ${BASE_URL}/syncExplorers`, () => {
});
});

// Tests for the secret-protected endpoint that records explorer sync failures.
// Written with async/await rather than the `done` callback: with `done`, an
// assertion that fails inside `.then()` rejects the promise without ever calling
// `done`, so the test reports a timeout instead of the real assertion error.
describe(`POST ${BASE_URL}/:slug/syncFailure`, () => {
    /**
     * Make the next Explorer.findOne call resolve to a minimal explorer whose
     * incrementSyncFailures resolves to `result`.
     * @param {{disabled: boolean, attempts: number}} result - Value returned by the mocked counter
     * @returns {jest.Mock} - The incrementSyncFailures mock, for call assertions
     */
    const mockExplorerReturning = (result) => {
        const incrementSyncFailures = jest.fn().mockResolvedValue(result);
        jest.spyOn(Explorer, 'findOne').mockResolvedValueOnce({
            id: 1,
            slug: 'test-explorer',
            incrementSyncFailures
        });
        return incrementSyncFailures;
    };

    it('Should return 401 if no secret provided', async () => {
        await request.post(`${BASE_URL}/test-explorer/syncFailure`)
            .send({ reason: 'rpc_error', source: 'blockSync' })
            .expect(401);
    });

    it('Should return 401 if invalid secret provided', async () => {
        await request.post(`${BASE_URL}/test-explorer/syncFailure?secret=invalid`)
            .send({ reason: 'rpc_error', source: 'blockSync' })
            .expect(401);
    });

    it('Should return 404 if explorer not found', async () => {
        jest.spyOn(Explorer, 'findOne').mockResolvedValueOnce(null);

        const { body } = await request.post(`${BASE_URL}/nonexistent/syncFailure?secret=secret`)
            .send({ reason: 'rpc_error', source: 'blockSync' })
            .expect(404);

        expect(body.error).toEqual('Explorer not found');
    });

    it('Should increment failure counter and return result', async () => {
        const mockIncrementSyncFailures = mockExplorerReturning({ disabled: false, attempts: 1 });

        const { body } = await request.post(`${BASE_URL}/test-explorer/syncFailure?secret=secret`)
            .send({ reason: 'rpc_error', source: 'blockSync' })
            .expect(200);

        expect(mockIncrementSyncFailures).toHaveBeenCalledWith('rpc_error');
        expect(body.disabled).toBe(false);
        expect(body.attempts).toBe(1);
        expect(body.message).toContain('Failure recorded');
    });

    it('Should auto-disable after threshold and return disabled status', async () => {
        const mockIncrementSyncFailures = mockExplorerReturning({ disabled: true, attempts: 3 });

        const { body } = await request.post(`${BASE_URL}/test-explorer/syncFailure?secret=secret`)
            .send({ reason: 'rpc_unreachable', source: 'pm2' })
            .expect(200);

        expect(mockIncrementSyncFailures).toHaveBeenCalledWith('rpc_unreachable');
        expect(body.disabled).toBe(true);
        expect(body.attempts).toBe(3);
        expect(body.message).toContain('auto-disabled');
    });

    it('Should use default reason if not provided', async () => {
        const mockIncrementSyncFailures = mockExplorerReturning({ disabled: false, attempts: 1 });

        await request.post(`${BASE_URL}/test-explorer/syncFailure?secret=secret`)
            .send({})
            .expect(200);

        expect(mockIncrementSyncFailures).toHaveBeenCalledWith('rpc_error');
    });
});

describe(`PUT ${BASE_URL}/:id/stopSync`, () => {
it('Should return an error if cannot find explorer', (done) => {
jest.spyOn(db, 'getExplorerById').mockResolvedValueOnce(null);
Expand Down
112 changes: 112 additions & 0 deletions run/tests/lib/syncHelpers.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
require('../mocks/lib/queue');
require('../mocks/lib/firebase');

const { reportRpcFailure, SYNC_FAILURE_THRESHOLD } = require('../../lib/syncHelpers');

// Unit tests for lib/syncHelpers: the exported threshold and reportRpcFailure.
describe('syncHelpers', () => {
    const JOB_NAME = 'blockSync';
    const WORKSPACE_ID = 123;

    // Minimal stand-in for an explorer model instance.
    const buildExplorer = (incrementSyncFailures, shouldSync = true) => ({
        id: 1,
        shouldSync: shouldSync,
        incrementSyncFailures: incrementSyncFailures
    });

    // Calls the helper with an Error carrying `errorMessage` and fixed job/workspace values.
    const reportFor = (errorMessage, explorer) =>
        reportRpcFailure(new Error(errorMessage), explorer, JOB_NAME, WORKSPACE_ID);

    // Asserts the "keep going" outcome: no stop requested and no message.
    const expectJobToContinue = (outcome) => {
        expect(outcome.shouldStop).toBe(false);
        expect(outcome.message).toBeNull();
    };

    describe('SYNC_FAILURE_THRESHOLD', () => {
        it('Should export the sync failure threshold constant', () => {
            expect(SYNC_FAILURE_THRESHOLD).toBe(3);
        });
    });

    describe('reportRpcFailure', () => {
        it('Should not count rate-limited errors as failures', async () => {
            const explorer = buildExplorer(jest.fn());

            const outcome = await reportFor('Rate limited', explorer);

            expectJobToContinue(outcome);
            expect(explorer.incrementSyncFailures).not.toHaveBeenCalled();
        });

        it('Should not count timeout errors as failures', async () => {
            const explorer = buildExplorer(jest.fn());

            const outcome = await reportFor('Timed out after 30000ms', explorer);

            expectJobToContinue(outcome);
            expect(explorer.incrementSyncFailures).not.toHaveBeenCalled();
        });

        it('Should count other RPC errors as failures', async () => {
            const counter = jest.fn().mockResolvedValue({ disabled: false, attempts: 1 });
            const explorer = buildExplorer(counter);

            const outcome = await reportFor('Connection refused', explorer);

            expectJobToContinue(outcome);
            expect(explorer.incrementSyncFailures).toHaveBeenCalledWith('rpc_error');
        });

        it('Should return shouldStop=true when explorer is auto-disabled', async () => {
            const counter = jest.fn().mockResolvedValue({ disabled: true, attempts: 3 });
            const explorer = buildExplorer(counter);

            const outcome = await reportFor('Connection refused', explorer);

            expect(outcome.shouldStop).toBe(true);
            expect(outcome.message).toBe('Sync disabled due to repeated RPC failures');
        });

        it('Should not report if explorer is null', async () => {
            const outcome = await reportFor('Connection refused', null);

            expectJobToContinue(outcome);
        });

        it('Should not report if explorer.shouldSync is false', async () => {
            const explorer = buildExplorer(jest.fn(), false);

            const outcome = await reportFor('Connection refused', explorer);

            expectJobToContinue(outcome);
            expect(explorer.incrementSyncFailures).not.toHaveBeenCalled();
        });

        it('Should handle incrementSyncFailures errors gracefully', async () => {
            const counter = jest.fn().mockRejectedValue(new Error('DB error'));
            const explorer = buildExplorer(counter);

            // Should not throw
            const outcome = await reportFor('Connection refused', explorer);

            expectJobToContinue(outcome);
        });
    });
});