Skip to content

Commit 4d4f356

Browse files
committed
feat: added module as a new param in events query and improved the indexes to make the query performance better
1 parent 8eebbc2 commit 4d4f356

29 files changed

+3812
-633
lines changed
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
package main
2+
3+
import (
4+
"database/sql"
5+
"flag"
6+
"fmt"
7+
"go-backfill/config"
8+
"log"
9+
10+
_ "github.com/lib/pq" // PostgreSQL driver
11+
)
12+
13+
const (
14+
creationTimeBatchSizeEvents = 500
15+
startTransactionIdEvents = 1
16+
endTransactionIdEvents = 1000
17+
)
18+
19+
// This script was created to duplicate the creation time of transaction to the events table.
20+
// The main motivation was to improve the performance of the events query.
21+
22+
func updateCreationTimesForEvents() error {
23+
envFile := flag.String("env", ".env", "Path to the .env file")
24+
flag.Parse()
25+
config.InitEnv(*envFile)
26+
env := config.GetConfig()
27+
connStr := fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=%s sslmode=disable",
28+
env.DbHost, env.DbPort, env.DbUser, env.DbPassword, env.DbName)
29+
30+
db, err := sql.Open("postgres", connStr)
31+
if err != nil {
32+
return fmt.Errorf("failed to connect to database: %v", err)
33+
}
34+
defer db.Close()
35+
36+
log.Println("Connected to database")
37+
38+
// Test database connection
39+
if err := db.Ping(); err != nil {
40+
return fmt.Errorf("failed to ping database: %v", err)
41+
}
42+
43+
// Process transactions in batches
44+
if err := processTransactionsBatchForEvents(db); err != nil {
45+
return fmt.Errorf("failed to process transactions: %v", err)
46+
}
47+
48+
log.Println("Successfully updated all events creation times")
49+
return nil
50+
}
51+
52+
func processTransactionsBatchForEvents(db *sql.DB) error {
53+
currentId := startTransactionIdEvents
54+
totalProcessed := 0
55+
totalTransactions := endTransactionIdEvents - startTransactionIdEvents + 1
56+
lastProgressPrinted := -1.0
57+
58+
log.Printf("Starting to process transactions from ID %d to %d",
59+
startTransactionIdEvents, endTransactionIdEvents)
60+
log.Printf("Total transactions to process: %d", totalTransactions)
61+
62+
for currentId <= endTransactionIdEvents {
63+
// Calculate batch end
64+
batchEnd := currentId + creationTimeBatchSizeEvents - 1
65+
if batchEnd > endTransactionIdEvents {
66+
batchEnd = endTransactionIdEvents
67+
}
68+
69+
// Process this batch
70+
processed, err := processBatchForEvents(db, currentId, batchEnd)
71+
if err != nil {
72+
return fmt.Errorf("failed to process batch %d-%d: %v", currentId, batchEnd, err)
73+
}
74+
75+
totalProcessed += processed
76+
77+
// Calculate progress percentage
78+
transactionsProcessed := batchEnd - startTransactionIdEvents + 1
79+
progressPercent := (float64(transactionsProcessed) / float64(totalTransactions)) * 100.0
80+
81+
// Only print progress if it has increased by at least 0.1%
82+
if progressPercent-lastProgressPrinted >= 0.1 {
83+
log.Printf("Progress: %.1f%%", progressPercent)
84+
lastProgressPrinted = progressPercent
85+
}
86+
87+
// Move to next batch
88+
currentId = batchEnd + 1
89+
}
90+
91+
log.Printf("Completed processing. Total events updated: %d (100.0%%)", totalProcessed)
92+
return nil
93+
}
94+
95+
func processBatchForEvents(db *sql.DB, startId, endId int) (int, error) {
96+
// Begin transaction for atomic operation
97+
tx, err := db.Begin()
98+
if err != nil {
99+
return 0, fmt.Errorf("failed to begin transaction: %v", err)
100+
}
101+
defer tx.Rollback() // Will be ignored if tx.Commit() succeeds
102+
103+
// Update events with creation time from transactions in a single query
104+
updateQuery := `
105+
UPDATE "Events"
106+
SET creationtime = t.creationtime, "updatedAt" = CURRENT_TIMESTAMP
107+
FROM "Transactions" t
108+
WHERE "Events"."transactionId" = t.id
109+
AND t.id >= $1 AND t.id <= $2
110+
`
111+
112+
result, err := tx.Exec(updateQuery, startId, endId)
113+
if err != nil {
114+
return 0, fmt.Errorf("failed to update events: %v", err)
115+
}
116+
117+
rowsAffected, err := result.RowsAffected()
118+
if err != nil {
119+
return 0, fmt.Errorf("failed to get rows affected: %v", err)
120+
}
121+
122+
// Commit the transaction
123+
if err := tx.Commit(); err != nil {
124+
return 0, fmt.Errorf("failed to commit transaction: %v", err)
125+
}
126+
127+
return int(rowsAffected), nil
128+
}
129+
130+
func creationTimesEvents() {
131+
if err := updateCreationTimesForEvents(); err != nil {
132+
log.Fatalf("Error: %v", err)
133+
}
134+
}

backfill/db-migrator/creationtime.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,15 @@ import (
1111
)
1212

1313
const (
14-
creationTimeBatchSize = 500
15-
startTransactionId = 1
16-
endTransactionId = 1000
14+
creationTimeBatchSizeTransfers = 500
15+
startTransactionIdTransfers = 1
16+
endTransactionIdTransfers = 1000
1717
)
1818

1919
// This script was created to duplicate the creation time of transaction to the transfers table.
2020
// The main motivation was to improve the performance of the transfers query.
2121

22-
func UpdateCreationTimes() error {
22+
func updateCreationTimes() error {
2323
envFile := flag.String("env", ".env", "Path to the .env file")
2424
flag.Parse()
2525
config.InitEnv(*envFile)
@@ -50,20 +50,20 @@ func UpdateCreationTimes() error {
5050
}
5151

5252
func processTransactionsBatch(db *sql.DB) error {
53-
currentId := startTransactionId
53+
currentId := startTransactionIdTransfers
5454
totalProcessed := 0
55-
totalTransactions := endTransactionId - startTransactionId + 1
55+
totalTransactions := endTransactionIdTransfers - startTransactionIdTransfers + 1
5656
lastProgressPrinted := -1.0
5757

5858
log.Printf("Starting to process transactions from ID %d to %d",
59-
startTransactionId, endTransactionId)
59+
startTransactionIdTransfers, endTransactionIdTransfers)
6060
log.Printf("Total transactions to process: %d", totalTransactions)
6161

62-
for currentId <= endTransactionId {
62+
for currentId <= endTransactionIdTransfers {
6363
// Calculate batch end
64-
batchEnd := currentId + creationTimeBatchSize - 1
65-
if batchEnd > endTransactionId {
66-
batchEnd = endTransactionId
64+
batchEnd := currentId + creationTimeBatchSizeTransfers - 1
65+
if batchEnd > endTransactionIdTransfers {
66+
batchEnd = endTransactionIdTransfers
6767
}
6868

6969
// Process this batch
@@ -75,7 +75,7 @@ func processTransactionsBatch(db *sql.DB) error {
7575
totalProcessed += processed
7676

7777
// Calculate progress percentage
78-
transactionsProcessed := batchEnd - startTransactionId + 1
78+
transactionsProcessed := batchEnd - startTransactionIdTransfers + 1
7979
progressPercent := (float64(transactionsProcessed) / float64(totalTransactions)) * 100.0
8080

8181
// Only print progress if it has increased by at least 0.1%
@@ -128,7 +128,7 @@ func processBatch(db *sql.DB, startId, endId int) (int, error) {
128128
}
129129

130130
func creationTimes() {
131-
if err := UpdateCreationTimes(); err != nil {
131+
if err := updateCreationTimes(); err != nil {
132132
log.Fatalf("Error: %v", err)
133133
}
134134
}

backfill/process/process_events.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,14 @@ import (
77
"go-backfill/repository"
88
)
99

10-
func PrepareEvents(network string, payload fetch.ProcessedPayload, transactionsId []int64) ([]repository.EventAttributes, error) {
10+
func PrepareEvents(network string, payload fetch.ProcessedPayload, transactionsId []int64, txCreationTimes []string) ([]repository.EventAttributes, error) {
1111
transactions := payload.Transactions
1212

1313
const avgEventsPerTransaction = 80
1414
events := make([]repository.EventAttributes, 0, len(transactions)*avgEventsPerTransaction)
1515

1616
for txIndex, t := range transactions {
17+
txCreationTime := txCreationTimes[txIndex]
1718
for eventIndex, event := range t.Events {
1819
module := buildModuleName(event.Module.Namespace, event.Module.Name)
1920
qualName := buildModuleName(event.Module.Namespace, event.Module.Name)
@@ -32,6 +33,7 @@ func PrepareEvents(network string, payload fetch.ProcessedPayload, transactionsI
3233
QualName: qualName,
3334
RequestKey: t.ReqKey,
3435
OrderIndex: eventIndex,
36+
CreationTime: txCreationTime,
3537
}
3638
events = append(events, eventRecord)
3739
}

backfill/process/save_payloads.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ func savePayloads(network string, chainId int, processedPayloads []fetch.Process
124124
}
125125

126126
for index, processedPayload := range processedPayloads {
127-
events, err := PrepareEvents(network, processedPayload, transactionIdsToSave[index])
127+
events, err := PrepareEvents(network, processedPayload, transactionIdsToSave[index], txCreationTimesToSave[index])
128128
if err != nil {
129129
return Counters{}, DataSizeTracker{}, fmt.Errorf("preparing events -> %w", err)
130130
}

backfill/repository/event_repository.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ type EventAttributes struct {
1818
QualName string `json:"qualName"`
1919
RequestKey string `json:"requestKey"`
2020
OrderIndex int `json:"orderIndex"`
21+
CreationTime string `json:"creationtime"`
2122
CreatedAt time.Time `json:"createdAt"`
2223
UpdatedAt time.Time `json:"updatedAt"`
2324
}
@@ -29,8 +30,8 @@ func SaveEventsToDatabase(events []EventAttributes, db pgx.Tx) error {
2930

3031
query := `
3132
INSERT INTO "Events"
32-
("transactionId", "chainId", "module", name, params, qualname, requestkey, "orderIndex", "createdAt", "updatedAt", canonical)
33-
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
33+
("transactionId", "chainId", "module", name, params, qualname, requestkey, "creationtime", "orderIndex", "createdAt", "updatedAt", canonical)
34+
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
3435
`
3536

3637
now := time.Now()
@@ -46,6 +47,7 @@ func SaveEventsToDatabase(events []EventAttributes, db pgx.Tx) error {
4647
event.Params,
4748
event.QualName,
4849
event.RequestKey,
50+
event.CreationTime,
4951
event.OrderIndex,
5052
now,
5153
now,
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
'use strict';
2+
3+
/** @type {import('sequelize-cli').Migration} */
4+
module.exports = {
5+
async up(queryInterface, Sequelize) {
6+
await queryInterface.addColumn('Events', 'creationtime', {
7+
type: Sequelize.STRING,
8+
comment: "The creation time of the transaction (e.g., '1715747797').",
9+
});
10+
},
11+
12+
async down(queryInterface) {
13+
await queryInterface.removeColumn('Events', 'creationtime');
14+
},
15+
};
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
'use strict';
2+
3+
/** @type {import('sequelize-cli').Migration} */
4+
module.exports = {
5+
async up(queryInterface) {
6+
await queryInterface.addIndex('Events', ['creationtime', 'id'], {
7+
name: 'events_creationtime_id_idx',
8+
});
9+
await queryInterface.addIndex('Events', ['module', 'id'], {
10+
name: 'events_module_id_idx',
11+
});
12+
await queryInterface.addIndex('Events', ['module', 'chain_id', 'id'], {
13+
name: 'events_module_chain_id_id_idx',
14+
});
15+
await queryInterface.addIndex('Events', ['module', 'name', 'creationtime', 'id'], {
16+
name: 'events_module_name_creationtime_id_idx',
17+
});
18+
},
19+
20+
async down(queryInterface) {
21+
await queryInterface.removeIndex('Events', 'events_creationtime_id_idx');
22+
await queryInterface.removeIndex('Events', 'events_module_id_idx');
23+
await queryInterface.removeIndex('Events', 'events_module_chain_id_id_idx');
24+
await queryInterface.removeIndex('Events', 'events_module_name_creationtime_id_idx');
25+
},
26+
};

indexer/src/kadena-server/config/graphql-types.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -868,9 +868,10 @@ export type QueryEventsArgs = {
868868
maxHeight?: InputMaybe<Scalars['Int']['input']>;
869869
minHeight?: InputMaybe<Scalars['Int']['input']>;
870870
minimumDepth?: InputMaybe<Scalars['Int']['input']>;
871+
moduleName?: InputMaybe<Scalars['String']['input']>;
871872
orderIndex?: InputMaybe<Scalars['Int']['input']>;
872873
parametersFilter?: InputMaybe<Scalars['String']['input']>;
873-
qualifiedEventName: Scalars['String']['input'];
874+
qualifiedEventName?: InputMaybe<Scalars['String']['input']>;
874875
requestKey?: InputMaybe<Scalars['String']['input']>;
875876
};
876877

@@ -3017,7 +3018,7 @@ export type QueryResolvers<
30173018
ResolversTypes['QueryEventsConnection'],
30183019
ParentType,
30193020
ContextType,
3020-
RequireFields<QueryEventsArgs, 'qualifiedEventName'>
3021+
Partial<QueryEventsArgs>
30213022
>;
30223023
fungibleAccount?: Resolver<
30233024
Maybe<ResolversTypes['FungibleAccount']>,

indexer/src/kadena-server/config/schema.graphql

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -195,17 +195,18 @@ type Query {
195195
events(
196196
after: String
197197
before: String
198-
blockHash: String
199-
chainId: String
200198
first: Int
201199
last: Int
200+
qualifiedEventName: String
201+
moduleName: String
202+
blockHash: String
203+
requestKey: String
204+
chainId: String
202205
maxHeight: Int
203206
minHeight: Int
204207
minimumDepth: Int
205208
orderIndex: Int
206209
parametersFilter: String
207-
qualifiedEventName: String!
208-
requestKey: String
209210
): QueryEventsConnection! @complexity(value: 1, multipliers: ["first", "last"])
210211

211212
"""

indexer/src/kadena-server/repository/application/event-repository.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ export type GetTransactionEventsParams = GetTotalTransactionEventsCount & Pagina
1111
export type GetEventsParams = GetTotalEventsCount & PaginationsParams;
1212

1313
export interface GetTotalEventsCount {
14-
qualifiedEventName: string;
14+
qualifiedEventName?: string | null;
15+
module?: string | null;
1516
blockHash?: string | null;
1617
chainId?: string | null;
1718
minHeight?: number | null;
@@ -52,7 +53,7 @@ export default interface EventRepository {
5253
pageInfo: PageInfo;
5354
edges: ConnectionEdge<EventOutput>[];
5455
}>;
55-
getEventsWithQualifiedName(params: GetEventsParams): Promise<{
56+
getEvents(params: GetEventsParams): Promise<{
5657
pageInfo: PageInfo;
5758
edges: ConnectionEdge<EventOutput>[];
5859
}>;

0 commit comments

Comments
 (0)