Skip to content

Commit b5fe956

Browse files
authored
Feat/speed up integrity checker (#948)
* overhauled integrity-checker * do just in time fetching of the accountInfos
1 parent 95388c3 commit b5fe956

File tree

1 file changed

+62
-56
lines changed

1 file changed

+62
-56
lines changed

packages/account-postgres-sink-service/src/utils/integrityCheckProgramAccounts.ts

+62-56
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,13 @@ export const integrityCheckProgramAccounts = async ({
6464
}
6565

6666
const performIntegrityCheck = async () => {
67-
const startTime = new Date();
67+
const snapshotTime = new Date();
6868
const t = await sequelize.transaction({
6969
isolationLevel: Transaction.ISOLATION_LEVELS.READ_COMMITTED,
7070
});
7171

7272
try {
73+
let limiter: pLimit.Limit;
7374
const program = new anchor.Program(idl, provider);
7475
const currentSlot = await connection.getSlot();
7576
let blockTime24HoursAgo: number | null = null;
@@ -117,7 +118,7 @@ export const integrityCheckProgramAccounts = async ({
117118
throw new Error("Unable to get blocktime from 24 hours ago");
118119
}
119120

120-
const limiter = pLimit(25);
121+
limiter = pLimit(10);
121122
const parsedTransactions = (
122123
await Promise.all(
123124
chunks(
@@ -126,7 +127,7 @@ export const integrityCheckProgramAccounts = async ({
126127
blockTime: blockTime24HoursAgo,
127128
provider,
128129
}),
129-
75
130+
100
130131
).map((chunk) =>
131132
limiter(async () => {
132133
await new Promise((resolve) => setTimeout(resolve, 250));
@@ -144,39 +145,20 @@ export const integrityCheckProgramAccounts = async ({
144145
).flat();
145146

146147
const uniqueWritableAccounts = new Set<string>();
147-
for (const parsed of parsedTransactions) {
148-
parsed?.transaction.message.accountKeys
149-
.filter((acc) => acc.writable)
150-
.map((acc) => {
151-
uniqueWritableAccounts.add(acc.pubkey.toBase58());
152-
txIdsByAccountId[acc.pubkey.toBase58()] = [
153-
...parsed.transaction.signatures,
154-
...(txIdsByAccountId[acc.pubkey.toBase58()] || []),
148+
parsedTransactions.forEach((parsed) => {
149+
if (!parsed) return;
150+
const signatures = parsed.transaction.signatures;
151+
parsed.transaction.message.accountKeys.forEach((acc) => {
152+
if (acc.writable) {
153+
const pubkey = acc.pubkey.toBase58();
154+
uniqueWritableAccounts.add(pubkey);
155+
txIdsByAccountId[pubkey] = [
156+
...signatures,
157+
...(txIdsByAccountId[pubkey] || []),
155158
];
156-
});
157-
}
158-
159-
const accountInfosWithPk = (
160-
await Promise.all(
161-
chunks([...uniqueWritableAccounts.values()], 100).map((chunk) =>
162-
pLimit(100)(() =>
163-
retry(
164-
() =>
165-
connection.getMultipleAccountsInfo(
166-
chunk.map((c) => new PublicKey(c)),
167-
"confirmed"
168-
),
169-
retryOptions
170-
)
171-
)
172-
)
173-
)
174-
)
175-
.flat()
176-
.map((accountInfo, idx) => ({
177-
pubkey: [...uniqueWritableAccounts.values()][idx],
178-
...accountInfo,
179-
}));
159+
}
160+
});
161+
});
180162

181163
const pluginsByAccountType = (
182164
await Promise.all(
@@ -199,10 +181,28 @@ export const integrityCheckProgramAccounts = async ({
199181
])
200182
);
201183

184+
limiter = pLimit(100);
185+
const uniqueWritableAccountsArray = [...uniqueWritableAccounts.values()];
202186
await Promise.all(
203-
chunks(accountInfosWithPk, 1000).map(async (chunk) => {
204-
const accountsByType: Record<string, typeof accountInfosWithPk> = {};
205-
chunk.forEach((accountInfo) => {
187+
chunks(uniqueWritableAccountsArray, 100).map(async (chunk) => {
188+
const accountInfos = await limiter(() =>
189+
retry(
190+
() =>
191+
connection.getMultipleAccountsInfo(
192+
chunk.map((c) => new PublicKey(c)),
193+
"confirmed"
194+
),
195+
retryOptions
196+
)
197+
);
198+
199+
const accountInfosWithPk = accountInfos.map((accountInfo, idx) => ({
200+
pubkey: chunk[idx],
201+
...accountInfo,
202+
}));
203+
204+
const accsByType: Record<string, typeof accountInfosWithPk> = {};
205+
accountInfosWithPk.forEach((accountInfo) => {
206206
const accName = accounts.find(
207207
({ type }) =>
208208
accountInfo.data &&
@@ -212,24 +212,34 @@ export const integrityCheckProgramAccounts = async ({
212212
)?.type;
213213

214214
if (accName) {
215-
accountsByType[accName] = accountsByType[accName] || [];
216-
accountsByType[accName].push(accountInfo);
215+
accsByType[accName] = accsByType[accName] || [];
216+
accsByType[accName].push(accountInfo);
217217
}
218218
});
219219

220220
await Promise.all(
221-
Object.entries(accountsByType).map(async ([accName, accounts]) => {
221+
Object.entries(accsByType).map(async ([accName, accounts]) => {
222222
const model = sequelize.models[accName];
223+
const pubkeys = accounts.map((c) => c.pubkey);
224+
const existingAccs = await model.findAll({
225+
where: { address: pubkeys },
226+
transaction: t,
227+
});
228+
229+
const existingAccMap = new Map(
230+
existingAccs.map((acc) => [acc.get("address"), acc])
231+
);
232+
223233
await Promise.all(
224-
accounts.map(async (c) => {
234+
accounts.map(async (acc) => {
225235
const decodedAcc = program.coder.accounts.decode(
226236
lowerFirstChar(accName),
227-
c.data as Buffer
237+
acc.data as Buffer
228238
);
229239

230240
let sanitized = {
231241
refreshed_at: new Date().toISOString(),
232-
address: c.pubkey,
242+
address: acc.pubkey,
233243
...sanitizeAccount(decodedAcc),
234244
};
235245

@@ -239,35 +249,31 @@ export const integrityCheckProgramAccounts = async ({
239249
sanitized = await plugin.processAccount(sanitized, t);
240250
} catch (err) {
241251
console.log(
242-
`Plugin processing failed for account ${c.pubkey}`,
252+
`Plugin processing failed for account ${acc.pubkey}`,
243253
err
244254
);
245-
// Continue with unmodified sanitized data instead of failing
246255
continue;
247256
}
248257
}
249258
}
250259

251-
const existing = await model.findByPk(c.pubkey, {
252-
transaction: t,
253-
});
260+
const existing = existingAccMap.get(acc.pubkey);
261+
const refreshedAt = existing?.dataValues.refreshed_at
262+
? new Date(existing.dataValues.refreshed_at)
263+
: null;
254264

255265
const shouldUpdate =
256266
!deepEqual(
257267
_omit(sanitized, OMIT_KEYS),
258268
_omit(existing?.dataValues, OMIT_KEYS)
259269
) &&
260-
!(
261-
existing?.dataValues.refreshed_at &&
262-
new Date(existing.dataValues.refreshed_at) >= startTime &&
263-
new Date(existing.dataValues.refreshed_at) <= new Date()
264-
);
270+
(!refreshedAt || refreshedAt < snapshotTime);
265271

266272
if (shouldUpdate) {
267273
corrections.push({
268274
type: accName,
269-
accountId: c.pubkey,
270-
txSignatures: txIdsByAccountId[c.pubkey],
275+
accountId: acc.pubkey,
276+
txSignatures: txIdsByAccountId[acc.pubkey],
271277
currentValues: existing ? existing.dataValues : null,
272278
newValues: sanitized,
273279
});

0 commit comments

Comments
 (0)