11import pLimit from "p-limit" ;
2- import { rootPgPool , sequelize } from "../../config/database" ;
2+ import { closeDatabase , rootPgPool , sequelize } from "../../config/database" ;
33import { getGuardsFromBalances } from "./payload" ;
44import Guard from "../../models/guard" ;
55
66const CONCURRENCY_LIMIT = 4 ; // Number of concurrent fetches allowed
77const limitFetch = pLimit ( CONCURRENCY_LIMIT ) ;
88
99export async function startGuardsBackfill ( ) {
10- try {
11- await sequelize . authenticate ( ) ;
12- console . log ( "Connected to the database." ) ;
10+ await sequelize . authenticate ( ) ;
11+ console . log ( "Connected to the database." ) ;
1312
14- await rootPgPool . query (
15- `
13+ await rootPgPool . query (
14+ `
1615 BEGIN;
1716 SET enable_seqscan = OFF;
1817 WITH combined AS (
@@ -28,72 +27,65 @@ export async function startGuardsBackfill() {
2827 GROUP BY "chainId", "account", "module"
2928 ON CONFLICT ("chainId", "account", "module", "tokenId") DO NOTHING;
3029 DELETE FROM "Guards";
31- SELECT SETVAL(' "Guards_id_seq"', 1) ;
30+ ALTER SEQUENCE "Guards_id_seq" RESTART WITH 1 ;
3231 COMMIT;
3332 ` ,
34- ) ;
35-
36- console . log ( "Balances backfilled successfully." ) ;
37- console . log ( "Starting guards backfill ..." ) ;
33+ ) ;
3834
39- const limit = 3000 ; // Number of rows to process in one batch
40- let offset = 0 ;
35+ console . log ( "Balances backfilled successfully." ) ;
36+ console . log ( "Starting guards backfill ..." ) ;
4137
42- while ( true ) {
43- const tx = await sequelize . transaction ( ) ;
44- try {
45- console . log ( `Fetching rows from offset: ${ offset } , limit: ${ limit } ` ) ;
46- const res = await rootPgPool . query (
47- `SELECT b.id, b.account, b."chainId", b.module FROM "Balances" b ORDER BY b.id LIMIT $1 OFFSET $2` ,
48- [ limit , offset ] ,
49- ) ;
38+ const limit = 10000 ; // Number of rows to process in one batch
39+ let offset = 0 ;
5040
51- const rows = res . rows ;
52- if ( rows . length === 0 ) {
53- console . log ( "No more rows to process." ) ;
54- break ;
55- }
41+ while ( true ) {
42+ console . log ( `Fetching rows from offset: ${ offset } , limit: ${ limit } ` ) ;
43+ const res = await rootPgPool . query (
44+ `SELECT b.id, b.account, b."chainId", b.module FROM "Balances" b ORDER BY b.id LIMIT $1 OFFSET $2` ,
45+ [ limit , offset ] ,
46+ ) ;
5647
57- // Use p-limit to ensure controlled concurrency for fetch requests
58- const fetchPromises = rows . map ( ( row ) =>
59- limitFetch ( ( ) =>
60- getGuardsFromBalances ( [
61- {
62- id : row . id ,
63- account : row . account ,
64- chainId : row . chainId ,
65- module : row . module ,
66- } ,
67- ] ) ,
68- ) ,
69- ) ;
70- const guards = ( await Promise . all ( fetchPromises ) ) . flat ( ) ;
48+ const rows = res . rows ;
49+ if ( rows . length === 0 ) {
50+ console . log ( "No more rows to process." ) ;
51+ break ;
52+ }
7153
72- await Guard . bulkCreate ( guards , {
73- transaction : tx ,
74- } ) ;
54+ // Use p-limit to ensure controlled concurrency for fetch requests
55+ const fetchPromises = rows . map ( ( row ) =>
56+ limitFetch ( ( ) =>
57+ getGuardsFromBalances ( [
58+ {
59+ id : row . id ,
60+ account : row . account ,
61+ chainId : row . chainId ,
62+ module : row . module ,
63+ } ,
64+ ] ) ,
65+ ) ,
66+ ) ;
67+ const guards = ( await Promise . all ( fetchPromises ) ) . flat ( ) ;
68+ const tx = await sequelize . transaction ( ) ;
69+ try {
70+ await Guard . bulkCreate ( guards , {
71+ transaction : tx ,
72+ } ) ;
7573
76- await tx . commit ( ) ;
77- console . log ( `Batch at offset ${ offset } processed successfully.` ) ;
78- offset += limit ;
79- } catch ( batchError ) {
80- console . error (
81- `Error processing batch at offset ${ offset } :` ,
82- batchError ,
83- ) ;
84- try {
85- await tx . rollback ( ) ;
86- console . log ( `Transaction for batch at offset ${ offset } rolled back.` ) ;
87- } catch ( rollbackError ) {
88- console . error ( "Error during rollback:" , rollbackError ) ;
89- }
90- break ;
74+ await tx . commit ( ) ;
75+ console . log ( `Batch at offset ${ offset } processed successfully.` ) ;
76+ offset += limit ;
77+ } catch ( batchError ) {
78+ console . error ( `Error processing batch at offset ${ offset } :` , batchError ) ;
79+ try {
80+ await tx . rollback ( ) ;
81+ console . log ( `Transaction for batch at offset ${ offset } rolled back.` ) ;
82+ } catch ( rollbackError ) {
83+ console . error ( "Error during rollback:" , rollbackError ) ;
9184 }
85+ break ;
9286 }
93- } catch ( error ) {
94- console . error ( "Error during backfill:" , error ) ;
95- } finally {
96- await sequelize . close ( ) ;
97- console . log ( "Database connection closed." ) ;
9887 }
88+
89+ await closeDatabase ( ) ;
90+ process . exit ( 0 ) ;
9991}
0 commit comments