|
1 | 1 | const { ProviderConnector } = require('../lib/rpc'); |
2 | | -const { Workspace, Explorer, StripeSubscription, RpcHealthCheck, IntegrityCheck, Block, OrbitChainConfig, OpChainConfig, OpBatch } = require('../models'); |
| 2 | +const { Workspace, Explorer, StripeSubscription, RpcHealthCheck, IntegrityCheck, Block, OrbitChainConfig, OpChainConfig, OpBatch, sequelize } = require('../models'); |
3 | 3 | const db = require('../lib/firebase'); |
4 | 4 | const logger = require('../lib/logger'); |
5 | 5 | const { processRawRpcObject } = require('../lib/utils'); |
@@ -194,34 +194,38 @@ module.exports = async job => { |
194 | 194 | }); |
195 | 195 |
|
196 | 196 | if (batchInfo) { |
197 | | - // Get the next batch index |
198 | | - const lastBatch = await OpBatch.findOne({ |
199 | | - where: { workspaceId: opConfig.workspaceId }, |
200 | | - order: [['batchIndex', 'DESC']] |
201 | | - }); |
202 | | - const nextBatchIndex = lastBatch ? lastBatch.batchIndex + 1 : 0; |
203 | | - |
204 | 197 | // Find the L1 transaction record if it exists |
205 | 198 | const l1Transaction = syncedBlock.transactions.find(t => t.hash === tx.hash); |
206 | 199 |
|
207 | | - await OpBatch.create({ |
208 | | - workspaceId: opConfig.workspaceId, |
209 | | - batchIndex: nextBatchIndex, |
210 | | - l1BlockNumber: batchInfo.l1BlockNumber, |
211 | | - l1TransactionHash: batchInfo.l1TransactionHash, |
212 | | - l1TransactionId: l1Transaction ? l1Transaction.id : null, |
213 | | - l1TransactionIndex: batchInfo.l1TransactionIndex, |
214 | | - epochNumber: batchInfo.l1BlockNumber, // Epoch is typically the L1 block |
215 | | - timestamp: tx.timestamp ? new Date(tx.timestamp * 1000) : new Date(), |
216 | | - txCount: batchInfo.estimatedBlockCount || null, |
217 | | - l2BlockStart: batchInfo.l2BlockStart, |
218 | | - l2BlockEnd: batchInfo.l2BlockEnd, |
219 | | - blobHash: batchInfo.blobHash, |
220 | | - blobData: batchInfo.blobData, |
221 | | - status: 'pending' |
| 200 | + // Use transaction with lock to prevent race condition on batch index |
| 201 | + await sequelize.transaction(async (t) => { |
| 202 | + const lastBatch = await OpBatch.findOne({ |
| 203 | + where: { workspaceId: opConfig.workspaceId }, |
| 204 | + order: [['batchIndex', 'DESC']], |
| 205 | + lock: t.LOCK.UPDATE, |
| 206 | + transaction: t |
| 207 | + }); |
| 208 | + const nextBatchIndex = lastBatch ? lastBatch.batchIndex + 1 : 0; |
| 209 | + |
| 210 | + await OpBatch.create({ |
| 211 | + workspaceId: opConfig.workspaceId, |
| 212 | + batchIndex: nextBatchIndex, |
| 213 | + l1BlockNumber: batchInfo.l1BlockNumber, |
| 214 | + l1TransactionHash: batchInfo.l1TransactionHash, |
| 215 | + l1TransactionId: l1Transaction ? l1Transaction.id : null, |
| 216 | + l1TransactionIndex: batchInfo.l1TransactionIndex, |
| 217 | + epochNumber: batchInfo.l1BlockNumber, // Epoch is typically the L1 block |
| 218 | + timestamp: tx.timestamp ? new Date(tx.timestamp * 1000) : new Date(), |
| 219 | + txCount: batchInfo.estimatedBlockCount || null, |
| 220 | + l2BlockStart: batchInfo.l2BlockStart, |
| 221 | + l2BlockEnd: batchInfo.l2BlockEnd, |
| 222 | + blobHash: batchInfo.blobHash, |
| 223 | + blobData: batchInfo.blobData, |
| 224 | + status: 'pending' |
| 225 | + }, { transaction: t }); |
| 226 | + |
| 227 | + logger.info(`Created OP batch ${nextBatchIndex} for L2 workspace ${opConfig.workspaceId} from L1 tx ${tx.hash}`); |
222 | 228 | }); |
223 | | - |
224 | | - logger.info(`Created OP batch ${nextBatchIndex} for L2 workspace ${opConfig.workspaceId} from L1 tx ${tx.hash}`); |
225 | 229 | } |
226 | 230 | } catch (error) { |
227 | 231 | logger.error(`Error processing OP batch for tx ${tx.hash}: ${error.message}`, { location: 'jobs.blockSync.opBatch', error }); |
|
0 commit comments