Skip to content

Commit 4a5dd9f

Browse files
authored
Merge pull request #291 from hack-a-chain-software/order-index-fix
fix: added back the order index property for events in streaming process
2 parents 0c07b39 + 02dc7ed commit 4a5dd9f

File tree

5 files changed

+184
-38
lines changed

5 files changed

+184
-38
lines changed
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"flag"
6+
"fmt"
7+
"go-backfill/config"
8+
"log"
9+
"time"
10+
11+
"github.com/jackc/pgx/v5"
12+
)
13+
14+
const (
15+
batchSize = 10000 // Reduced batch size for better performance
16+
)
17+
18+
type Event struct {
19+
ID int64
20+
OrderIndex int64
21+
}
22+
23+
func fixBatchOrderIndex(conn *pgx.Conn, lastTransactionId int64) (bool, int64, error) {
24+
startTime := time.Now()
25+
26+
// Start transaction for writes
27+
tx, err := conn.Begin(context.Background())
28+
if err != nil {
29+
return false, lastTransactionId, fmt.Errorf("failed to begin transaction: %v", err)
30+
}
31+
defer tx.Rollback(context.Background())
32+
33+
// Get transactions in batch
34+
query := `
35+
SELECT DISTINCT t.id
36+
FROM "Transactions" t
37+
JOIN "Events" e ON e."transactionId" = t.id
38+
WHERE t.id > $1
39+
ORDER BY t.id ASC
40+
LIMIT $2
41+
`
42+
43+
rows, err := tx.Query(context.Background(), query, lastTransactionId, batchSize)
44+
if err != nil {
45+
return false, lastTransactionId, fmt.Errorf("failed to query transactions: %v", err)
46+
}
47+
defer rows.Close()
48+
49+
var transactionIds []int64
50+
for rows.Next() {
51+
var id int64
52+
if err := rows.Scan(&id); err != nil {
53+
return false, lastTransactionId, fmt.Errorf("failed to scan transaction id: %v", err)
54+
}
55+
transactionIds = append(transactionIds, id)
56+
}
57+
58+
if len(transactionIds) == 0 {
59+
return false, lastTransactionId, nil
60+
}
61+
62+
// Update all events in a single query using window functions
63+
updateQuery := `
64+
WITH OrderedEvents AS (
65+
SELECT
66+
e.id,
67+
ROW_NUMBER() OVER (PARTITION BY e."transactionId" ORDER BY e.id) - 1 as new_order_index
68+
FROM "Events" e
69+
WHERE e."transactionId" = ANY($1::bigint[])
70+
)
71+
UPDATE "Events" e
72+
SET "orderIndex" = oe.new_order_index
73+
FROM OrderedEvents oe
74+
WHERE e.id = oe.id
75+
AND (e."orderIndex" IS NULL OR e."orderIndex" != oe.new_order_index)
76+
RETURNING e.id
77+
`
78+
79+
rows, err = tx.Query(context.Background(), updateQuery, transactionIds)
80+
if err != nil {
81+
return false, lastTransactionId, fmt.Errorf("failed to execute update: %v", err)
82+
}
83+
defer rows.Close()
84+
85+
var updatedCount int
86+
for rows.Next() {
87+
updatedCount++
88+
}
89+
90+
if err := tx.Commit(context.Background()); err != nil {
91+
return false, lastTransactionId, fmt.Errorf("failed to commit transaction: %v", err)
92+
}
93+
94+
elapsed := time.Since(startTime)
95+
log.Printf("Fixed order_index for %d events in %d transactions. Batch time: %.2fs", updatedCount, len(transactionIds), elapsed.Seconds())
96+
97+
lastProcessedId := transactionIds[len(transactionIds)-1]
98+
return len(transactionIds) == batchSize, lastProcessedId, nil
99+
}
100+
101+
func main() {
102+
envFile := flag.String("env", ".env", "Path to the .env file")
103+
flag.Parse()
104+
105+
config.InitEnv(*envFile)
106+
env := config.GetConfig()
107+
108+
// Database connection
109+
connStr := fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=%s sslmode=disable",
110+
env.DbHost, env.DbPort, env.DbUser, env.DbPassword, env.DbName)
111+
112+
conn, err := pgx.Connect(context.Background(), connStr)
113+
if err != nil {
114+
log.Fatalf("Failed to connect to database: %v", err)
115+
}
116+
defer conn.Close(context.Background())
117+
118+
log.Println("Connected to database")
119+
120+
lastId := int64(0)
121+
hasMore := true
122+
processedTransactions := int64(0)
123+
124+
for hasMore {
125+
var err error
126+
hasMore, lastId, err = fixBatchOrderIndex(conn, lastId)
127+
if err != nil {
128+
log.Fatalf("Error during batch processing: %v", err)
129+
}
130+
processedTransactions += batchSize
131+
132+
if hasMore {
133+
log.Printf("Progress: Processed up to transaction ID %d", lastId)
134+
} else {
135+
log.Printf("Progress: Completed all transactions")
136+
}
137+
138+
time.Sleep(100 * time.Millisecond)
139+
}
140+
141+
log.Println("Order index fix completed successfully")
142+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import { EventAttributes } from '@/models/event';
2+
import { TransactionCreationAttributes } from '@/models/transaction';
3+
4+
// Map event data to event attributes for database storage
5+
export const mapToEventModel = (
6+
eventsData: any,
7+
transactionAttributes: TransactionCreationAttributes,
8+
): EventAttributes[] => {
9+
const events = eventsData.map((eventData: any) => {
10+
return {
11+
chainId: transactionAttributes.chainId,
12+
module: eventData.module.namespace
13+
? `${eventData.module.namespace}.${eventData.module.name}`
14+
: eventData.module.name,
15+
name: eventData.name,
16+
params: eventData.params,
17+
qualname: eventData.module.namespace
18+
? `${eventData.module.namespace}.${eventData.module.name}`
19+
: eventData.module.name,
20+
requestkey: transactionAttributes.requestkey,
21+
orderIndex: eventData.orderIndex,
22+
} as EventAttributes;
23+
}) as EventAttributes[];
24+
25+
return events;
26+
};

indexer/src/services/coinbase.ts

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import Event, { EventAttributes } from '@/models/event';
2020
import { getCoinTransfers } from './transfers';
2121
import Signer from '@/models/signer';
2222
import Guard from '@/models/guard';
23+
import { mapToEventModel } from '@/models/mappers/event-mapper';
2324

2425
/**
2526
* Interface representing the structured data of a coinbase transaction.
@@ -131,11 +132,12 @@ export async function processCoinbaseTransaction(
131132
coinbase: any,
132133
block: { id: number; chainId: number; creationTime: bigint },
133134
): Promise<CoinbaseTransactionData | undefined> {
134-
// If there's no coinbase data, return undefined
135135
if (!coinbase) return;
136136

137-
// Extract event data from the coinbase
138-
const eventsData = coinbase.events || [];
137+
const eventsData = (coinbase.events || []).map((event: any, index: number) => ({
138+
...event,
139+
orderIndex: index,
140+
}));
139141

140142
// Create transaction attributes for the coinbase transaction
141143
const transactionAttributes = {
@@ -154,21 +156,7 @@ export async function processCoinbaseTransaction(
154156
// Process coin transfers associated with the coinbase transaction
155157
const transfersCoinAttributes = await getCoinTransfers(eventsData, transactionAttributes);
156158

157-
// Process events associated with the coinbase transaction
158-
const eventsAttributes = eventsData.map((eventData: any) => {
159-
return {
160-
chainId: transactionAttributes.chainId,
161-
module: eventData.module.namespace
162-
? `${eventData.module.namespace}.${eventData.module.name}`
163-
: eventData.module.name,
164-
name: eventData.name,
165-
params: eventData.params,
166-
qualname: eventData.module.namespace
167-
? `${eventData.module.namespace}.${eventData.module.name}`
168-
: eventData.module.name,
169-
requestkey: coinbase.reqKey,
170-
} as EventAttributes;
171-
}) as EventAttributes[];
159+
const eventsAttributes = mapToEventModel(eventsData, transactionAttributes);
172160

173161
// Return the structured coinbase transaction data
174162
return { transactionAttributes, eventsAttributes, transfersCoinAttributes };

indexer/src/services/payload.ts

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import { sequelize } from '@/config/database';
2727
import { addCoinbaseTransactions } from './coinbase';
2828
import { getRequiredEnvString } from '@/utils/helpers';
2929
import TransactionDetails, { TransactionDetailsAttributes } from '@/models/transaction-details';
30+
import { mapToEventModel } from '@/models/mappers/event-mapper';
3031

3132
// Constants for array indices in the transaction data structure
3233
const TRANSACTION_INDEX = 0;
@@ -127,7 +128,10 @@ export async function processTransaction(
127128
nonce = nonce.replace(/"/g, '');
128129

129130
// Extract events data
130-
const eventsData = receiptInfo.events || [];
131+
const eventsData = (receiptInfo.events || []).map((event: any, index: number) => ({
132+
...event,
133+
orderIndex: index,
134+
}));
131135

132136
// Create transaction attributes object for database storage
133137
const transactionAttributes = {
@@ -160,21 +164,7 @@ export async function processTransaction(
160164
ttl: cmdData.meta.ttl,
161165
} as TransactionDetailsAttributes;
162166

163-
// Map event data to event attributes for database storage
164-
const eventsAttributes = eventsData.map((eventData: any) => {
165-
return {
166-
chainId: transactionAttributes.chainId,
167-
module: eventData.module.namespace
168-
? `${eventData.module.namespace}.${eventData.module.name}`
169-
: eventData.module.name,
170-
name: eventData.name,
171-
params: eventData.params,
172-
qualname: eventData.module.namespace
173-
? `${eventData.module.namespace}.${eventData.module.name}`
174-
: eventData.module.name,
175-
requestkey: receiptInfo.reqKey,
176-
} as EventAttributes;
177-
}) as EventAttributes[];
167+
const eventsAttributes = mapToEventModel(eventsData, transactionAttributes);
178168

179169
// Process transfers for both fungible and non-fungible tokens
180170
const transfersCoinAttributes = await getCoinTransfers(eventsData, transactionAttributes);

indexer/src/services/transfers.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ export function getNftTransfers(
7070
// Filter the events array to only include valid NFT transfers
7171
.filter(transferNftSignature)
7272
// Map each matching event to a promise that resolves to a TransferAttributes object
73-
.map(async (eventData: any, index: number): Promise<TransferAttributes> => {
73+
.map(async (eventData: any): Promise<TransferAttributes> => {
7474
// Extract the parameters from the event data
7575
const params = eventData.params;
7676
// param[0] is the token ID (the unique identifier for this NFT)
@@ -117,7 +117,7 @@ export function getNftTransfers(
117117
// Reference to the contract that manages this NFT
118118
contractId: contractId,
119119
// The position of this transfer within the transaction's events
120-
orderIndex: index,
120+
orderIndex: eventData.orderIndex,
121121
} as TransferAttributes;
122122
}) as TransferAttributes[];
123123

@@ -176,7 +176,7 @@ export function getCoinTransfers(
176176
// Filter the events array to only include valid coin transfers
177177
.filter(transferCoinSignature)
178178
// Map each matching event to a promise that resolves to a TransferAttributes object
179-
.map(async (eventData: any, index: number): Promise<TransferAttributes> => {
179+
.map(async (eventData: any): Promise<TransferAttributes> => {
180180
// Get the full module name (including namespace if present)
181181
// This identifies which token module is being transferred (e.g., 'coin', 'fungible-v2', etc.)
182182
const modulename = eventData.module.namespace
@@ -268,7 +268,7 @@ export function getCoinTransfers(
268268
// Reference to the contract that manages this token
269269
contractId: contractId,
270270
// The position of this transfer within the transaction's events
271-
orderIndex: index,
271+
orderIndex: eventData.orderIndex,
272272
} as TransferAttributes;
273273
}) as TransferAttributes[];
274274

0 commit comments

Comments
 (0)