Skip to content

Commit 15e4e3d

Browse files
committed
refat: coinbase - created TransactionDetails table to prevent storing empty data; adjusted queries to use TransactionDetails; changed and improved the coinbase scripts
1 parent f25d66d commit 15e4e3d

28 files changed

+1350
-525
lines changed
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
FROM golang:1.23.7 AS builder
2+
WORKDIR /app
3+
COPY . .
4+
RUN go mod download
5+
RUN CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -o create-coinbase ./create-coinbase/create-coinbase.go
6+
7+
FROM scratch
8+
WORKDIR /app
9+
COPY ./global-bundle.pem ./global-bundle.pem
10+
COPY --from=builder /app/create-coinbase .
11+
CMD ["./create-coinbase"]
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
FROM golang:1.23.7 AS builder
2+
WORKDIR /app
3+
COPY . .
4+
RUN go mod download
5+
RUN CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -o migrate-coinbase ./migrate-coinbase/migrate-coinbase.go
6+
7+
FROM scratch
8+
WORKDIR /app
9+
COPY ./global-bundle.pem ./global-bundle.pem
10+
COPY --from=builder /app/migrate-coinbase .
11+
CMD ["./migrate-coinbase"]
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"flag"
6+
"fmt"
7+
"go-backfill/config"
8+
"go-backfill/fetch"
9+
"go-backfill/process"
10+
"go-backfill/repository"
11+
"log"
12+
"strconv"
13+
"time"
14+
15+
"github.com/jackc/pgx/v5"
16+
)
17+
18+
const (
19+
coinbaseBatchSize = 1000
20+
)
21+
22+
type CoinbaseData struct {
23+
ID int64 `json:"id"`
24+
Coinbase string `json:"coinbase"`
25+
ChainId int `json:"chainId"`
26+
CreationTime string `json:"creationTime"`
27+
}
28+
29+
func createBatchCoinbase(conn *pgx.Conn, lastId int64, network string) (bool, int64, error) {
30+
startTime := time.Now()
31+
32+
// Start transaction for writes
33+
tx, err := conn.Begin(context.Background())
34+
if err != nil {
35+
return false, lastId, fmt.Errorf("failed to begin transaction: %v", err)
36+
}
37+
defer tx.Rollback(context.Background())
38+
39+
// Fetch blocks with coinbase data using cursor pagination
40+
query := `
41+
SELECT id, coinbase, "chainId", "creationTime"
42+
FROM "Blocks"
43+
WHERE id > $1
44+
ORDER BY id ASC
45+
LIMIT $2
46+
`
47+
48+
rows, err := conn.Query(context.Background(), query, lastId, coinbaseBatchSize)
49+
if err != nil {
50+
return false, lastId, fmt.Errorf("failed to execute query: %v", err)
51+
}
52+
defer rows.Close()
53+
54+
var blocks []CoinbaseData
55+
for rows.Next() {
56+
var block CoinbaseData
57+
if err := rows.Scan(&block.ID, &block.Coinbase, &block.ChainId, &block.CreationTime); err != nil {
58+
return false, lastId, fmt.Errorf("failed to scan row: %v", err)
59+
}
60+
blocks = append(blocks, block)
61+
}
62+
63+
if len(blocks) == 0 {
64+
return false, lastId, nil
65+
}
66+
67+
// Process each block's coinbase transaction
68+
var transactions []repository.TransactionAttributes
69+
var transactionIds []int64
70+
for _, block := range blocks {
71+
creationTime, err := strconv.ParseInt(block.CreationTime, 10, 64)
72+
if err != nil {
73+
return false, lastId, fmt.Errorf("failed to parse creation time for block %d: %v", block.ID, err)
74+
}
75+
tx, err := process.ProcessCoinbaseTransaction(block.Coinbase, block.ID, creationTime, int64(block.ChainId))
76+
if err != nil {
77+
return false, lastId, fmt.Errorf("failed to process coinbase for block %d: %v", block.ID, err)
78+
}
79+
transactions = append(transactions, tx)
80+
}
81+
82+
// Save transactions to database
83+
if len(transactions) > 0 {
84+
ids, err := repository.SaveTransactions(tx, transactions, repository.TransactionAttributes{})
85+
if err != nil {
86+
return false, lastId, fmt.Errorf("failed to save transactions: %v", err)
87+
}
88+
transactionIds = ids
89+
90+
// Process and save events and transfers for each coinbase transaction
91+
for i, block := range blocks {
92+
transactionId := transactionIds[i]
93+
94+
// Create a ProcessedPayload structure for the coinbase events
95+
processedPayload := fetch.ProcessedPayload{
96+
Header: fetch.Header{
97+
ChainId: block.ChainId,
98+
},
99+
Coinbase: []byte(block.Coinbase),
100+
}
101+
102+
// Prepare and save events
103+
events, err := process.PrepareEvents(network, processedPayload, []int64{transactionId})
104+
if err != nil {
105+
return false, lastId, fmt.Errorf("failed to prepare events for block %d: %v", block.ID, err)
106+
}
107+
108+
if err := repository.SaveEventsToDatabase(events, tx); err != nil {
109+
return false, lastId, fmt.Errorf("failed to save events for block %d: %v", block.ID, err)
110+
}
111+
112+
// Prepare and save transfers
113+
transfers, err := process.PrepareTransfers(network, processedPayload, []int64{transactionId})
114+
if err != nil {
115+
return false, lastId, fmt.Errorf("failed to prepare transfers for block %d: %v", block.ID, err)
116+
}
117+
118+
if err := repository.SaveTransfersToDatabase(transfers, tx); err != nil {
119+
return false, lastId, fmt.Errorf("failed to save transfers for block %d: %v", block.ID, err)
120+
}
121+
}
122+
}
123+
124+
if err := tx.Commit(context.Background()); err != nil {
125+
return false, lastId, fmt.Errorf("failed to commit transaction: %v", err)
126+
}
127+
128+
elapsed := time.Since(startTime)
129+
log.Printf("Processed %d coinbase transactions, their events, and transfers. Batch time: %.2fs", len(transactions), elapsed.Seconds())
130+
131+
// Return the last processed ID as the new cursor
132+
lastProcessedId := blocks[len(blocks)-1].ID
133+
return len(blocks) == coinbaseBatchSize, lastProcessedId, nil
134+
}
135+
136+
func main() {
137+
envFile := flag.String("env", ".env", "Path to the .env file")
138+
flag.Parse()
139+
140+
config.InitEnv(*envFile)
141+
env := config.GetConfig()
142+
143+
// Database connection
144+
connStr := fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=%s sslmode=disable",
145+
env.DbHost, env.DbPort, env.DbUser, env.DbPassword, env.DbName)
146+
147+
conn, err := pgx.Connect(context.Background(), connStr)
148+
if err != nil {
149+
log.Fatalf("Failed to connect to database: %v", err)
150+
}
151+
defer conn.Close(context.Background())
152+
153+
log.Println("Connected to database")
154+
155+
lastId := int64(0)
156+
hasMore := true
157+
totalBlocks := int64(104813544)
158+
processedBlocks := int64(0)
159+
160+
for hasMore {
161+
var err error
162+
hasMore, lastId, err = createBatchCoinbase(conn, lastId, env.Network)
163+
if err != nil {
164+
log.Fatalf("Error during batch processing: %v", err)
165+
}
166+
processedBlocks += coinbaseBatchSize
167+
progress := float64(processedBlocks) / float64(totalBlocks) * 100
168+
169+
if hasMore {
170+
log.Printf("Progress: %.2f%% (%d/%d blocks processed)", progress, processedBlocks, totalBlocks)
171+
} else {
172+
log.Printf("Progress: 100.00%%")
173+
}
174+
175+
time.Sleep(100 * time.Millisecond)
176+
}
177+
178+
log.Println("Coinbase creation completed successfully")
179+
}

backfill/fetch/process_payloads.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ type Event struct {
1717
}
1818

1919
type Module struct {
20-
Namespace string `json:"namespace"`
21-
Name string `json:"name"`
20+
Namespace *string `json:"namespace"`
21+
Name string `json:"name"`
2222
}
2323

2424
type Result struct {

backfill/middle-backfill/middle-backfill.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ func main() {
3737
ChainIdStr := strconv.FormatInt(maxHeight.ChainID, 10)
3838
Height := cuts.Hashes[ChainIdStr].Height
3939
Hash := cuts.Hashes[ChainIdStr].Hash
40-
// fmt.Println("Hash: ", Hash, " ChainId: ", ChainIdStr, " Height: ", Height, " MaxHeight: ", maxHeight.MaxHeight)
4140
process.StartBackfill(Height, Hash, int(maxHeight.ChainID), int(maxHeight.MaxHeight+1), pool)
4241
}
4342
}
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
package main
2+
3+
import (
4+
"database/sql"
5+
"flag"
6+
"fmt"
7+
"go-backfill/config"
8+
"log"
9+
"time"
10+
11+
_ "github.com/lib/pq" // PostgreSQL driver
12+
)
13+
14+
const (
15+
batchSize = 1000
16+
stopID = 173921979
17+
)
18+
19+
func migrateBatchTransactions(db *sql.DB, lastID int) (int, bool, error) {
20+
startTime := time.Now()
21+
22+
tx, err := db.Begin()
23+
if err != nil {
24+
return 0, false, fmt.Errorf("failed to begin transaction: %v", err)
25+
}
26+
defer tx.Rollback()
27+
28+
// Insert batch into TransactionDetails
29+
insertQuery := `
30+
INSERT INTO "TransactionDetails" (
31+
"transactionId", code, continuation, data, gas, gaslimit, gasprice,
32+
nonce, pactid, proof, rollback, sigs, step, ttl, "createdAt", "updatedAt"
33+
)
34+
SELECT
35+
id, code, continuation, data, gas, gaslimit, gasprice,
36+
nonce, pactid, proof, rollback, sigs, step, ttl, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
37+
FROM "Transactions"
38+
WHERE id > $1
39+
ORDER BY id ASC
40+
LIMIT $2
41+
RETURNING "transactionId";
42+
`
43+
44+
rows, err := tx.Query(insertQuery, lastID, batchSize)
45+
if err != nil {
46+
return 0, false, fmt.Errorf("failed to execute query: %v", err)
47+
}
48+
defer rows.Close()
49+
50+
var newLastID int
51+
var insertedCount int
52+
53+
// Scan through all rows and keep the last ID
54+
for rows.Next() {
55+
if err := rows.Scan(&newLastID); err != nil {
56+
return 0, false, fmt.Errorf("failed to scan row: %v", err)
57+
}
58+
insertedCount++
59+
}
60+
61+
if err := tx.Commit(); err != nil {
62+
return 0, false, fmt.Errorf("failed to commit transaction: %v", err)
63+
}
64+
65+
elapsed := time.Since(startTime)
66+
log.Printf("Inserted %d non-coinbase transactions. Last ID: %d. Batch time: %.2fs",
67+
insertedCount, newLastID, elapsed.Seconds())
68+
69+
hasNext := insertedCount == batchSize
70+
71+
return newLastID, hasNext, nil
72+
}
73+
74+
func migrateTransactionDbInformation(envFile string) error {
75+
config.InitEnv(envFile)
76+
env := config.GetConfig()
77+
78+
connStr := fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=%s sslmode=disable",
79+
env.DbHost, env.DbPort, env.DbUser, env.DbPassword, env.DbName)
80+
81+
db, err := sql.Open("postgres", connStr)
82+
if err != nil {
83+
return fmt.Errorf("failed to connect to database: %v", err)
84+
}
85+
defer db.Close()
86+
87+
if err := db.Ping(); err != nil {
88+
return fmt.Errorf("failed to ping database: %v", err)
89+
}
90+
91+
log.Println("Connected to database")
92+
93+
lastIdInserted := 0
94+
95+
hasNext := true
96+
for hasNext {
97+
newLastId, newHasNext, err := migrateBatchTransactions(db, lastIdInserted)
98+
if err != nil {
99+
return fmt.Errorf("error during batch insert: %v", err)
100+
}
101+
102+
lastIdInserted = newLastId
103+
hasNext = newHasNext
104+
time.Sleep(100 * time.Millisecond)
105+
if hasNext {
106+
log.Printf("Progress: %.2f%%", float64(lastIdInserted)/float64(stopID)*100)
107+
} else {
108+
log.Printf("Progress: 100.00%%")
109+
}
110+
}
111+
112+
log.Println("Migration completed successfully")
113+
return nil
114+
}
115+
116+
func main() {
117+
envFile := flag.String("env", ".env", "Path to the .env file")
118+
flag.Parse()
119+
120+
// Run the migration
121+
if err := migrateTransactionDbInformation(*envFile); err != nil {
122+
log.Fatalf("Migration failed: %v", err)
123+
}
124+
}

0 commit comments

Comments
 (0)