Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 134 additions & 0 deletions backfill/db-migrator/creationtime-events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package main

import (
"database/sql"
"flag"
"fmt"
"go-backfill/config"
"log"

_ "github.com/lib/pq" // PostgreSQL driver
)

// Tuning knobs for the events creation-time backfill.
const (
	// creationTimeBatchSizeEvents is the number of transaction IDs covered
	// by each UPDATE batch.
	creationTimeBatchSizeEvents = 500
	// startTransactionIdEvents and endTransactionIdEvents bound (inclusive)
	// the transaction ID range to backfill.
	// NOTE(review): 1000 looks like a leftover test value — the companion
	// creationtime.go uses 110239835 as its upper bound; confirm before running.
	startTransactionIdEvents = 1
	endTransactionIdEvents = 1000
)

// This script copies each transaction's creation time into the events table.
// The main motivation is to improve the performance of the events query.

// updateCreationTimesForEvents connects to the PostgreSQL database configured
// via the -env flag and backfills the creationtime column of the "Events"
// table from the "Transactions" table, processing the configured transaction
// ID range in batches.
//
// It returns an error if the configuration cannot be loaded, the database is
// unreachable, or any batch fails.
func updateCreationTimesForEvents() error {
	// NOTE(review): defining and parsing the "env" flag inside this function
	// panics on duplicate flag registration if the near-identical code in
	// creationtime.go runs in the same process — confirm only one entry point
	// is invoked per run.
	envFile := flag.String("env", ".env", "Path to the .env file")
	flag.Parse()
	config.InitEnv(*envFile)
	env := config.GetConfig()

	connStr := fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=%s sslmode=disable",
		env.DbHost, env.DbPort, env.DbUser, env.DbPassword, env.DbName)

	db, err := sql.Open("postgres", connStr)
	if err != nil {
		return fmt.Errorf("connecting to database: %w", err)
	}
	defer db.Close()

	log.Println("Connected to database")

	// sql.Open does not establish a connection; Ping verifies the DSN works.
	if err := db.Ping(); err != nil {
		return fmt.Errorf("pinging database: %w", err)
	}

	// Process transactions in batches.
	if err := processTransactionsBatchForEvents(db); err != nil {
		return fmt.Errorf("processing transactions: %w", err)
	}

	log.Println("Successfully updated all events creation times")
	return nil
}

// processTransactionsBatchForEvents walks the configured transaction ID range
// in batches of creationTimeBatchSizeEvents, delegating each batch to
// processBatchForEvents and logging progress at most once per 0.1% gained.
func processTransactionsBatchForEvents(db *sql.DB) error {
	currentId := startTransactionIdEvents
	totalProcessed := 0
	totalTransactions := endTransactionIdEvents - startTransactionIdEvents + 1
	// Start below zero so the very first batch always logs progress.
	lastProgressPrinted := -1.0

	log.Printf("Starting to process transactions from ID %d to %d",
		startTransactionIdEvents, endTransactionIdEvents)
	log.Printf("Total transactions to process: %d", totalTransactions)

	for currentId <= endTransactionIdEvents {
		// Clamp the batch end to the configured upper bound.
		batchEnd := currentId + creationTimeBatchSizeEvents - 1
		if batchEnd > endTransactionIdEvents {
			batchEnd = endTransactionIdEvents
		}

		processed, err := processBatchForEvents(db, currentId, batchEnd)
		if err != nil {
			return fmt.Errorf("processing batch %d-%d: %w", currentId, batchEnd, err)
		}

		totalProcessed += processed

		// Throttle progress output: only log once at least 0.1% was gained.
		transactionsProcessed := batchEnd - startTransactionIdEvents + 1
		progressPercent := (float64(transactionsProcessed) / float64(totalTransactions)) * 100.0
		if progressPercent-lastProgressPrinted >= 0.1 {
			log.Printf("Progress: %.1f%%", progressPercent)
			lastProgressPrinted = progressPercent
		}

		// Move to the next batch.
		currentId = batchEnd + 1
	}

	log.Printf("Completed processing. Total events updated: %d (100.0%%)", totalProcessed)
	return nil
}

// processBatchForEvents copies Transactions.creationtime onto every event
// whose transactionId lies in [startId, endId], atomically inside a single
// database transaction. It returns the number of event rows updated.
func processBatchForEvents(db *sql.DB, startId, endId int) (int, error) {
	tx, err := db.Begin()
	if err != nil {
		return 0, fmt.Errorf("beginning transaction: %w", err)
	}
	defer tx.Rollback() // No-op after a successful Commit.

	// Join Events to Transactions and copy the creation time in one statement.
	updateQuery := `
		UPDATE "Events"
		SET creationtime = t.creationtime, "updatedAt" = CURRENT_TIMESTAMP
		FROM "Transactions" t
		WHERE "Events"."transactionId" = t.id
		AND t.id >= $1 AND t.id <= $2
	`

	result, err := tx.Exec(updateQuery, startId, endId)
	if err != nil {
		return 0, fmt.Errorf("updating events: %w", err)
	}

	rowsAffected, err := result.RowsAffected()
	if err != nil {
		return 0, fmt.Errorf("getting rows affected: %w", err)
	}

	if err := tx.Commit(); err != nil {
		return 0, fmt.Errorf("committing transaction: %w", err)
	}

	return int(rowsAffected), nil
}

// creationTimesEvents runs the events creation-time backfill and terminates
// the process with a fatal log on any failure.
func creationTimesEvents() {
	err := updateCreationTimesForEvents()
	if err != nil {
		log.Fatalf("Error: %v", err)
	}
}
46 changes: 33 additions & 13 deletions backfill/db-migrator/creationtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ import (
const (
creationTimeBatchSize = 500
startTransactionId = 1
endTransactionId = 1000
endTransactionId = 110239835
)

// This script was created to duplicate the creation time of transaction to the transfers table.
// The main motivation was to improve the performance of the transfers query.
// This script was created to duplicate the creation time of transaction to the events and transfers tables.
// The main motivation was to improve the performance of the events and transfers queries.

func UpdateCreationTimes() error {
func updateCreationTimes() error {
envFile := flag.String("env", ".env", "Path to the .env file")
flag.Parse()
config.InitEnv(*envFile)
Expand All @@ -45,7 +45,7 @@ func UpdateCreationTimes() error {
return fmt.Errorf("failed to process transactions: %v", err)
}

log.Println("Successfully updated all transfer creation times")
log.Println("Successfully updated all events and transfers creation times")
return nil
}

Expand Down Expand Up @@ -88,7 +88,7 @@ func processTransactionsBatch(db *sql.DB) error {
currentId = batchEnd + 1
}

log.Printf("Completed processing. Total transfers updated: %d (100.0%%)", totalProcessed)
log.Printf("Completed processing. Total records updated: %d (100.0%%)", totalProcessed)
return nil
}

Expand All @@ -100,35 +100,55 @@ func processBatch(db *sql.DB, startId, endId int) (int, error) {
}
defer tx.Rollback() // Will be ignored if tx.Commit() succeeds

// Update transfers with creation time from transactions in a single query
updateQuery := `
// Update events with creation time from transactions
eventsUpdateQuery := `
UPDATE "Events"
SET creationtime = t.creationtime, "updatedAt" = CURRENT_TIMESTAMP
FROM "Transactions" t
WHERE "Events"."transactionId" = t.id
AND t.id >= $1 AND t.id <= $2
`

eventsResult, err := tx.Exec(eventsUpdateQuery, startId, endId)
if err != nil {
return 0, fmt.Errorf("failed to update events: %v", err)
}

eventsRowsAffected, err := eventsResult.RowsAffected()
if err != nil {
return 0, fmt.Errorf("failed to get events rows affected: %v", err)
}

// Update transfers with creation time from transactions
transfersUpdateQuery := `
UPDATE "Transfers"
SET creationtime = t.creationtime, "updatedAt" = CURRENT_TIMESTAMP
FROM "Transactions" t
WHERE "Transfers"."transactionId" = t.id
AND t.id >= $1 AND t.id <= $2
`

result, err := tx.Exec(updateQuery, startId, endId)
transfersResult, err := tx.Exec(transfersUpdateQuery, startId, endId)
if err != nil {
return 0, fmt.Errorf("failed to update transfers: %v", err)
}

rowsAffected, err := result.RowsAffected()
transfersRowsAffected, err := transfersResult.RowsAffected()
if err != nil {
return 0, fmt.Errorf("failed to get rows affected: %v", err)
return 0, fmt.Errorf("failed to get transfers rows affected: %v", err)
}

// Commit the transaction
if err := tx.Commit(); err != nil {
return 0, fmt.Errorf("failed to commit transaction: %v", err)
}

return int(rowsAffected), nil
totalRowsAffected := int(eventsRowsAffected + transfersRowsAffected)
return totalRowsAffected, nil
}

func creationTimes() {
if err := UpdateCreationTimes(); err != nil {
if err := updateCreationTimes(); err != nil {
log.Fatalf("Error: %v", err)
}
}
6 changes: 5 additions & 1 deletion backfill/process/process_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ import (
"go-backfill/repository"
)

func PrepareEvents(network string, payload fetch.ProcessedPayload, transactionsId []int64) ([]repository.EventAttributes, error) {
func PrepareEvents(network string, payload fetch.ProcessedPayload, transactionsId []int64, txCreationTimes []string) ([]repository.EventAttributes, error) {
transactions := payload.Transactions

const avgEventsPerTransaction = 80
events := make([]repository.EventAttributes, 0, len(transactions)*avgEventsPerTransaction)

for txIndex, t := range transactions {
txCreationTime := txCreationTimes[txIndex]
for eventIndex, event := range t.Events {
module := buildModuleName(event.Module.Namespace, event.Module.Name)
qualName := buildModuleName(event.Module.Namespace, event.Module.Name)
Expand All @@ -32,6 +33,7 @@ func PrepareEvents(network string, payload fetch.ProcessedPayload, transactionsI
QualName: qualName,
RequestKey: t.ReqKey,
OrderIndex: eventIndex,
CreationTime: txCreationTime,
}
events = append(events, eventRecord)
}
Expand All @@ -43,6 +45,7 @@ func PrepareEvents(network string, payload fetch.ProcessedPayload, transactionsI
}

var coinbaseTxId = transactionsId[len(transactionsId)-1]
var coinbaseTxCreationTime = txCreationTimes[len(txCreationTimes)-1]
for eventIndex, event := range coinbaseDecoded.Events {

module := buildModuleName(event.Module.Namespace, event.Module.Name)
Expand All @@ -61,6 +64,7 @@ func PrepareEvents(network string, payload fetch.ProcessedPayload, transactionsI
QualName: qualName,
RequestKey: coinbaseDecoded.ReqKey,
OrderIndex: eventIndex,
CreationTime: coinbaseTxCreationTime,
}
events = append(events, eventRecord)
}
Expand Down
2 changes: 1 addition & 1 deletion backfill/process/save_payloads.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func savePayloads(network string, chainId int, processedPayloads []fetch.Process
}

for index, processedPayload := range processedPayloads {
events, err := PrepareEvents(network, processedPayload, transactionIdsToSave[index])
events, err := PrepareEvents(network, processedPayload, transactionIdsToSave[index], txCreationTimesToSave[index])
if err != nil {
return Counters{}, DataSizeTracker{}, fmt.Errorf("preparing events -> %w", err)
}
Expand Down
6 changes: 4 additions & 2 deletions backfill/repository/event_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type EventAttributes struct {
QualName string `json:"qualName"`
RequestKey string `json:"requestKey"`
OrderIndex int `json:"orderIndex"`
CreationTime string `json:"creationtime"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
}
Expand All @@ -29,8 +30,8 @@ func SaveEventsToDatabase(events []EventAttributes, db pgx.Tx) error {

query := `
INSERT INTO "Events"
("transactionId", "chainId", "module", name, params, qualname, requestkey, "orderIndex", "createdAt", "updatedAt", canonical)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
("transactionId", "chainId", "module", name, params, qualname, requestkey, "creationtime", "orderIndex", "createdAt", "updatedAt", canonical)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
`

now := time.Now()
Expand All @@ -46,6 +47,7 @@ func SaveEventsToDatabase(events []EventAttributes, db pgx.Tx) error {
event.Params,
event.QualName,
event.RequestKey,
event.CreationTime,
event.OrderIndex,
now,
now,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
'use strict';

/** @type {import('sequelize-cli').Migration} */
module.exports = {
async up(queryInterface, Sequelize) {
await queryInterface.addColumn('Events', 'creationtime', {
type: Sequelize.STRING,
comment: "The creation time of the transaction (e.g., '1715747797').",
});
},

async down(queryInterface) {
await queryInterface.removeColumn('Events', 'creationtime');
},
};
26 changes: 26 additions & 0 deletions indexer/migrations/20250827120929-add-indexes-to-events.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
'use strict';

/** @type {import('sequelize-cli').Migration} */
module.exports = {
async up(queryInterface) {
await queryInterface.addIndex('Events', ['creationtime', 'id'], {
name: 'events_creationtime_id_idx',
});
await queryInterface.addIndex('Events', ['module', 'id'], {
name: 'events_module_id_idx',
});
await queryInterface.addIndex('Events', ['module', 'chainId', 'id'], {
name: 'events_module_chain_id_id_idx',
});
await queryInterface.addIndex('Events', ['module', 'name', 'creationtime', 'id'], {
name: 'events_module_name_creationtime_id_idx',
});
},

async down(queryInterface) {
await queryInterface.removeIndex('Events', 'events_creationtime_id_idx');
await queryInterface.removeIndex('Events', 'events_module_id_idx');
await queryInterface.removeIndex('Events', 'events_module_chain_id_id_idx');
await queryInterface.removeIndex('Events', 'events_module_name_creationtime_id_idx');
},
};
5 changes: 3 additions & 2 deletions indexer/src/kadena-server/config/graphql-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -868,9 +868,10 @@ export type QueryEventsArgs = {
maxHeight?: InputMaybe<Scalars['Int']['input']>;
minHeight?: InputMaybe<Scalars['Int']['input']>;
minimumDepth?: InputMaybe<Scalars['Int']['input']>;
moduleName?: InputMaybe<Scalars['String']['input']>;
orderIndex?: InputMaybe<Scalars['Int']['input']>;
parametersFilter?: InputMaybe<Scalars['String']['input']>;
qualifiedEventName: Scalars['String']['input'];
qualifiedEventName?: InputMaybe<Scalars['String']['input']>;
requestKey?: InputMaybe<Scalars['String']['input']>;
};

Expand Down Expand Up @@ -3017,7 +3018,7 @@ export type QueryResolvers<
ResolversTypes['QueryEventsConnection'],
ParentType,
ContextType,
RequireFields<QueryEventsArgs, 'qualifiedEventName'>
Partial<QueryEventsArgs>
>;
fungibleAccount?: Resolver<
Maybe<ResolversTypes['FungibleAccount']>,
Expand Down
9 changes: 5 additions & 4 deletions indexer/src/kadena-server/config/schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -195,17 +195,18 @@ type Query {
events(
after: String
before: String
blockHash: String
chainId: String
first: Int
last: Int
qualifiedEventName: String
moduleName: String
blockHash: String
requestKey: String
chainId: String
maxHeight: Int
minHeight: Int
minimumDepth: Int
orderIndex: Int
parametersFilter: String
qualifiedEventName: String!
requestKey: String
): QueryEventsConnection! @complexity(value: 1, multipliers: ["first", "last"])

"""
Expand Down
Loading