Skip to content

Commit 124b6d2

Browse files
committed
refat: improved streaming
1 parent e3e4d25 commit 124b6d2

File tree

19 files changed

+470
-1876
lines changed

19 files changed

+470
-1876
lines changed

indexer/.env.template

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,14 @@
1-
AWS_S3_REGION="us-east-1"
2-
AWS_S3_BUCKET_NAME="kadena-indexer-data-002"
3-
AWS_ACCESS_KEY_ID="YOUR_ACCESS KEY_ID"
4-
AWS_SECRET_ACCESS_KEY="YOUR_SECRET_ACCESS_KEY"
5-
# only use if you wanna run s3 in your local machine through localstack
6-
AWS_S3_ENDPOINT_LOCAL_STACK=http://localhost:4566
7-
81
NODE_API_URL=https://api.chainweb.com
92
SYNC_BASE_URL="https://api.chainweb.com/chainweb/0.0"
3+
104
SYNC_MIN_HEIGHT=0
11-
SYNC_FETCH_INTERVAL_IN_BLOCKS=50
12-
SYNC_TIME_BETWEEN_REQUESTS_IN_MS=5000
13-
SYNC_ATTEMPTS_MAX_RETRY=10
14-
SYNC_ATTEMPTS_INTERVAL_IN_MS=2000
5+
SYNC_FETCH_INTERVAL_IN_BLOCKS=100
156
SYNC_NETWORK="mainnet01"
7+
KADENA_GRAPHQL_API_URL=localhost
8+
KADENA_GRAPHQL_API_PORT=3001
169

1710
DB_USERNAME="postgres"
1811
DB_PASSWORD="YOUR_DB_PASSWORD"
1912
DB_NAME="indexer"
20-
DB_HOST="YOUR_DB_HOST"
21-
22-
RUN_GRAPHQL_ON_START=true
23-
RUN_STREAMING_ON_START=true
24-
RUN_MISSING_BLOCKS_ON_START=false
13+
DB_SSL_ENABLED=false
14+
DB_HOST="YOUR_DB_HOST"

indexer/package.json

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -64,18 +64,12 @@
6464
"graphql:generate-types": "npx graphql-codegen",
6565
"dev:database": "ts-node src/index.ts --database",
6666
"dev:streaming": "ts-node src/index.ts --streaming",
67-
"dev:backfill": "ts-node src/index.ts --backfill",
68-
"dev:missing": "ts-node src/index.ts --missing",
69-
"dev:headers": "ts-node src/index.ts --headers",
70-
"dev:payloads": "ts-node src/index.ts --payloads",
7167
"dev:old-graphql": "ts-node src/index.ts --oldGraphql",
7268
"dev:graphql": "nodemon src/index.ts --graphql",
73-
"dev:run": "ts-node src/index.ts --run",
7469
"prod:start": "docker-compose up --build indexer && docker-compose logs -f indexer",
7570
"prod:streaming": "node dist/index.js --streaming",
7671
"prod:backfill": "node dist/index.js --backfill",
7772
"test": "NODE_ENV=test mocha -r ts-node/register 'tests/**/*.test.ts'",
78-
"start-localstack": "docker run --rm -p 4566:4566 -p 4571:4571 localstack/localstack",
7973
"create:database": "ts-node src/index.ts --database && yarn migrate:up",
8074
"migrate:up": "dotenv -e .env npx sequelize-cli db:migrate",
8175
"migrate:down": "dotenv -e .env npx sequelize-cli db:migrate:undo"

indexer/src/config/init.ts

Lines changed: 137 additions & 137 deletions
Original file line numberDiff line numberDiff line change
@@ -148,75 +148,75 @@ export async function initializeDatabase(noTrigger = true): Promise<void> {
148148
EXECUTE FUNCTION update_balances();
149149
`);
150150

151-
console.log('Sync public."Balances"...');
152-
153-
// Create the balances table
154-
await sequelize.query(`
155-
CREATE TABLE IF NOT EXISTS public."Balances" (
156-
id serial4 NOT NULL,
157-
account varchar(255) NOT NULL,
158-
"chainId" int4 NOT NULL,
159-
balance numeric(50) DEFAULT 0 NOT NULL,
160-
"module" varchar(255) NOT NULL,
161-
qualname varchar(255) NOT NULL,
162-
"tokenId" varchar(255) NULL,
163-
network varchar(255) NOT NULL,
164-
"hasTokenId" boolean DEFAULT false NOT NULL,
165-
"contractId" int4 NULL,
166-
"createdAt" timestamptz NOT NULL,
167-
"updatedAt" timestamptz NOT NULL,
168-
CONSTRAINT "Balances_pkey" PRIMARY KEY (id)
169-
);
170-
171-
DO $$
172-
BEGIN
173-
IF NOT EXISTS (
174-
SELECT 1
175-
FROM pg_indexes
176-
WHERE schemaname = 'public'
177-
AND tablename = 'Balances'
178-
AND indexname = 'balance_unique_constraint'
179-
) THEN
180-
CREATE UNIQUE INDEX balance_unique_constraint ON public."Balances" USING btree (network, "chainId", account, qualname, "tokenId");
181-
END IF;
182-
END $$;
183-
`);
151+
// console.log('Sync public."Balances"...');
152+
153+
// // Create the balances table
154+
// await sequelize.query(`
155+
// CREATE TABLE IF NOT EXISTS public."Balances" (
156+
// id serial4 NOT NULL,
157+
// account varchar(255) NOT NULL,
158+
// "chainId" int4 NOT NULL,
159+
// balance numeric(50) DEFAULT 0 NOT NULL,
160+
// "module" varchar(255) NOT NULL,
161+
// qualname varchar(255) NOT NULL,
162+
// "tokenId" varchar(255) NULL,
163+
// network varchar(255) NOT NULL,
164+
// "hasTokenId" boolean DEFAULT false NOT NULL,
165+
// "contractId" int4 NULL,
166+
// "createdAt" timestamptz NOT NULL,
167+
// "updatedAt" timestamptz NOT NULL,
168+
// CONSTRAINT "Balances_pkey" PRIMARY KEY (id)
169+
// );
170+
171+
// DO $$
172+
// BEGIN
173+
// IF NOT EXISTS (
174+
// SELECT 1
175+
// FROM pg_indexes
176+
// WHERE schemaname = 'public'
177+
// AND tablename = 'Balances'
178+
// AND indexname = 'balance_unique_constraint'
179+
// ) THEN
180+
// CREATE UNIQUE INDEX balance_unique_constraint ON public."Balances" USING btree (network, "chainId", account, qualname, "tokenId");
181+
// END IF;
182+
// END $$;
183+
// `);
184184

185185
// --------------------------------
186186
// Missing blocks
187187
// --------------------------------
188188

189-
console.log("Sync missing_block_ranges...");
190-
191-
// Create missing blocks view
192-
await sequelize.query(`
193-
CREATE OR REPLACE VIEW missing_block_ranges AS
194-
WITH missing_ranges AS (
195-
SELECT DISTINCT
196-
"chainId",
197-
"chainwebVersion",
198-
height + 1 AS missing_start,
199-
next_height - 1 AS missing_end
200-
FROM (
201-
SELECT
202-
"chainId",
203-
"chainwebVersion",
204-
height,
205-
LEAD(height) OVER (PARTITION BY "chainId", "chainwebVersion" ORDER BY height) AS next_height
206-
FROM "Blocks"
207-
) AS t
208-
WHERE next_height IS NOT NULL AND next_height <> height + 1
209-
)
210-
SELECT DISTINCT
211-
"chainId",
212-
"chainwebVersion",
213-
missing_start AS from_height,
214-
missing_end AS to_height,
215-
(missing_end - missing_start) AS diff
216-
FROM missing_ranges
217-
where (missing_end - missing_start) >= 0
218-
ORDER BY "chainId", "chainwebVersion", missing_start ASC;
219-
`);
189+
// console.log("Sync missing_block_ranges...");
190+
191+
// // Create missing blocks view
192+
// await sequelize.query(`
193+
// CREATE OR REPLACE VIEW missing_block_ranges AS
194+
// WITH missing_ranges AS (
195+
// SELECT DISTINCT
196+
// "chainId",
197+
// "chainwebVersion",
198+
// height + 1 AS missing_start,
199+
// next_height - 1 AS missing_end
200+
// FROM (
201+
// SELECT
202+
// "chainId",
203+
// "chainwebVersion",
204+
// height,
205+
// LEAD(height) OVER (PARTITION BY "chainId", "chainwebVersion" ORDER BY height) AS next_height
206+
// FROM "Blocks"
207+
// ) AS t
208+
// WHERE next_height IS NOT NULL AND next_height <> height + 1
209+
// )
210+
// SELECT DISTINCT
211+
// "chainId",
212+
// "chainwebVersion",
213+
// missing_start AS from_height,
214+
// missing_end AS to_height,
215+
// (missing_end - missing_start) AS diff
216+
// FROM missing_ranges
217+
// where (missing_end - missing_start) >= 0
218+
// ORDER BY "chainId", "chainwebVersion", missing_start ASC;
219+
// `);
220220

221221
// --------------------------------
222222
// Orphan blocks
@@ -268,72 +268,72 @@ export async function initializeDatabase(noTrigger = true): Promise<void> {
268268
$function$
269269
;`);
270270

271-
console.log("Sync public.check_backward_orphans()...");
272-
273-
// Create the check backward orphans function
274-
await sequelize.query(`
275-
CREATE OR REPLACE FUNCTION public.check_backward_orphans()
276-
RETURNS trigger
277-
LANGUAGE plpgsql
278-
AS $function$
279-
DECLARE
280-
recent_blocks RECORD;
281-
previous_block RECORD;
282-
first_block RECORD;
283-
block_count INT := 0;
284-
depth CONSTANT INT := 10; -- Default the depth constant
285-
buffer CONSTANT INT := 5; -- Number of heights to buffer, because some blocks can arrive out of order
286-
BEGIN
287-
PERFORM pg_advisory_xact_lock(hashtext(NEW."chainId"::text || NEW."chainwebVersion"::text));
288-
289-
-- Check the last 'depth' blocks
290-
FOR recent_blocks IN
291-
SELECT * FROM public."Blocks"
292-
WHERE height BETWEEN (NEW.height - buffer - depth) AND (NEW.height - buffer - 1)
293-
AND "chainId" = NEW."chainId"
294-
AND "chainwebVersion" = NEW."chainwebVersion"
295-
AND COALESCE(canonical, TRUE)
296-
ORDER BY height ASC
297-
FOR NO KEY UPDATE
298-
LOOP
299-
-- Set the first block
300-
IF block_count = 0 THEN
301-
first_block := recent_blocks;
302-
END IF;
303-
304-
IF previous_block IS NULL THEN
305-
ELSE
306-
-- Check for non-duplicated block
307-
IF previous_block.height = recent_blocks.height
308-
AND (recent_blocks.canonical = FALSE OR recent_blocks.canonical IS NULL) THEN
309-
PERFORM check_canonical(first_block.hash, recent_blocks.height, recent_blocks."chainId", recent_blocks."chainwebVersion", depth);
310-
ELSE
311-
UPDATE public."Blocks"
312-
SET canonical = TRUE
313-
WHERE hash = recent_blocks.hash
314-
AND "chainId" = NEW."chainId"
315-
AND "chainwebVersion" = NEW."chainwebVersion";
316-
END IF;
317-
END IF;
318-
319-
-- Check for gaps
320-
IF recent_blocks.height <> (NEW.height - buffer) - block_count - 1 THEN
321-
-- If there are gaps, do not change canonical status
322-
RETURN NEW;
323-
END IF;
324-
325-
previous_block := recent_blocks;
326-
block_count := block_count + 1;
327-
END LOOP;
328-
329-
IF previous_block IS NULL THEN
330-
RETURN NEW;
331-
END IF;
332-
333-
RETURN NEW;
334-
END;
335-
$function$
336-
;`);
271+
// console.log("Sync public.check_backward_orphans()...");
272+
273+
// // Create the check backward orphans function
274+
// await sequelize.query(`
275+
// CREATE OR REPLACE FUNCTION public.check_backward_orphans()
276+
// RETURNS trigger
277+
// LANGUAGE plpgsql
278+
// AS $function$
279+
// DECLARE
280+
// recent_blocks RECORD;
281+
// previous_block RECORD;
282+
// first_block RECORD;
283+
// block_count INT := 0;
284+
// depth CONSTANT INT := 10; -- Default the depth constant
285+
// buffer CONSTANT INT := 5; -- Number of heights to buffer, because some blocks can arrive out of order
286+
// BEGIN
287+
// PERFORM pg_advisory_xact_lock(hashtext(NEW."chainId"::text || NEW."chainwebVersion"::text));
288+
289+
// -- Check the last 'depth' blocks
290+
// FOR recent_blocks IN
291+
// SELECT * FROM public."Blocks"
292+
// WHERE height BETWEEN (NEW.height - buffer - depth) AND (NEW.height - buffer - 1)
293+
// AND "chainId" = NEW."chainId"
294+
// AND "chainwebVersion" = NEW."chainwebVersion"
295+
// AND COALESCE(canonical, TRUE)
296+
// ORDER BY height ASC
297+
// FOR NO KEY UPDATE
298+
// LOOP
299+
// -- Set the first block
300+
// IF block_count = 0 THEN
301+
// first_block := recent_blocks;
302+
// END IF;
303+
304+
// IF previous_block IS NULL THEN
305+
// ELSE
306+
// -- Check for non-duplicated block
307+
// IF previous_block.height = recent_blocks.height
308+
// AND (recent_blocks.canonical = FALSE OR recent_blocks.canonical IS NULL) THEN
309+
// PERFORM check_canonical(first_block.hash, recent_blocks.height, recent_blocks."chainId", recent_blocks."chainwebVersion", depth);
310+
// ELSE
311+
// UPDATE public."Blocks"
312+
// SET canonical = TRUE
313+
// WHERE hash = recent_blocks.hash
314+
// AND "chainId" = NEW."chainId"
315+
// AND "chainwebVersion" = NEW."chainwebVersion";
316+
// END IF;
317+
// END IF;
318+
319+
// -- Check for gaps
320+
// IF recent_blocks.height <> (NEW.height - buffer) - block_count - 1 THEN
321+
// -- If there are gaps, do not change canonical status
322+
// RETURN NEW;
323+
// END IF;
324+
325+
// previous_block := recent_blocks;
326+
// block_count := block_count + 1;
327+
// END LOOP;
328+
329+
// IF previous_block IS NULL THEN
330+
// RETURN NEW;
331+
// END IF;
332+
333+
// RETURN NEW;
334+
// END;
335+
// $function$
336+
// ;`);
337337

338338
console.log("Sync public.check_upward_orphans()...");
339339

@@ -455,14 +455,14 @@ export async function initializeDatabase(noTrigger = true): Promise<void> {
455455
FOR EACH ROW
456456
EXECUTE FUNCTION transactions_propagate_canonical_function();`);
457457

458-
console.log("Sync check_orphan_blocks_backward...");
458+
// console.log("Sync check_orphan_blocks_backward...");
459459

460-
// Create orphan blocks trigger
461-
await sequelize.query(`
462-
CREATE OR REPLACE TRIGGER check_orphan_blocks_backward
463-
AFTER INSERT ON public."Blocks"
464-
FOR EACH ROW
465-
EXECUTE FUNCTION check_backward_orphans();`);
460+
// // Create orphan blocks trigger
461+
// await sequelize.query(`
462+
// CREATE OR REPLACE TRIGGER check_orphan_blocks_backward
463+
// AFTER INSERT ON public."Blocks"
464+
// FOR EACH ROW
465+
// EXECUTE FUNCTION check_backward_orphans();`);
466466

467467
console.log("Sync check_orphan_blocks_upward...");
468468

0 commit comments

Comments
 (0)