Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions backfill/clickhouse/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package clickhouse

import (
ch "github.com/ClickHouse/clickhouse-go/v2"
"context"
)

func NewClient(cfg *CHConfig) (ch.Conn, error) {
options := &ch.Options{
Addr: []string{cfg.ClickHouseURL},
Auth: ch.Auth{
Database: cfg.ClickHouseDatabase,
Username: cfg.ClickHouseUser,
Password: cfg.ClickHousePassword,
},
Settings: ch.Settings{
"max_execution_time": 60,
},
}
conn, err := ch.Open(options)
if err != nil {
return nil, err
}
if err := conn.Ping(context.Background()); err != nil {
return nil, err
}
return conn, nil
}


90 changes: 90 additions & 0 deletions backfill/clickhouse/env.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package clickhouse

import (
"log"
"os"
"strconv"

"github.com/joho/godotenv"
)

type CHConfig struct {
ClickHouseURL string
ClickHouseUser string
ClickHousePassword string
ClickHouseDatabase string
Network string
ChainId int
SyncBaseUrl string
SyncMinHeight int
SyncFetchIntervalInBlocks int
SyncAttemptsMaxRetry int
SyncAttemptsIntervalInMs int
IsSingleChain bool
}

var chConfig *CHConfig

func LoadCHEnv(envFilePath string) {
_ = godotenv.Load(envFilePath)

chConfig = &CHConfig{
ClickHouseURL: getEnvSoft("CLICKHOUSE_URL"),
ClickHouseUser: getEnvSoft("CLICKHOUSE_USER"),
ClickHousePassword: getEnvSoft("CLICKHOUSE_PASSWORD"),
ClickHouseDatabase: getEnvSoft("CLICKHOUSE_DATABASE"),
Network: getEnvHard("NETWORK"),
ChainId: getEnvAsIntSoft("CHAIN_ID"),
SyncBaseUrl: getEnvHard("SYNC_BASE_URL"),
SyncMinHeight: getEnvAsIntSoft("SYNC_MIN_HEIGHT"),
SyncFetchIntervalInBlocks: getEnvAsIntSoft("SYNC_FETCH_INTERVAL_IN_BLOCKS"),
SyncAttemptsMaxRetry: getEnvAsIntSoft("SYNC_ATTEMPTS_MAX_RETRY"),
SyncAttemptsIntervalInMs: getEnvAsIntSoft("SYNC_ATTEMPTS_INTERVAL_IN_MS"),
IsSingleChain: getEnvAsBoolSoft("IS_SINGLE_CHAIN_RUN"),
}
}

func GetCHConfig() *CHConfig {
if chConfig == nil {
log.Fatal("CH config not initialized. Call LoadCHEnv first.")
}
return chConfig
}

func getEnvHard(key string) string {
v := os.Getenv(key)
if v == "" {
log.Fatalf("Environment variable %s is required but not set", key)
}
return v
}

func getEnvSoft(key string) string {
return os.Getenv(key)
}

func getEnvAsIntSoft(key string) int {
v := os.Getenv(key)
if v == "" {
return 0
}
i, err := strconv.Atoi(v)
if err != nil {
log.Fatalf("Environment variable %s must be an integer, but got: %s", key, v)
}
return i
}

func getEnvAsBoolSoft(key string) bool {
v := os.Getenv(key)
if v == "" {
return false
}
b, err := strconv.ParseBool(v)
if err != nil {
log.Fatalf("Environment variable %s must be a boolean, but got: %s", key, v)
}
return b
}


92 changes: 92 additions & 0 deletions backfill/clickhouse/indexer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package clickhouse

import (
"context"
"encoding/json"
"fmt"
ch "github.com/ClickHouse/clickhouse-go/v2"
)

type TxCodeRow struct {
ID uint64 `json:"id"`
RequestKey string `json:"requestKey"`
ChainID uint16 `json:"chainId"`
CreationTime uint64 `json:"creationTime"`
Height uint64 `json:"height"`
Canonical uint8 `json:"canonical"`
Sender string `json:"sender"`
Gas string `json:"gas"`
GasLimit string `json:"gasLimit"`
GasPrice string `json:"gasPrice"`
Code string `json:"code"`
}

func EnsureDDL(ctx context.Context, conn ch.Conn) error {
ddl := `
CREATE TABLE IF NOT EXISTS transactions_code_v1
(
id UInt64,
requestKey String,
chainId UInt16,
creationTime UInt64,
height UInt64,
canonical UInt8,
sender LowCardinality(String),
gas String,
gasLimit String,
gasPrice String,
code String,
INDEX idx_code_ngram code TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(toDateTime(creationTime))
ORDER BY (creationTime, id)
SETTINGS index_granularity = 8192
`
return conn.Exec(ctx, ddl)
}

func BulkInsert(ctx context.Context, conn ch.Conn, rows []TxCodeRow) error {
if len(rows) == 0 {
return nil
}
batch, err := conn.PrepareBatch(ctx, "INSERT INTO transactions_code_v1")
if err != nil {
return err
}
for _, r := range rows {
if err := batch.Append(
r.ID,
r.RequestKey,
r.ChainID,
r.CreationTime,
r.Height,
r.Canonical,
r.Sender,
r.Gas,
r.GasLimit,
r.GasPrice,
r.Code,
); err != nil {
return err
}
}
return batch.Send()
}

func MarshalCode(input json.RawMessage) (string, error) {
if len(input) == 0 {
return "", nil
}
var any interface{}
if err := json.Unmarshal(input, &any); err != nil {
return "", fmt.Errorf("marshal code: %w", err)
}
buf, err := json.Marshal(any)
if err != nil {
return "", fmt.Errorf("marshal code: %w", err)
}
return string(buf), nil
}


92 changes: 92 additions & 0 deletions backfill/cmd/clickhouse-repair/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package main

import (
"context"
"flag"
"log"
chpkg "go-backfill/clickhouse"
"go-backfill/config"
"go-backfill/repository"
)

func main() {
envFile := flag.String("env", ".env", "Path to the .env file")
batch := flag.Int("batch", 5000, "Batch size")
flag.Parse()

config.InitEnv(*envFile)
chpkg.LoadCHEnv(*envFile)

env := config.GetConfig()
if chpkg.GetCHConfig().ClickHouseURL == "" {
log.Fatal("CLICKHOUSE_URL not set; aborting repair")
}

pool := config.InitDatabase()
defer pool.Close()

ch, err := chpkg.NewClient(chpkg.GetCHConfig())
if err != nil {
log.Fatalf("clickhouse connect: %v", err)
}
if err := chpkg.EnsureDDL(context.Background(), ch); err != nil {
log.Fatalf("clickhouse DDL: %v", err)
}

var lastTdID int64 = 0
for {
rows, err := repository.FetchUnindexedBatch(pool, lastTdID, *batch)
if err != nil {
log.Fatalf("fetch batch: %v", err)
}
if len(rows) == 0 {
log.Println("repair complete")
return
}

payload := make([]chpkg.TxCodeRow, 0, len(rows))
tdIDs := make([]int64, 0, len(rows))
for _, r := range rows {
code, _ := chpkg.MarshalCode(r.Code)
payload = append(payload, chpkg.TxCodeRow{
ID: uint64(r.TxID),
RequestKey: r.RequestKey,
ChainID: uint16(r.ChainID),
CreationTime: uint64(parseUint64(r.CreationTime)),
Height: uint64(r.Height),
Canonical: boolToUint8(r.Canonical),
Sender: r.Sender,
Gas: r.Gas,
GasLimit: r.GasLimit,
GasPrice: r.GasPrice,
Code: code,
})
tdIDs = append(tdIDs, r.TdID)
lastTdID = r.TdID
}

if err := chpkg.BulkInsert(context.Background(), ch, payload); err != nil {
log.Printf("[CH][INSERT][ERROR] %v", err)
continue
}
if err := repository.MarkIndexedByTdIDs(pool, tdIDs); err != nil {
log.Printf("[PG][FLAG][ERROR] %v", err)
}
log.Printf("indexed %d rows up to td.id=%d", len(rows), lastTdID)
if env.IsSingleChain {
break
}
}
}

func parseUint64(s string) uint64 {
var out uint64
for i := 0; i < len(s); i++ {
out = out*10 + uint64(s[i]-'0')
}
return out
}

func boolToUint8(b bool) uint8 { if b { return 1 }; return 0 }


33 changes: 25 additions & 8 deletions backfill/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,32 @@ module go-backfill
go 1.23.3

require (
github.com/aws/aws-sdk-go v1.55.5 // indirect
github.com/ClickHouse/clickhouse-go/v2 v2.40.1
github.com/jackc/pgx/v5 v5.7.1
github.com/joho/godotenv v1.5.1
github.com/lib/pq v1.10.9
golang.org/x/sync v0.16.0
)

require (
github.com/ClickHouse/ch-go v0.67.0 // indirect
github.com/andybalholm/brotli v1.2.0 // indirect
github.com/go-faster/city v1.0.1 // indirect
github.com/go-faster/errors v0.7.1 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/pgx/v5 v5.7.1 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/joho/godotenv v1.5.1 // indirect
github.com/lib/pq v1.10.9 // indirect
golang.org/x/crypto v0.27.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/text v0.18.0 // indirect
github.com/klauspost/compress v1.18.0 // indirect
github.com/paulmach/orb v0.11.1 // indirect
github.com/pierrec/lz4/v4 v4.1.22 // indirect
github.com/rogpeppe/go-internal v1.14.1 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/shopspring/decimal v1.4.0 // indirect
go.opentelemetry.io/otel v1.37.0 // indirect
go.opentelemetry.io/otel/trace v1.37.0 // indirect
golang.org/x/crypto v0.40.0 // indirect
golang.org/x/sys v0.34.0 // indirect
golang.org/x/text v0.27.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading