Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions backfill/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
FROM golang:1.23.3 AS builder
WORKDIR /app
COPY . .
RUN go mod download
RUN CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -o main .

FROM scratch
WORKDIR /app
COPY ./global-bundle.pem ./global-bundle.pem
COPY --from=builder /app/main .
CMD ["./main"]
11 changes: 11 additions & 0 deletions backfill/Dockerfile.indexes
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
FROM golang:1.23.3 AS builder
WORKDIR /app
COPY . .
RUN go mod download
RUN CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -o recreate-indexes ./recreate-indexes/recreate-indexes.go

FROM scratch
WORKDIR /app
COPY ./global-bundle.pem ./global-bundle.pem
COPY --from=builder /app/recreate-indexes .
CMD ["./recreate-indexes"]
11 changes: 11 additions & 0 deletions backfill/Dockerfile.middle-backfill
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
FROM golang:1.23.3 AS builder
WORKDIR /app
COPY . .
RUN go mod download
RUN CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -o middle-backfill ./middle-backfill/middle-backfill.go

FROM scratch
WORKDIR /app
COPY ./global-bundle.pem ./global-bundle.pem
COPY --from=builder /app/middle-backfill .
CMD ["./middle-backfill"]
34 changes: 34 additions & 0 deletions backfill/config/db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package config

import (
"context"
"fmt"
"log"
"time"

"github.com/jackc/pgx/v5/pgxpool"
)

func InitDatabase() *pgxpool.Pool {
env := GetConfig()
connStr := fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=%s sslmode=disable",
env.DbHost, env.DbPort, env.DbUser, env.DbPassword, env.DbName)

config, err := pgxpool.ParseConfig(connStr)
if err != nil {
log.Fatalf("Unable to parse connection string: %v\n", err)
}

config.MaxConns = 2 // Maximum number of connections
config.MinConns = 1 // Minimum number of connections to keep alive
config.MaxConnLifetime = 1 * time.Hour // Close and refresh connections after 1 hour
config.HealthCheckPeriod = 1 * time.Minute // Check connection health every minute

// Create the pool
pool, err := pgxpool.NewWithConfig(context.Background(), config)
if err != nil {
log.Fatalf("Unable to connect to database: %v\n", err)
}

return pool
}
77 changes: 77 additions & 0 deletions backfill/config/env.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package config

import (
"log"
"os"
"strconv"

"github.com/joho/godotenv"
)

type Config struct {
DbUser string
DbPassword string
DbName string
DbHost string
DbPort string
CertPath string
Network string
ChainId int
SyncBaseUrl string
SyncMinHeight int
SyncFetchIntervalInBlocks int
SyncAttemptsMaxRetry int
SyncAttemptsIntervalInMs int
IsDevelopment bool
}

var config *Config

func InitEnv(envFilePath string) {
IsDevelopment := true
if err := godotenv.Load(envFilePath); err != nil {
IsDevelopment = false
log.Printf("No .env file found at %s, falling back to system environment variables", envFilePath)
}

config = &Config{
DbUser: getEnv("DB_USER"),
DbPassword: getEnv("DB_PASSWORD"),
DbName: getEnv("DB_NAME"),
DbHost: getEnv("DB_HOST"),
DbPort: getEnv("DB_PORT"),
CertPath: getEnv("CERT_PATH"),
Network: getEnv("NETWORK"),
ChainId: getEnvAsInt("CHAIN_ID"),
SyncBaseUrl: getEnv("SYNC_BASE_URL"),
SyncMinHeight: getEnvAsInt("SYNC_MIN_HEIGHT"),
SyncFetchIntervalInBlocks: getEnvAsInt("SYNC_FETCH_INTERVAL_IN_BLOCKS"),
SyncAttemptsMaxRetry: getEnvAsInt("SYNC_ATTEMPTS_MAX_RETRY"),
SyncAttemptsIntervalInMs: getEnvAsInt("SYNC_ATTEMPTS_INTERVAL_IN_MS"),
IsDevelopment: IsDevelopment,
}
}

func GetConfig() *Config {
if config == nil {
log.Fatal("Config not initialized. Call InitEnv first.")
}
return config
}

func getEnv(key string) string {
value := os.Getenv(key)
if value == "" {
log.Fatalf("Environment variable %s is required but not set", key)
}
return value
}

func getEnvAsInt(key string) int {
valueStr := getEnv(key)
value, err := strconv.Atoi(valueStr)
if err != nil {
log.Fatalf("Environment variable %s must be an integer, but got: %s", key, valueStr)
}
return value
}
25 changes: 25 additions & 0 deletions backfill/config/memory-monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package config

import (
"log"
"runtime"
"time"
)

func StartMemoryMonitoring() {
var memStats runtime.MemStats
for {
runtime.ReadMemStats(&memStats)

log.Printf(
"Alloc: %v KB, Sys: %v KB, HeapIdle: %v KB, HeapInuse: %v KB, NumGC: %v\n",
memStats.Alloc/1024, // Total allocated memory
memStats.Sys/1024, // Total system memory requested
memStats.HeapIdle/1024, // Idle heap memory
memStats.HeapInuse/1024, // In-use heap memory
memStats.NumGC, // Number of garbage collections
)

time.Sleep(10 * time.Second)
}
}
93 changes: 93 additions & 0 deletions backfill/fetch/fetch_cut.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package fetch

import (
"encoding/json"
"fmt"
"go-backfill/config"
"io"
"log"
"net/http"
"strconv"
)

type FetchCutResult struct {
Hashes map[string]struct {
Height int `json:"height"`
Hash string `json:"hash"`
} `json:"hashes"`
}

type CutResult struct {
Hash string
Height int
}

func FetchCut() CutResult {
env := config.GetConfig()

endpoint := fmt.Sprintf("%s/%s/cut", env.SyncBaseUrl, env.Network)

resp, err := http.Get(endpoint)
if err != nil {
log.Fatalf("error making GET request: %v", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
log.Fatalf("Unexpected status code: %d, body: %s", resp.StatusCode, string(body))
}

body, err := io.ReadAll(resp.Body)
if err != nil {
log.Fatalf("error reading response body: %v", err)
}

var result FetchCutResult
err = json.Unmarshal(body, &result)
if err != nil {
log.Fatalf("Error parsing JSON response: %v", err)
}

ChainId := strconv.Itoa(env.ChainId)
lastHeight := result.Hashes[ChainId].Height
// if lastHeight == nil {
// return 0, fmt.Errorf("no height found: %w", err)
// }
res := CutResult{
Hash: result.Hashes[ChainId].Hash,
Height: lastHeight,
}

return res
}

func FetchCuts() FetchCutResult {
env := config.GetConfig()

endpoint := fmt.Sprintf("%s/%s/cut", env.SyncBaseUrl, env.Network)

resp, err := http.Get(endpoint)
if err != nil {
log.Fatalf("error making GET request: %v", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
log.Fatalf("Unexpected status code: %d, body: %s", resp.StatusCode, string(body))
}

body, err := io.ReadAll(resp.Body)
if err != nil {
log.Fatalf("error reading response body: %v", err)
}

var result FetchCutResult
err = json.Unmarshal(body, &result)
if err != nil {
log.Fatalf("Error parsing JSON response: %v", err)
}

return result
}
126 changes: 126 additions & 0 deletions backfill/fetch/fetch_payloads_with_headers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package fetch

import (
"bytes"
"encoding/json"
"fmt"
"go-backfill/config"
"io"
"log"
"net/http"
"time"
)

type BlockInfo struct {
Header Header `json:"header"`
Payload Payload `json:"payloadWithOutputs"`
}

type Adjacents map[string]string

type Header struct {
Nonce string `json:"nonce"`
CreationTime int64 `json:"creationTime"`
Parent string `json:"parent"`
Adjacents Adjacents `json:"adjacents"`
Target string `json:"target"`
PayloadHash string `json:"payloadHash"`
ChainId int `json:"chainId"`
Weight string `json:"weight"`
Height int `json:"height"`
ChainwebVersion string `json:"chainwebVersion"`
EpochStart int64 `json:"epochStart"`
FeatureFlags uint64 `json:"featureFlags"`
Hash string `json:"hash"`
}

type Payload struct {
Transactions [][2]string `json:"transactions"`
MinerData string `json:"minerData"`
TransactionsHash string `json:"transactionsHash"`
OutputsHash string `json:"outputsHash"`
PayloadHash string `json:"payloadHash"`
Coinbase string `json:"coinbase"`
}

func FetchPayloadsWithHeaders(network string, chainId int, Hash string, minHeight int, maxHeight int) ([]BlockInfo, error) {
type FetchResponse struct {
Items []BlockInfo `json:"items"`
}

startTime := time.Now()
env := config.GetConfig()
endpoint := fmt.Sprintf("%s/%s/chain/%d/block/branch?minheight=%d&maxheight=%d", env.SyncBaseUrl, network, chainId, minHeight, maxHeight)

param := map[string]interface{}{
"upper": []string{Hash},
}

paramJSON, err := json.Marshal(param)
if err != nil {
return nil, fmt.Errorf("failed to marshal payload hashes to JSON: %v", err)
}

attempt := 1
for attempt <= env.SyncAttemptsMaxRetry {
req, err := http.NewRequest("POST", endpoint, bytes.NewBuffer(paramJSON))
if err != nil {
return nil, fmt.Errorf("failed to create request: %v", err)
}

req.Header.Set("Content-Type", "application/json")
req.Header.Set("Accept", "application/json")

client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
log.Printf("Attempt %d: Error making POST request for payloads: %v\n", attempt, err)
if attempt == env.SyncAttemptsMaxRetry {
return nil, err
}

attempt++
time.Sleep(time.Duration(env.SyncAttemptsIntervalInMs) * time.Millisecond)
continue
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
log.Printf("Attempt %d: Received non-OK HTTP status %d\n", attempt, resp.StatusCode)
if attempt == env.SyncAttemptsMaxRetry {
return nil, fmt.Errorf("received non-OK HTTP status: %d", resp.StatusCode)
}

attempt++
time.Sleep(time.Duration(env.SyncAttemptsIntervalInMs) * time.Millisecond)
continue
}

body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("failed to read response body: %v", err)
}

var payload FetchResponse
err = json.Unmarshal(body, &payload)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal JSON response: %v", err)
}

if len(payload.Items) == 0 {
log.Printf("Attempt %d: No payloads found, retrying...\n", attempt)
if attempt == env.SyncAttemptsMaxRetry {
return nil, fmt.Errorf("no payloads found after maximum attempts: %v", err)
}

attempt++
time.Sleep(time.Duration(env.SyncAttemptsIntervalInMs) * time.Millisecond)
continue
}

log.Printf("Fetched payloads in %fs\n", time.Since(startTime).Seconds())
return payload.Items, nil
}

return nil, fmt.Errorf("failed to fetch payloads after maximum retry attempts: %v", err)
}
Loading