Skip to content

Commit 9eb62b0

Browse files
authored
Merge pull request #449 from hack-a-chain-software/issue-433
fix: transfers query taking too long for accounts with many transfers
2 parents fd7d635 + b6258a2 commit 9eb62b0

28 files changed

+3045
-1990
lines changed
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
package main
2+
3+
import (
4+
"database/sql"
5+
"flag"
6+
"fmt"
7+
"go-backfill/config"
8+
"log"
9+
10+
_ "github.com/lib/pq" // PostgreSQL driver
11+
)
12+
13+
const (
14+
creationTimeBatchSize = 500
15+
startTransactionId = 1
16+
endTransactionId = 1000
17+
)
18+
19+
// This script was created to duplicate the creation time of transaction to the transfers table.
20+
// The main motivation was to improve the performance of the transfers query.
21+
22+
func UpdateCreationTimes() error {
23+
envFile := flag.String("env", ".env", "Path to the .env file")
24+
flag.Parse()
25+
config.InitEnv(*envFile)
26+
env := config.GetConfig()
27+
connStr := fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=%s sslmode=disable",
28+
env.DbHost, env.DbPort, env.DbUser, env.DbPassword, env.DbName)
29+
30+
db, err := sql.Open("postgres", connStr)
31+
if err != nil {
32+
return fmt.Errorf("failed to connect to database: %v", err)
33+
}
34+
defer db.Close()
35+
36+
log.Println("Connected to database")
37+
38+
// Test database connection
39+
if err := db.Ping(); err != nil {
40+
return fmt.Errorf("failed to ping database: %v", err)
41+
}
42+
43+
// Process transactions in batches
44+
if err := processTransactionsBatch(db); err != nil {
45+
return fmt.Errorf("failed to process transactions: %v", err)
46+
}
47+
48+
log.Println("Successfully updated all transfer creation times")
49+
return nil
50+
}
51+
52+
func processTransactionsBatch(db *sql.DB) error {
53+
currentId := startTransactionId
54+
totalProcessed := 0
55+
totalTransactions := endTransactionId - startTransactionId + 1
56+
lastProgressPrinted := -1.0
57+
58+
log.Printf("Starting to process transactions from ID %d to %d",
59+
startTransactionId, endTransactionId)
60+
log.Printf("Total transactions to process: %d", totalTransactions)
61+
62+
for currentId <= endTransactionId {
63+
// Calculate batch end
64+
batchEnd := currentId + creationTimeBatchSize - 1
65+
if batchEnd > endTransactionId {
66+
batchEnd = endTransactionId
67+
}
68+
69+
// Process this batch
70+
processed, err := processBatch(db, currentId, batchEnd)
71+
if err != nil {
72+
return fmt.Errorf("failed to process batch %d-%d: %v", currentId, batchEnd, err)
73+
}
74+
75+
totalProcessed += processed
76+
77+
// Calculate progress percentage
78+
transactionsProcessed := batchEnd - startTransactionId + 1
79+
progressPercent := (float64(transactionsProcessed) / float64(totalTransactions)) * 100.0
80+
81+
// Only print progress if it has increased by at least 0.1%
82+
if progressPercent-lastProgressPrinted >= 0.1 {
83+
log.Printf("Progress: %.1f%%", progressPercent)
84+
lastProgressPrinted = progressPercent
85+
}
86+
87+
// Move to next batch
88+
currentId = batchEnd + 1
89+
}
90+
91+
log.Printf("Completed processing. Total transfers updated: %d (100.0%%)", totalProcessed)
92+
return nil
93+
}
94+
95+
func processBatch(db *sql.DB, startId, endId int) (int, error) {
96+
// Begin transaction for atomic operation
97+
tx, err := db.Begin()
98+
if err != nil {
99+
return 0, fmt.Errorf("failed to begin transaction: %v", err)
100+
}
101+
defer tx.Rollback() // Will be ignored if tx.Commit() succeeds
102+
103+
// Update transfers with creation time from transactions in a single query
104+
updateQuery := `
105+
UPDATE "Transfers"
106+
SET creationtime = t.creationtime, "updatedAt" = CURRENT_TIMESTAMP
107+
FROM "Transactions" t
108+
WHERE "Transfers"."transactionId" = t.id
109+
AND t.id >= $1 AND t.id <= $2
110+
`
111+
112+
result, err := tx.Exec(updateQuery, startId, endId)
113+
if err != nil {
114+
return 0, fmt.Errorf("failed to update transfers: %v", err)
115+
}
116+
117+
rowsAffected, err := result.RowsAffected()
118+
if err != nil {
119+
return 0, fmt.Errorf("failed to get rows affected: %v", err)
120+
}
121+
122+
// Commit the transaction
123+
if err := tx.Commit(); err != nil {
124+
return 0, fmt.Errorf("failed to commit transaction: %v", err)
125+
}
126+
127+
return int(rowsAffected), nil
128+
}
129+
130+
func creationTimes() {
131+
if err := UpdateCreationTimes(); err != nil {
132+
log.Fatalf("Error: %v", err)
133+
}
134+
}

backfill/process/get_amount_for_transfer.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package process
22

33
import (
4-
"fmt"
54
"strconv"
65
)
76

@@ -24,7 +23,6 @@ func GetAmountForTransfer(amount any) (float64, bool) {
2423
if decimalStr, exists := amountMap["decimal"]; exists {
2524
if str, ok := decimalStr.(string); ok {
2625
if value, err := strconv.ParseFloat(str, 64); err == nil {
27-
fmt.Println("decimalStr", decimalStr)
2826
return value, true
2927
}
3028
}
@@ -35,7 +33,6 @@ func GetAmountForTransfer(amount any) (float64, bool) {
3533
if integerStr, exists := amountMap["integer"]; exists {
3634
if str, ok := integerStr.(string); ok {
3735
if value, err := strconv.ParseFloat(str, 64); err == nil {
38-
fmt.Println("integerStr", integerStr)
3936
return value, true
4037
}
4138
}

backfill/process/get_coin_transfers.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ func GetCoinTransfers(
1111
chainId int,
1212
requestKey string,
1313
transactionId int64,
14+
txCreationTime string,
1415
) []repository.TransferAttributes {
1516
const TransferCoinSignature = "TRANSFER"
1617
const TransferCoinParamsLength = 3
@@ -37,6 +38,7 @@ func GetCoinTransfers(
3738
ChainId: chainId,
3839
FromAcct: fromAcct,
3940
ModuleHash: event.ModuleHash,
41+
CreationTime: txCreationTime,
4042
ModuleName: moduleName,
4143
RequestKey: requestKey,
4244
ToAcct: toAcct,

backfill/process/get_nft_transfers.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import (
66
"log"
77
)
88

9-
func GetNftTransfers(network string, chainId int, events []fetch.Event, reqKey string, transactionId int64) []repository.TransferAttributes {
9+
func GetNftTransfers(network string, chainId int, events []fetch.Event, reqKey string, transactionId int64, txCreationTime string) []repository.TransferAttributes {
1010
const ReconcileSignature = "RECONCILE"
1111
const TransferNftParamsLength = 4
1212

@@ -73,6 +73,7 @@ func GetNftTransfers(network string, chainId int, events []fetch.Event, reqKey s
7373
HasTokenId: tokenId != nil,
7474
TokenId: tokenId,
7575
Type: "poly-fungible",
76+
CreationTime: txCreationTime,
7677
OrderIndex: len(transfers),
7778
}
7879

backfill/process/process_transfers.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,16 @@ import (
66
"go-backfill/repository"
77
)
88

9-
func PrepareTransfers(network string, payload fetch.ProcessedPayload, transactionsId []int64) ([]repository.TransferAttributes, error) {
9+
func PrepareTransfers(network string, payload fetch.ProcessedPayload, transactionsId []int64, txCreationTimes []string) ([]repository.TransferAttributes, error) {
1010
transactions := payload.Transactions
1111

1212
const avgTransfersPerTransaction = 80
1313
transfers := make([]repository.TransferAttributes, 0, len(transactions)*avgTransfersPerTransaction)
1414

1515
for index, t := range transactions {
1616
ChainId := payload.Header.ChainId
17-
coinTransfers := GetCoinTransfers(t.Events, ChainId, t.ReqKey, transactionsId[index])
18-
nftTransfers := GetNftTransfers(network, ChainId, t.Events, t.ReqKey, transactionsId[index])
17+
coinTransfers := GetCoinTransfers(t.Events, ChainId, t.ReqKey, transactionsId[index], txCreationTimes[index])
18+
nftTransfers := GetNftTransfers(network, ChainId, t.Events, t.ReqKey, transactionsId[index], txCreationTimes[index])
1919
transfers = append(transfers, coinTransfers...)
2020
transfers = append(transfers, nftTransfers...)
2121
}
@@ -26,7 +26,8 @@ func PrepareTransfers(network string, payload fetch.ProcessedPayload, transactio
2626
}
2727

2828
var coinbaseTxId = transactionsId[len(transactionsId)-1]
29-
coinTransfers := GetCoinTransfers(coinbaseDecoded.Events, payload.Header.ChainId, coinbaseDecoded.ReqKey, coinbaseTxId)
29+
var coinbaseTxCreationTime = txCreationTimes[len(txCreationTimes)-1]
30+
coinTransfers := GetCoinTransfers(coinbaseDecoded.Events, payload.Header.ChainId, coinbaseDecoded.ReqKey, coinbaseTxId, coinbaseTxCreationTime)
3031
transfers = append(transfers, coinTransfers...)
3132

3233
return transfers, nil

backfill/process/save_payloads.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ func savePayloads(network string, chainId int, processedPayloads []fetch.Process
7373
}
7474

7575
var transactionIdsToSave [][]int64
76+
var txCreationTimesToSave [][]string
7677
var totalGasUsedInChain float64 = 0
7778
for index, processedPayload := range processedPayloads {
7879
var blockId = blockIds[index]
@@ -111,6 +112,14 @@ func savePayloads(network string, chainId int, processedPayloads []fetch.Process
111112
txsSize := approximateSize(txs)
112113
dataSizeTracker.TransactionsKB += txsSize
113114
transactionIdsToSave = append(transactionIdsToSave, transactionIds)
115+
116+
var txCreationTimes []string
117+
for _, tx := range txs {
118+
txCreationTimes = append(txCreationTimes, tx.CreationTime)
119+
}
120+
txCreationTimes = append(txCreationTimes, txCoinbase.CreationTime)
121+
txCreationTimesToSave = append(txCreationTimesToSave, txCreationTimes)
122+
114123
counters.Transactions += len(transactionIds)
115124
}
116125

@@ -141,7 +150,7 @@ func savePayloads(network string, chainId int, processedPayloads []fetch.Process
141150
}
142151

143152
for index, processedPayload := range processedPayloads {
144-
transfers, err := PrepareTransfers(network, processedPayload, transactionIdsToSave[index])
153+
transfers, err := PrepareTransfers(network, processedPayload, transactionIdsToSave[index], txCreationTimesToSave[index])
145154
if err != nil {
146155
return Counters{}, DataSizeTracker{}, fmt.Errorf("preparing transfers -> %w", err)
147156
}
@@ -187,8 +196,6 @@ func savePayloads(network string, chainId int, processedPayloads []fetch.Process
187196
log.Printf("Saved payloads in %fs\n", time.Since(startTime).Seconds())
188197
}
189198

190-
log.Printf("Saved payloads in %fs\n", time.Since(startTime).Seconds())
191-
192199
if err := tx.Commit(context.Background()); err != nil {
193200
return Counters{}, DataSizeTracker{}, fmt.Errorf("committing transaction: %w", err)
194201
}

backfill/repository/transfer_repository.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ type TransferAttributes struct {
2020
HasTokenId bool `json:"hasTokenId"`
2121
TokenId *string `json:"tokenId"`
2222
Type string `json:"type"`
23+
CreationTime string `json:"creationtime"`
2324
OrderIndex int `json:"orderIndex"`
2425
CreatedAt time.Time `json:"createdAt"`
2526
UpdatedAt time.Time `json:"updatedAt"`
@@ -32,8 +33,8 @@ func SaveTransfersToDatabase(transfers []TransferAttributes, db pgx.Tx) error {
3233

3334
query := `
3435
INSERT INTO "Transfers"
35-
("transactionId", amount, "chainId", from_acct, modulehash, modulename, requestkey, to_acct, "hasTokenId", "tokenId", "type", "orderIndex", "createdAt", "updatedAt", canonical)
36-
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
36+
("transactionId", amount, "chainId", from_acct, modulehash, modulename, requestkey, to_acct, "hasTokenId", "tokenId", "type", "creationtime", "orderIndex", "createdAt", "updatedAt", canonical)
37+
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
3738
`
3839

3940
now := time.Now()
@@ -53,6 +54,7 @@ func SaveTransfersToDatabase(transfers []TransferAttributes, db pgx.Tx) error {
5354
transfer.HasTokenId,
5455
transfer.TokenId,
5556
transfer.Type,
57+
transfer.CreationTime,
5658
transfer.OrderIndex,
5759
now,
5860
now,
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
'use strict';
2+
3+
/** @type {import('sequelize-cli').Migration} */
4+
module.exports = {
5+
async up(queryInterface, Sequelize) {
6+
await queryInterface.addColumn('Transfers', 'creationtime', {
7+
type: Sequelize.STRING,
8+
comment: "The creation time of the transaction (e.g., '1715747797').",
9+
});
10+
},
11+
12+
async down(queryInterface) {
13+
await queryInterface.removeColumn('Transfers', 'creationtime');
14+
},
15+
};
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
'use strict';
2+
3+
/** @type {import('sequelize-cli').Migration} */
4+
module.exports = {
5+
async up(queryInterface) {
6+
await queryInterface.addIndex('Transfers', ['creationtime', 'id'], {
7+
name: 'transfers_creationtime_id_idx',
8+
});
9+
await queryInterface.addIndex('Transfers', ['from_acct', 'creationtime', 'id'], {
10+
name: 'transfers_from_acct_creationtime_id_idx',
11+
});
12+
await queryInterface.addIndex('Transfers', ['to_acct', 'creationtime', 'id'], {
13+
name: 'transfers_to_acct_creationtime_id_idx',
14+
});
15+
},
16+
17+
async down(queryInterface) {
18+
await queryInterface.removeIndex('Transfers', 'transfers_creationtime_id_idx');
19+
await queryInterface.removeIndex('Transfers', 'transfers_from_acct_creationtime_id_idx');
20+
await queryInterface.removeIndex('Transfers', 'transfers_to_acct_creationtime_id_idx');
21+
},
22+
};

indexer/src/kadena-server/repository/application/transfer-repository.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,15 @@ export interface GetTotalCountParams {
2323
}
2424

2525
export interface GetCrossChainTransferByPactIdParams {
26-
pactId: string;
2726
amount: string;
2827
receiverAccount: string;
2928
senderAccount: string;
29+
transactionId: string;
3030
}
3131

3232
export type TransferOutput = Omit<Transfer, 'block' | 'transaction' | 'crossChainTransfer'> & {
3333
transferId: string;
34-
pactId: string | null;
34+
transactionId: string;
3535
blockHash: string;
3636
};
3737

0 commit comments

Comments
 (0)