134 changes: 134 additions & 0 deletions backfill/db-migrator/creationtime.go
@@ -0,0 +1,134 @@
package main

import (
    "database/sql"
    "flag"
    "fmt"
    "go-backfill/config"
    "log"

    _ "github.com/lib/pq" // PostgreSQL driver
)

const (
    creationTimeBatchSize = 500
    startTransactionId    = 1
    endTransactionId      = 1000
)

// This script copies each transaction's creation time onto the rows of the transfers table.
// The main motivation is to improve the performance of the transfers query.

func UpdateCreationTimes() error {
    envFile := flag.String("env", ".env", "Path to the .env file")
    flag.Parse()
    config.InitEnv(*envFile)
    env := config.GetConfig()
    connStr := fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=%s sslmode=disable",
        env.DbHost, env.DbPort, env.DbUser, env.DbPassword, env.DbName)

    db, err := sql.Open("postgres", connStr)
    if err != nil {
        return fmt.Errorf("failed to connect to database: %v", err)
    }
    defer db.Close()

    log.Println("Connected to database")

    // Test database connection
    if err := db.Ping(); err != nil {
        return fmt.Errorf("failed to ping database: %v", err)
    }

    // Process transactions in batches
    if err := processTransactionsBatch(db); err != nil {
        return fmt.Errorf("failed to process transactions: %v", err)
    }

    log.Println("Successfully updated all transfer creation times")
    return nil
}

func processTransactionsBatch(db *sql.DB) error {
    currentId := startTransactionId
    totalProcessed := 0
    totalTransactions := endTransactionId - startTransactionId + 1
    lastProgressPrinted := -1.0

    log.Printf("Starting to process transactions from ID %d to %d",
        startTransactionId, endTransactionId)
    log.Printf("Total transactions to process: %d", totalTransactions)

    for currentId <= endTransactionId {
        // Calculate batch end
        batchEnd := currentId + creationTimeBatchSize - 1
        if batchEnd > endTransactionId {
            batchEnd = endTransactionId
        }

        // Process this batch
        processed, err := processBatch(db, currentId, batchEnd)
        if err != nil {
            return fmt.Errorf("failed to process batch %d-%d: %v", currentId, batchEnd, err)
        }

        totalProcessed += processed

        // Calculate progress percentage
        transactionsProcessed := batchEnd - startTransactionId + 1
        progressPercent := (float64(transactionsProcessed) / float64(totalTransactions)) * 100.0

        // Only print progress if it has increased by at least 0.1%
        if progressPercent-lastProgressPrinted >= 0.1 {
            log.Printf("Progress: %.1f%%", progressPercent)
            lastProgressPrinted = progressPercent
        }

        // Move to next batch
        currentId = batchEnd + 1
    }

    log.Printf("Completed processing. Total transfers updated: %d (100.0%%)", totalProcessed)
    return nil
}

func processBatch(db *sql.DB, startId, endId int) (int, error) {
    // Begin transaction for atomic operation
    tx, err := db.Begin()
    if err != nil {
        return 0, fmt.Errorf("failed to begin transaction: %v", err)
    }
    defer tx.Rollback() // Will be ignored if tx.Commit() succeeds

    // Update transfers with creation time from transactions in a single query
    updateQuery := `
        UPDATE "Transfers"
        SET creationtime = t.creationtime, "updatedAt" = CURRENT_TIMESTAMP
        FROM "Transactions" t
        WHERE "Transfers"."transactionId" = t.id
        AND t.id >= $1 AND t.id <= $2
    `

    result, err := tx.Exec(updateQuery, startId, endId)
    if err != nil {
        return 0, fmt.Errorf("failed to update transfers: %v", err)
    }

    rowsAffected, err := result.RowsAffected()
    if err != nil {
        return 0, fmt.Errorf("failed to get rows affected: %v", err)
    }

    // Commit the transaction
    if err := tx.Commit(); err != nil {
        return 0, fmt.Errorf("failed to commit transaction: %v", err)
    }

    return int(rowsAffected), nil
}

func creationTimes() {
    if err := UpdateCreationTimes(); err != nil {
        log.Fatalf("Error: %v", err)
    }
}
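A minimal verification sketch that could sit in the same file as the script above (it reuses the database/sql and fmt imports). It assumes rows the backfill never touched still have a NULL creationtime; that assumption and the helper itself are not part of this PR.

// verifyBackfill counts transfers in the processed transaction ID range that
// still have no creation time. Assumes untouched rows carry a NULL
// creationtime, which holds if the column was added without a default.
func verifyBackfill(db *sql.DB, startId, endId int) (int, error) {
    var missing int
    checkQuery := `
        SELECT COUNT(*)
        FROM "Transfers"
        WHERE creationtime IS NULL
        AND "transactionId" >= $1 AND "transactionId" <= $2
    `
    if err := db.QueryRow(checkQuery, startId, endId).Scan(&missing); err != nil {
        return 0, fmt.Errorf("failed to count transfers missing creationtime: %v", err)
    }
    return missing, nil
}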
3 changes: 0 additions & 3 deletions backfill/process/get_amount_for_transfer.go
@@ -1,7 +1,6 @@
package process

import (
"fmt"
"strconv"
)

@@ -24,7 +23,6 @@ func GetAmountForTransfer(amount any) (float64, bool) {
if decimalStr, exists := amountMap["decimal"]; exists {
if str, ok := decimalStr.(string); ok {
if value, err := strconv.ParseFloat(str, 64); err == nil {
fmt.Println("decimalStr", decimalStr)
return value, true
}
}
@@ -35,7 +33,6 @@ func GetAmountForTransfer(amount any) (float64, bool) {
if integerStr, exists := amountMap["integer"]; exists {
if str, ok := integerStr.(string); ok {
if value, err := strconv.ParseFloat(str, 64); err == nil {
fmt.Println("integerStr", integerStr)
return value, true
}
}
2 changes: 2 additions & 0 deletions backfill/process/get_coin_transfers.go
@@ -11,6 +11,7 @@ func GetCoinTransfers(
chainId int,
requestKey string,
transactionId int64,
txCreationTime string,
) []repository.TransferAttributes {
const TransferCoinSignature = "TRANSFER"
const TransferCoinParamsLength = 3
@@ -37,6 +38,7 @@
ChainId: chainId,
FromAcct: fromAcct,
ModuleHash: event.ModuleHash,
CreationTime: txCreationTime,
ModuleName: moduleName,
RequestKey: requestKey,
ToAcct: toAcct,
3 changes: 2 additions & 1 deletion backfill/process/get_nft_transfers.go
@@ -6,7 +6,7 @@ import (
"log"
)

func GetNftTransfers(network string, chainId int, events []fetch.Event, reqKey string, transactionId int64) []repository.TransferAttributes {
func GetNftTransfers(network string, chainId int, events []fetch.Event, reqKey string, transactionId int64, txCreationTime string) []repository.TransferAttributes {
const ReconcileSignature = "RECONCILE"
const TransferNftParamsLength = 4

@@ -73,6 +73,7 @@
HasTokenId: tokenId != nil,
TokenId: tokenId,
Type: "poly-fungible",
CreationTime: txCreationTime,
OrderIndex: len(transfers),
}

9 changes: 5 additions & 4 deletions backfill/process/process_transfers.go
@@ -6,16 +6,16 @@ import (
"go-backfill/repository"
)

func PrepareTransfers(network string, payload fetch.ProcessedPayload, transactionsId []int64) ([]repository.TransferAttributes, error) {
func PrepareTransfers(network string, payload fetch.ProcessedPayload, transactionsId []int64, txCreationTimes []string) ([]repository.TransferAttributes, error) {
transactions := payload.Transactions

const avgTransfersPerTransaction = 80
transfers := make([]repository.TransferAttributes, 0, len(transactions)*avgTransfersPerTransaction)

for index, t := range transactions {
ChainId := payload.Header.ChainId
coinTransfers := GetCoinTransfers(t.Events, ChainId, t.ReqKey, transactionsId[index])
nftTransfers := GetNftTransfers(network, ChainId, t.Events, t.ReqKey, transactionsId[index])
coinTransfers := GetCoinTransfers(t.Events, ChainId, t.ReqKey, transactionsId[index], txCreationTimes[index])
nftTransfers := GetNftTransfers(network, ChainId, t.Events, t.ReqKey, transactionsId[index], txCreationTimes[index])
transfers = append(transfers, coinTransfers...)
transfers = append(transfers, nftTransfers...)
}
@@ -26,7 +26,8 @@ func PrepareTransfers(network string, payload fetch.ProcessedPayload, transactio
}

var coinbaseTxId = transactionsId[len(transactionsId)-1]
coinTransfers := GetCoinTransfers(coinbaseDecoded.Events, payload.Header.ChainId, coinbaseDecoded.ReqKey, coinbaseTxId)
var coinbaseTxCreationTime = txCreationTimes[len(txCreationTimes)-1]
coinTransfers := GetCoinTransfers(coinbaseDecoded.Events, payload.Header.ChainId, coinbaseDecoded.ReqKey, coinbaseTxId, coinbaseTxCreationTime)
transfers = append(transfers, coinTransfers...)

return transfers, nil
13 changes: 10 additions & 3 deletions backfill/process/save_payloads.go
@@ -73,6 +73,7 @@ func savePayloads(network string, chainId int, processedPayloads []fetch.Process
}

var transactionIdsToSave [][]int64
var txCreationTimesToSave [][]string
var totalGasUsedInChain float64 = 0
for index, processedPayload := range processedPayloads {
var blockId = blockIds[index]
@@ -111,6 +112,14 @@ func savePayloads(network string, chainId int, processedPayloads []fetch.Process
txsSize := approximateSize(txs)
dataSizeTracker.TransactionsKB += txsSize
transactionIdsToSave = append(transactionIdsToSave, transactionIds)

var txCreationTimes []string
for _, tx := range txs {
txCreationTimes = append(txCreationTimes, tx.CreationTime)
}
txCreationTimes = append(txCreationTimes, txCoinbase.CreationTime)
txCreationTimesToSave = append(txCreationTimesToSave, txCreationTimes)

counters.Transactions += len(transactionIds)
}

@@ -141,7 +150,7 @@ func savePayloads(network string, chainId int, processedPayloads []fetch.Process
}

for index, processedPayload := range processedPayloads {
transfers, err := PrepareTransfers(network, processedPayload, transactionIdsToSave[index])
transfers, err := PrepareTransfers(network, processedPayload, transactionIdsToSave[index], txCreationTimesToSave[index])
if err != nil {
return Counters{}, DataSizeTracker{}, fmt.Errorf("preparing transfers -> %w", err)
}
@@ -187,8 +196,6 @@ func savePayloads(network string, chainId int, processedPayloads []fetch.Process
log.Printf("Saved payloads in %fs\n", time.Since(startTime).Seconds())
}

log.Printf("Saved payloads in %fs\n", time.Since(startTime).Seconds())

if err := tx.Commit(context.Background()); err != nil {
return Counters{}, DataSizeTracker{}, fmt.Errorf("committing transaction: %w", err)
}
6 changes: 4 additions & 2 deletions backfill/repository/transfer_repository.go
@@ -20,6 +20,7 @@ type TransferAttributes struct {
HasTokenId bool `json:"hasTokenId"`
TokenId *string `json:"tokenId"`
Type string `json:"type"`
CreationTime string `json:"creationtime"`
OrderIndex int `json:"orderIndex"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
@@ -32,8 +33,8 @@ func SaveTransfersToDatabase(transfers []TransferAttributes, db pgx.Tx) error {

query := `
INSERT INTO "Transfers"
("transactionId", amount, "chainId", from_acct, modulehash, modulename, requestkey, to_acct, "hasTokenId", "tokenId", "type", "orderIndex", "createdAt", "updatedAt", canonical)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
("transactionId", amount, "chainId", from_acct, modulehash, modulename, requestkey, to_acct, "hasTokenId", "tokenId", "type", "creationtime", "orderIndex", "createdAt", "updatedAt", canonical)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
`

now := time.Now()
@@ -53,6 +54,7 @@ func SaveTransfersToDatabase(transfers []TransferAttributes, db pgx.Tx) error {
transfer.HasTokenId,
transfer.TokenId,
transfer.Type,
transfer.CreationTime,
transfer.OrderIndex,
now,
now,
@@ -0,0 +1,15 @@
'use strict';

/** @type {import('sequelize-cli').Migration} */
module.exports = {
  async up(queryInterface, Sequelize) {
    await queryInterface.addColumn('Transfers', 'creationtime', {
      type: Sequelize.STRING,
      comment: "The creation time of the transaction (e.g., '1715747797').",
    });
  },

  async down(queryInterface) {
    await queryInterface.removeColumn('Transfers', 'creationtime');
  },
};
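The example value in the column comment ('1715747797') looks like epoch seconds stored as a string, which matches the string CreationTime field added on the Go side. A standalone, hedged sketch of converting such a value for display; the helper is illustrative and not part of this PR.

package main

import (
    "fmt"
    "strconv"
    "time"
)

// parseCreationTime converts a creation time stored as epoch seconds in string
// form (e.g. "1715747797") into a UTC time.Time.
func parseCreationTime(creationTime string) (time.Time, error) {
    secs, err := strconv.ParseInt(creationTime, 10, 64)
    if err != nil {
        return time.Time{}, fmt.Errorf("invalid creation time %q: %v", creationTime, err)
    }
    return time.Unix(secs, 0).UTC(), nil
}

func main() {
    t, err := parseCreationTime("1715747797")
    if err != nil {
        panic(err)
    }
    fmt.Println(t) // 2024-05-15 04:36:37 +0000 UTC
}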
22 changes: 22 additions & 0 deletions indexer/migrations/20250825181017-add-indexes-to-transfers.js
@@ -0,0 +1,22 @@
'use strict';

/** @type {import('sequelize-cli').Migration} */
module.exports = {
  async up(queryInterface) {
    await queryInterface.addIndex('Transfers', ['creationtime', 'id'], {
      name: 'transfers_creationtime_id_idx',
    });
    await queryInterface.addIndex('Transfers', ['from_acct', 'creationtime', 'id'], {
      name: 'transfers_from_acct_creationtime_id_idx',
    });
    await queryInterface.addIndex('Transfers', ['to_acct', 'creationtime', 'id'], {
      name: 'transfers_to_acct_creationtime_id_idx',
    });
  },

  async down(queryInterface) {
    await queryInterface.removeIndex('Transfers', 'transfers_creationtime_id_idx');
    await queryInterface.removeIndex('Transfers', 'transfers_from_acct_creationtime_id_idx');
    await queryInterface.removeIndex('Transfers', 'transfers_to_acct_creationtime_id_idx');
  },
};
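These composite indexes line up with transfer listings ordered by (creationtime, id) and filtered by account, which matches the performance motivation stated in the backfill script. A hedged sketch of the query shape they could serve; the actual application query is not part of this diff, so the column list, sort direction, and cursor scheme are assumptions.

package main

import (
    "database/sql"
    "fmt"
)

// fetchTransfersPage returns one keyset-paginated page of transfers sent by an
// account, newest first. With from_acct fixed and the ordering on
// (creationtime, id), PostgreSQL can answer this with a backward scan of
// transfers_from_acct_creationtime_id_idx. The cursor is the (creationtime, id)
// pair of the last row already returned. Note creationtime is a string column,
// so lexicographic order only matches numeric order while the epoch values
// have the same number of digits.
func fetchTransfersPage(db *sql.DB, account, cursorTime string, cursorId int64, limit int) (*sql.Rows, error) {
    query := `
        SELECT id, "transactionId", amount, from_acct, to_acct, creationtime
        FROM "Transfers"
        WHERE from_acct = $1
        AND (creationtime, id) < ($2, $3)
        ORDER BY creationtime DESC, id DESC
        LIMIT $4
    `
    rows, err := db.Query(query, account, cursorTime, cursorId, limit)
    if err != nil {
        return nil, fmt.Errorf("failed to query transfers page: %v", err)
    }
    return rows, nil // caller must Close the rows
}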
@@ -23,15 +23,15 @@ export interface GetTotalCountParams {
}

export interface GetCrossChainTransferByPactIdParams {
pactId: string;
amount: string;
receiverAccount: string;
senderAccount: string;
transactionId: string;
}

export type TransferOutput = Omit<Transfer, 'block' | 'transaction' | 'crossChainTransfer'> & {
transferId: string;
pactId: string | null;
transactionId: string;
blockHash: string;
};
