Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions backfill/config/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,58 @@ func getEnvAsInt(key string) int {
}
return value
}

func GetMinHeights(network string) map[int]int {
minHeights := make(map[int]int)
if network == "mainnet01" {
minHeights = map[int]int{
0: 0,
1: 0,
2: 0,
3: 0,
4: 0,
5: 0,
6: 0,
7: 0,
8: 0,
9: 0,
10: 852054,
11: 852054,
12: 852054,
13: 852054,
14: 852054,
15: 852054,
16: 852054,
17: 852054,
18: 852054,
19: 852054,
}
}

if network == "testnet04" {
minHeights = map[int]int{
0: 0,
1: 0,
2: 0,
3: 0,
4: 0,
5: 0,
6: 0,
7: 0,
8: 0,
9: 0,
10: 332604,
11: 332604,
12: 332604,
13: 332604,
14: 332604,
15: 332604,
16: 332604,
17: 332604,
18: 332604,
19: 332604,
}
}

return minHeights
}
3 changes: 2 additions & 1 deletion backfill/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ func main() {
go config.StartMemoryMonitoring()
cut := fetch.FetchCut()
ChainId := env.ChainId
SyncMinHeight := env.SyncMinHeight
minHeights := config.GetMinHeights(env.Network)
SyncMinHeight := minHeights[ChainId]
process.StartBackfill(cut.Height, cut.Hash, ChainId, SyncMinHeight, pool)
}
10 changes: 0 additions & 10 deletions backfill/middle-backfill/middle-backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,5 @@ func FetchMaxHeightByChain(pool *pgxpool.Pool) ([]BlockHeight, error) {
return nil, fmt.Errorf("error occurred during row iteration: %w", err)
}

// const maxHeight = 5444446
// var blockHeights = []BlockHeight{}

// for i := 0; i < 20; i++ {
// blockHeights = append(blockHeights, BlockHeight{
// ChainID: int64(i),
// MaxHeight: maxHeight,
// })
// }

return results, nil
}
65 changes: 65 additions & 0 deletions backfill/process/coinbase.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package process

import (
"encoding/json"
"fmt"
"go-backfill/fetch"
"go-backfill/repository"
"strconv"
)

type Coinbase struct {
ReqKey string `json:"reqKey"`
TxID int `json:"txId"`
Events []fetch.Event `json:"events"`
Result json.RawMessage `json:"result"`
Logs string `json:"logs"`
}

func decodeCoinbase(jsonStr string) (*Coinbase, error) {
var coinbase Coinbase
err := json.Unmarshal([]byte(jsonStr), &coinbase)
if err != nil {
return nil, err
}
return &coinbase, nil
}

func processCoinbaseTransaction(coinbase string, blockId int64, creationTime int64, chainId int64) (repository.TransactionAttributes, error) {

coinbaseDecoded, err := decodeCoinbase(coinbase)
if err != nil {
return repository.TransactionAttributes{}, fmt.Errorf("decoding Coinbase JSON of block %d: %w", blockId, err)
}

emptyJSON, _ := json.Marshal(map[string]interface{}{})
emptyArray, _ := json.Marshal([]interface{}{})

txAttribute := repository.TransactionAttributes{
BlockId: blockId,
Code: emptyJSON,
Data: emptyJSON,
ChainId: int(chainId),
CreationTime: strconv.FormatInt(creationTime, 10),
GasLimit: "0",
GasPrice: "0",
Hash: coinbaseDecoded.ReqKey,
Nonce: "",
PactId: nil,
Continuation: emptyJSON,
Gas: "0",
Result: coinbaseDecoded.Result,
Logs: coinbaseDecoded.Logs,
NumEvents: len(coinbaseDecoded.Events),
RequestKey: coinbaseDecoded.ReqKey,
Rollback: false,
Sender: "coinbase",
Sigs: emptyArray,
Step: 0,
Proof: nil,
TTL: "0",
TxId: fmt.Sprintf("%d", coinbaseDecoded.TxID),
}

return txAttribute, nil
}
2 changes: 1 addition & 1 deletion backfill/process/process_blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func PrepareBlocks(network string, chainId int, payloads []fetch.ProcessedPayloa
TransactionsHash: payload.TransactionsHash,
OutputsHash: payload.OutputsHash,
Coinbase: string(payload.Coinbase),
TransactionsCount: len(payload.Transactions),
TransactionsCount: len(payload.Transactions) + 1, // txs + coinbase tx
}

blocks = append(blocks, block)
Expand Down
31 changes: 27 additions & 4 deletions backfill/process/process_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,6 @@ import (
func PrepareEvents(network string, payload fetch.ProcessedPayload, transactionsId []int64) ([]repository.EventAttributes, error) {
transactions := payload.Transactions

if len(transactions) == 0 {
return []repository.EventAttributes{}, nil
}

const avgEventsPerTransaction = 80
events := make([]repository.EventAttributes, 0, len(transactions)*avgEventsPerTransaction)

Expand Down Expand Up @@ -42,7 +38,34 @@ func PrepareEvents(network string, payload fetch.ProcessedPayload, transactionsI
}
events = append(events, eventRecord)
}
}

coinbaseDecoded, err := decodeCoinbase(string(payload.Coinbase))
if err != nil {
return nil, fmt.Errorf("decoding Coinbase JSON of block: %w", err)
}

var coinbaseTxId = transactionsId[len(transactionsId)-1]
for eventIndex, event := range coinbaseDecoded.Events {

module := buildModuleName(event.Module.Namespace, event.Module.Name)
qualName := buildModuleName(event.Module.Namespace, event.Module.Name)
paramsJSON, err := json.Marshal(event.Params)
if err != nil {
return []repository.EventAttributes{}, fmt.Errorf("marshaling params for event %s: %w", event.Name, err)
}

eventRecord := repository.EventAttributes{
TransactionId: coinbaseTxId,
ChainId: payload.Header.ChainId,
Module: module,
Name: event.Name,
Params: paramsJSON,
QualName: qualName,
RequestKey: coinbaseDecoded.ReqKey,
OrderIndex: eventIndex,
}
events = append(events, eventRecord)
}

return events, nil
Expand Down
25 changes: 17 additions & 8 deletions backfill/process/process_transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,9 @@ type CmdData struct {
} `json:"payload"`
}

func PrepareTransactions(network string, blockId int64, payload fetch.ProcessedPayload) ([]repository.TransactionAttributes, error) {
func PrepareTransactions(network string, blockId int64, payload fetch.ProcessedPayload, block repository.BlockAttributes) ([]repository.TransactionAttributes, error) {
transactions := payload.Transactions

if len(transactions) == 0 {
return []repository.TransactionAttributes{}, nil
}

transactionRecords := make([]repository.TransactionAttributes, 0, len(transactions))

var cmdData CmdData
Expand Down Expand Up @@ -91,9 +87,14 @@ func PrepareTransactions(network string, blockId int64, payload fetch.ProcessedP
}
}

chainId, err := strconv.Atoi(cmdData.Meta.ChainId)
if err != nil {
return nil, fmt.Errorf("converting ChainId for transaction %s: %w", t.Hash, err)
var chainId int
if cmdData.Meta.ChainId != "" {
chainId, err = strconv.Atoi(cmdData.Meta.ChainId)
if err != nil {
return nil, fmt.Errorf("converting ChainId for transaction %s: %w", t.Hash, err)
}
} else {
chainId = block.ChainId
}

txId := strconv.Itoa(t.TxId)
Expand Down Expand Up @@ -137,6 +138,14 @@ func PrepareTransactions(network string, blockId int64, payload fetch.ProcessedP
transactionRecords = append(transactionRecords, transactionRecord)
}

coinbaseTx, err := processCoinbaseTransaction(string(payload.Coinbase), blockId, block.CreationTime, int64(block.ChainId))

if err != nil {
return nil, fmt.Errorf("processing coinbase transaction %d: %w", blockId, err)
}

transactionRecords = append(transactionRecords, coinbaseTx)

return transactionRecords, nil
}

Expand Down
18 changes: 12 additions & 6 deletions backfill/process/process_transfers.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
package process

import (
"fmt"
"go-backfill/fetch"
"go-backfill/repository"
)

func PrepareTransfers(network string, payload fetch.ProcessedPayload, transactionsId []int64) []repository.TransferAttributes {
func PrepareTransfers(network string, payload fetch.ProcessedPayload, transactionsId []int64) ([]repository.TransferAttributes, error) {
transactions := payload.Transactions

if len(transactions) == 0 {
return []repository.TransferAttributes{}
}

const avgTransfersPerTransaction = 80
transfers := make([]repository.TransferAttributes, 0, len(transactions)*avgTransfersPerTransaction)

Expand All @@ -23,5 +20,14 @@ func PrepareTransfers(network string, payload fetch.ProcessedPayload, transactio
transfers = append(transfers, nftTransfers...)
}

return transfers
coinbaseDecoded, err := decodeCoinbase(string(payload.Coinbase))
if err != nil {
return nil, fmt.Errorf("decoding Coinbase JSON of block: %w", err)
}

var coinbaseTxId = transactionsId[len(transactionsId)-1]
coinTransfers := GetCoinTransfers(coinbaseDecoded.Events, payload.Header.ChainId, coinbaseDecoded.ReqKey, coinbaseTxId)
transfers = append(transfers, coinTransfers...)

return transfers, nil
}
9 changes: 7 additions & 2 deletions backfill/process/save_payloads.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ func savePayloads(network string, chainId int, processedPayloads []fetch.Process

var transactionIdsToSave [][]int64
for index, processedPayload := range processedPayloads {
txs, err := PrepareTransactions(network, blockIds[index], processedPayload)
var blockId = blockIds[index]
var currBlock = blocks[index]
txs, err := PrepareTransactions(network, blockId, processedPayload, currBlock)
if err != nil {
return Counters{}, DataSizeTracker{}, fmt.Errorf("saving transactions -> %w", err)
}
Expand Down Expand Up @@ -98,7 +100,10 @@ func savePayloads(network string, chainId int, processedPayloads []fetch.Process
}

for index, processedPayload := range processedPayloads {
transfers := PrepareTransfers(network, processedPayload, transactionIdsToSave[index])
transfers, err := PrepareTransfers(network, processedPayload, transactionIdsToSave[index])
if err != nil {
return Counters{}, DataSizeTracker{}, fmt.Errorf("preparing transfers -> %w", err)
}
if err := repository.SaveTransfersToDatabase(transfers, tx); err != nil {
return Counters{}, DataSizeTracker{}, fmt.Errorf("saving transfers: %w", err)
}
Expand Down
17 changes: 16 additions & 1 deletion backfill/process/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,30 @@ func (g *GasLimit) UnmarshalJSON(data []byte) error {
return nil
}

// Attempt to unmarshal the data as a float
var floatValue float64
if err := json.Unmarshal(data, &floatValue); err == nil {
*g = GasLimit(fmt.Sprintf("%.0f", floatValue)) // Convert float to int-like string
return nil
}

// Attempt to unmarshal the data as a string
var stringValue string
if err := json.Unmarshal(data, &stringValue); err == nil {
*g = GasLimit(stringValue) // Assign the string value directly
return nil
}

var tempStruct struct {
Int int `json:"int"`
}
if err := json.Unmarshal(data, &tempStruct); err == nil {
*g = GasLimit(strconv.Itoa(tempStruct.Int)) // Convert int to string and assign
return nil
}

// If neither, return an error
return fmt.Errorf("data is neither int nor string: %s", string(data))
return fmt.Errorf("data is neither int nor string nor float not { int: <some_number> }: %s", string(data))
}

func convertToFloat64(event fetch.Event, index int) (float64, bool) {
Expand Down
Loading