Skip to content

Commit bdc53be

Browse files
authored
Merge pull request #223 from hack-a-chain-software/coinbase-migration
refat: coinbase - created TransactionDetails table to prevent storing empty data; adjusted queries to use TransactionDetails; changed and improved the coinbase scripts
2 parents 0973958 + bf0eebd commit bdc53be

30 files changed

+1478
-528
lines changed
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
FROM golang:1.23.7 AS builder
2+
WORKDIR /app
3+
COPY . .
4+
RUN go mod download
5+
RUN CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -o create-coinbase ./create-coinbase/create-coinbase.go
6+
7+
FROM scratch
8+
WORKDIR /app
9+
COPY ./global-bundle.pem ./global-bundle.pem
10+
COPY --from=builder /app/create-coinbase .
11+
CMD ["./create-coinbase"]

backfill/Dockerfile.creationtime

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
FROM golang:1.23.7 AS builder
2+
WORKDIR /app
3+
COPY . .
4+
RUN go mod download
5+
RUN CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -o fix-creationtime ./fix-creationtime/fix-creationtime.go
6+
7+
FROM scratch
8+
WORKDIR /app
9+
COPY ./global-bundle.pem ./global-bundle.pem
10+
COPY --from=builder /app/fix-creationtime .
11+
CMD ["./fix-creationtime"]
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
FROM golang:1.23.7 AS builder
2+
WORKDIR /app
3+
COPY . .
4+
RUN go mod download
5+
RUN CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -o migrate-coinbase ./migrate-coinbase/migrate-coinbase.go
6+
7+
FROM scratch
8+
WORKDIR /app
9+
COPY ./global-bundle.pem ./global-bundle.pem
10+
COPY --from=builder /app/migrate-coinbase .
11+
CMD ["./migrate-coinbase"]
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"flag"
6+
"fmt"
7+
"go-backfill/config"
8+
"go-backfill/fetch"
9+
"go-backfill/process"
10+
"go-backfill/repository"
11+
"log"
12+
"strconv"
13+
"time"
14+
15+
"github.com/jackc/pgx/v5"
16+
)
17+
18+
const (
19+
coinbaseBatchSize = 1000
20+
)
21+
22+
type CoinbaseData struct {
23+
ID int64 `json:"id"`
24+
Coinbase string `json:"coinbase"`
25+
ChainId int `json:"chainId"`
26+
CreationTime string `json:"creationTime"`
27+
}
28+
29+
func createBatchCoinbase(conn *pgx.Conn, lastId int64, network string) (bool, int64, error) {
30+
startTime := time.Now()
31+
32+
// Start transaction for writes
33+
tx, err := conn.Begin(context.Background())
34+
if err != nil {
35+
return false, lastId, fmt.Errorf("failed to begin transaction: %v", err)
36+
}
37+
defer tx.Rollback(context.Background())
38+
39+
// Fetch blocks with coinbase data using cursor pagination
40+
query := `
41+
SELECT id, coinbase, "chainId", "creationTime"
42+
FROM "Blocks"
43+
WHERE id > $1
44+
ORDER BY id ASC
45+
LIMIT $2
46+
`
47+
48+
rows, err := conn.Query(context.Background(), query, lastId, coinbaseBatchSize)
49+
if err != nil {
50+
return false, lastId, fmt.Errorf("failed to execute query: %v", err)
51+
}
52+
defer rows.Close()
53+
54+
var blocks []CoinbaseData
55+
for rows.Next() {
56+
var block CoinbaseData
57+
if err := rows.Scan(&block.ID, &block.Coinbase, &block.ChainId, &block.CreationTime); err != nil {
58+
return false, lastId, fmt.Errorf("failed to scan row: %v", err)
59+
}
60+
blocks = append(blocks, block)
61+
}
62+
63+
if len(blocks) == 0 {
64+
return false, lastId, nil
65+
}
66+
67+
// Process each block's coinbase transaction
68+
var transactions []repository.TransactionAttributes
69+
var transactionIds []int64
70+
for _, block := range blocks {
71+
creationTime, err := strconv.ParseInt(block.CreationTime, 10, 64)
72+
if err != nil {
73+
return false, lastId, fmt.Errorf("failed to parse creation time for block %d: %v", block.ID, err)
74+
}
75+
tx, err := process.ProcessCoinbaseTransaction(block.Coinbase, block.ID, creationTime, int64(block.ChainId))
76+
if err != nil {
77+
return false, lastId, fmt.Errorf("failed to process coinbase for block %d: %v", block.ID, err)
78+
}
79+
transactions = append(transactions, tx)
80+
}
81+
82+
// Save transactions to database
83+
if len(transactions) > 0 {
84+
ids, err := repository.SaveTransactions(tx, transactions, repository.TransactionAttributes{})
85+
if err != nil {
86+
return false, lastId, fmt.Errorf("failed to save transactions: %v", err)
87+
}
88+
transactionIds = ids
89+
90+
// Process and save events and transfers for each coinbase transaction
91+
for i, block := range blocks {
92+
transactionId := transactionIds[i]
93+
94+
// Create a ProcessedPayload structure for the coinbase events
95+
processedPayload := fetch.ProcessedPayload{
96+
Header: fetch.Header{
97+
ChainId: block.ChainId,
98+
},
99+
Coinbase: []byte(block.Coinbase),
100+
}
101+
102+
// Prepare and save events
103+
events, err := process.PrepareEvents(network, processedPayload, []int64{transactionId})
104+
if err != nil {
105+
return false, lastId, fmt.Errorf("failed to prepare events for block %d: %v", block.ID, err)
106+
}
107+
108+
if err := repository.SaveEventsToDatabase(events, tx); err != nil {
109+
return false, lastId, fmt.Errorf("failed to save events for block %d: %v", block.ID, err)
110+
}
111+
112+
// Prepare and save transfers
113+
transfers, err := process.PrepareTransfers(network, processedPayload, []int64{transactionId})
114+
if err != nil {
115+
return false, lastId, fmt.Errorf("failed to prepare transfers for block %d: %v", block.ID, err)
116+
}
117+
118+
if err := repository.SaveTransfersToDatabase(transfers, tx); err != nil {
119+
return false, lastId, fmt.Errorf("failed to save transfers for block %d: %v", block.ID, err)
120+
}
121+
}
122+
}
123+
124+
if err := tx.Commit(context.Background()); err != nil {
125+
return false, lastId, fmt.Errorf("failed to commit transaction: %v", err)
126+
}
127+
128+
elapsed := time.Since(startTime)
129+
log.Printf("Processed %d coinbase transactions, their events, and transfers. Batch time: %.2fs", len(transactions), elapsed.Seconds())
130+
131+
// Return the last processed ID as the new cursor
132+
lastProcessedId := blocks[len(blocks)-1].ID
133+
return len(blocks) == coinbaseBatchSize, lastProcessedId, nil
134+
}
135+
136+
func main() {
137+
envFile := flag.String("env", ".env", "Path to the .env file")
138+
flag.Parse()
139+
140+
config.InitEnv(*envFile)
141+
env := config.GetConfig()
142+
143+
// Database connection
144+
connStr := fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=%s sslmode=disable",
145+
env.DbHost, env.DbPort, env.DbUser, env.DbPassword, env.DbName)
146+
147+
conn, err := pgx.Connect(context.Background(), connStr)
148+
if err != nil {
149+
log.Fatalf("Failed to connect to database: %v", err)
150+
}
151+
defer conn.Close(context.Background())
152+
153+
log.Println("Connected to database")
154+
155+
lastId := int64(0)
156+
hasMore := true
157+
totalBlocks := int64(104813544)
158+
processedBlocks := int64(0)
159+
160+
for hasMore {
161+
var err error
162+
hasMore, lastId, err = createBatchCoinbase(conn, lastId, env.Network)
163+
if err != nil {
164+
log.Fatalf("Error during batch processing: %v", err)
165+
}
166+
processedBlocks += coinbaseBatchSize
167+
progress := float64(processedBlocks) / float64(totalBlocks) * 100
168+
169+
if hasMore {
170+
log.Printf("Progress: %.2f%% (%d/%d blocks processed)", progress, processedBlocks, totalBlocks)
171+
} else {
172+
log.Printf("Progress: 100.00%%")
173+
}
174+
175+
time.Sleep(100 * time.Millisecond)
176+
}
177+
178+
log.Println("Coinbase creation completed successfully")
179+
}

backfill/fetch/process_payloads.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ type Event struct {
1717
}
1818

1919
type Module struct {
20-
Namespace string `json:"namespace"`
21-
Name string `json:"name"`
20+
Namespace *string `json:"namespace"`
21+
Name string `json:"name"`
2222
}
2323

2424
type Result struct {
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"flag"
6+
"fmt"
7+
"go-backfill/config"
8+
"log"
9+
"time"
10+
11+
"github.com/jackc/pgx/v5"
12+
)
13+
14+
const (
15+
batchSize = 1000
16+
lastId = 105126770
17+
)
18+
19+
func fixBatchCreationTime(conn *pgx.Conn, lastId int64) (bool, int64, error) {
20+
startTime := time.Now()
21+
22+
// Start transaction for writes
23+
tx, err := conn.Begin(context.Background())
24+
if err != nil {
25+
return false, lastId, fmt.Errorf("failed to begin transaction: %v", err)
26+
}
27+
defer tx.Rollback(context.Background())
28+
29+
// Update creation time for coinbase transactions in batch
30+
updateQuery := `
31+
WITH OrderedUpdates AS (
32+
SELECT id
33+
FROM "Transactions"
34+
WHERE id > $1 AND sender = 'coinbase'
35+
ORDER BY id ASC
36+
LIMIT $2
37+
)
38+
UPDATE "Transactions"
39+
SET creationtime = (CAST(creationtime AS BIGINT) / 1000000)::TEXT
40+
FROM OrderedUpdates
41+
WHERE "Transactions".id = OrderedUpdates.id
42+
RETURNING "Transactions".id, "Transactions".creationtime
43+
`
44+
45+
rows, err := tx.Query(context.Background(), updateQuery, lastId, batchSize)
46+
if err != nil {
47+
return false, lastId, fmt.Errorf("failed to execute update: %v", err)
48+
}
49+
defer rows.Close()
50+
51+
var updatedCount int
52+
var lastProcessedId int64
53+
var lastCreationTime string
54+
for rows.Next() {
55+
if err := rows.Scan(&lastProcessedId, &lastCreationTime); err != nil {
56+
return false, lastId, fmt.Errorf("failed to scan row: %v", err)
57+
}
58+
updatedCount++
59+
60+
}
61+
62+
if err := tx.Commit(context.Background()); err != nil {
63+
return false, lastId, fmt.Errorf("failed to commit transaction: %v", err)
64+
}
65+
66+
elapsed := time.Since(startTime)
67+
log.Printf("Fixed creation time for %d coinbase transactions. Batch time: %.2fs", updatedCount, elapsed.Seconds())
68+
69+
return updatedCount == batchSize, lastProcessedId, nil
70+
}
71+
72+
func main() {
73+
envFile := flag.String("env", ".env", "Path to the .env file")
74+
flag.Parse()
75+
76+
config.InitEnv(*envFile)
77+
env := config.GetConfig()
78+
79+
// Database connection
80+
connStr := fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=%s sslmode=disable",
81+
env.DbHost, env.DbPort, env.DbUser, env.DbPassword, env.DbName)
82+
83+
conn, err := pgx.Connect(context.Background(), connStr)
84+
if err != nil {
85+
log.Fatalf("Failed to connect to database: %v", err)
86+
}
87+
defer conn.Close(context.Background())
88+
89+
log.Println("Connected to database")
90+
91+
lastId := int64(0)
92+
hasMore := true
93+
processedTransactions := int64(0)
94+
95+
for hasMore {
96+
var err error
97+
hasMore, lastId, err = fixBatchCreationTime(conn, lastId)
98+
if err != nil {
99+
log.Fatalf("Error during batch processing: %v", err)
100+
}
101+
processedTransactions += batchSize
102+
progress := float64(processedTransactions) / float64(105126770) * 100
103+
104+
if hasMore {
105+
log.Printf("Progress: %.2f%% (%d/%d transactions processed)", progress, processedTransactions, lastId)
106+
} else {
107+
log.Printf("Progress: 100.00%%")
108+
}
109+
110+
time.Sleep(100 * time.Millisecond)
111+
}
112+
113+
log.Println("Creation time fix completed successfully")
114+
}

backfill/middle-backfill/middle-backfill.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ func main() {
3737
ChainIdStr := strconv.FormatInt(maxHeight.ChainID, 10)
3838
Height := cuts.Hashes[ChainIdStr].Height
3939
Hash := cuts.Hashes[ChainIdStr].Hash
40-
// fmt.Println("Hash: ", Hash, " ChainId: ", ChainIdStr, " Height: ", Height, " MaxHeight: ", maxHeight.MaxHeight)
4140
process.StartBackfill(Height, Hash, int(maxHeight.ChainID), int(maxHeight.MaxHeight+1), pool)
4241
}
4342
}

0 commit comments

Comments
 (0)