Skip to content

Commit 6b10e2b

Browse files
committed
feat: clickhouse for elastic search on jsonb fields starting code from tx
1 parent ecd4864 commit 6b10e2b

32 files changed

+1195
-47
lines changed

backfill/clickhouse/client.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package clickhouse
2+
3+
import (
4+
ch "github.com/ClickHouse/clickhouse-go/v2"
5+
"context"
6+
)
7+
8+
func NewClient(cfg *CHConfig) (ch.Conn, error) {
9+
options := &ch.Options{
10+
Addr: []string{cfg.ClickHouseURL},
11+
Auth: ch.Auth{
12+
Database: cfg.ClickHouseDatabase,
13+
Username: cfg.ClickHouseUser,
14+
Password: cfg.ClickHousePassword,
15+
},
16+
Settings: ch.Settings{
17+
"max_execution_time": 60,
18+
},
19+
}
20+
conn, err := ch.Open(options)
21+
if err != nil {
22+
return nil, err
23+
}
24+
if err := conn.Ping(context.Background()); err != nil {
25+
return nil, err
26+
}
27+
return conn, nil
28+
}
29+
30+

backfill/clickhouse/env.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package clickhouse
2+
3+
import (
4+
"log"
5+
"os"
6+
"strconv"
7+
8+
"github.com/joho/godotenv"
9+
)
10+
11+
// CHConfig carries the settings read from the environment by LoadCHEnv:
// the ClickHouse connection parameters plus the sync options that share the
// same .env file.
type CHConfig struct {
	// ClickHouse connection settings (CLICKHOUSE_* variables).
	ClickHouseURL      string // CLICKHOUSE_URL
	ClickHouseUser     string // CLICKHOUSE_USER
	ClickHousePassword string // CLICKHOUSE_PASSWORD
	ClickHouseDatabase string // CLICKHOUSE_DATABASE

	// Chain selection.
	Network string // SYNC_NETWORK (required)
	ChainId int    // CHAIN_ID; 0 when unset

	// Sync settings (SYNC_* variables).
	SyncBaseUrl               string // SYNC_BASE_URL (required)
	SyncMinHeight             int    // SYNC_MIN_HEIGHT; 0 when unset
	SyncFetchIntervalInBlocks int    // SYNC_FETCH_INTERVAL_IN_BLOCKS; 0 when unset
	SyncAttemptsMaxRetry      int    // SYNC_ATTEMPTS_MAX_RETRY; 0 when unset
	SyncAttemptsIntervalInMs  int    // SYNC_ATTEMPTS_INTERVAL_IN_MS; 0 when unset

	// IsSingleChain mirrors IS_SINGLE_CHAIN_RUN; false when unset.
	IsSingleChain bool
}
25+
26+
var chConfig *CHConfig
27+
28+
func LoadCHEnv(envFilePath string) {
29+
_ = godotenv.Load(envFilePath)
30+
31+
chConfig = &CHConfig{
32+
ClickHouseURL: getEnvSoft("CLICKHOUSE_URL"),
33+
ClickHouseUser: getEnvSoft("CLICKHOUSE_USER"),
34+
ClickHousePassword: getEnvSoft("CLICKHOUSE_PASSWORD"),
35+
ClickHouseDatabase: getEnvSoft("CLICKHOUSE_DATABASE"),
36+
Network: getEnvHard("SYNC_NETWORK"),
37+
ChainId: getEnvAsIntSoft("CHAIN_ID"),
38+
SyncBaseUrl: getEnvHard("SYNC_BASE_URL"),
39+
SyncMinHeight: getEnvAsIntSoft("SYNC_MIN_HEIGHT"),
40+
SyncFetchIntervalInBlocks: getEnvAsIntSoft("SYNC_FETCH_INTERVAL_IN_BLOCKS"),
41+
SyncAttemptsMaxRetry: getEnvAsIntSoft("SYNC_ATTEMPTS_MAX_RETRY"),
42+
SyncAttemptsIntervalInMs: getEnvAsIntSoft("SYNC_ATTEMPTS_INTERVAL_IN_MS"),
43+
IsSingleChain: getEnvAsBoolSoft("IS_SINGLE_CHAIN_RUN"),
44+
}
45+
}
46+
47+
func GetCHConfig() *CHConfig {
48+
if chConfig == nil {
49+
log.Fatal("CH config not initialized. Call LoadCHEnv first.")
50+
}
51+
return chConfig
52+
}
53+
54+
// getEnvHard returns the value of the environment variable key and
// terminates the process through log.Fatalf when the variable is unset or
// empty.
func getEnvHard(key string) string {
	if value := os.Getenv(key); value != "" {
		return value
	}
	log.Fatalf("Environment variable %s is required but not set", key)
	return "" // unreachable: log.Fatalf exits the process
}
61+
62+
// getEnvSoft returns the value of the environment variable key, or the
// empty string when it is not set. It never fails.
func getEnvSoft(key string) string {
	value, _ := os.LookupEnv(key)
	return value
}
65+
66+
// getEnvAsIntSoft reads the environment variable key as a base-10 integer.
//
// An unset or empty variable yields 0. A value that strconv.Atoi rejects
// terminates the process through log.Fatalf.
func getEnvAsIntSoft(key string) int {
	raw, present := os.LookupEnv(key)
	if !present || raw == "" {
		return 0
	}

	parsed, err := strconv.Atoi(raw)
	if err != nil {
		log.Fatalf("Environment variable %s must be an integer, but got: %s", key, raw)
	}
	return parsed
}
77+
78+
// getEnvAsBoolSoft reads the environment variable key as a boolean using
// strconv.ParseBool.
//
// An unset or empty variable yields false. A value that ParseBool rejects
// terminates the process through log.Fatalf.
func getEnvAsBoolSoft(key string) bool {
	if raw := os.Getenv(key); raw != "" {
		enabled, err := strconv.ParseBool(raw)
		if err != nil {
			log.Fatalf("Environment variable %s must be a boolean, but got: %s", key, raw)
		}
		return enabled
	}
	return false
}
89+
90+

backfill/clickhouse/indexer.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package clickhouse
2+
3+
import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"

	ch "github.com/ClickHouse/clickhouse-go/v2"
)
9+
10+
// TxCodeRow is one row of the transactions_code_v1 ClickHouse table.
//
// The fields are declared in the same order as the table columns created by
// EnsureDDL, which is also the order BulkInsert appends them in.
type TxCodeRow struct {
	// Identity of the transaction.
	ID         uint64 `json:"id"`
	RequestKey string `json:"requestKey"`
	ChainID    uint16 `json:"chainId"`

	// Position of the transaction; CreationTime feeds the table's
	// partitioning and sort key.
	CreationTime uint64 `json:"creationTime"`
	Height       uint64 `json:"height"`
	Canonical    uint8  `json:"canonical"` // stored as UInt8

	// Sender and gas figures, kept as strings.
	Sender   string `json:"sender"`
	Gas      string `json:"gas"`
	GasLimit string `json:"gasLimit"`
	GasPrice string `json:"gasPrice"`

	// Code is the text covered by the ngram bloom-filter index.
	Code string `json:"code"`
}
23+
24+
func EnsureDDL(ctx context.Context, conn ch.Conn) error {
25+
ddl := `
26+
CREATE TABLE IF NOT EXISTS transactions_code_v1
27+
(
28+
id UInt64,
29+
requestKey String,
30+
chainId UInt16,
31+
creationTime UInt64,
32+
height UInt64,
33+
canonical UInt8,
34+
sender LowCardinality(String),
35+
gas String,
36+
gasLimit String,
37+
gasPrice String,
38+
code String,
39+
INDEX idx_code_ngram code TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 4
40+
)
41+
ENGINE = MergeTree()
42+
PARTITION BY toYYYYMM(toDateTime(creationTime))
43+
ORDER BY (creationTime, id)
44+
SETTINGS index_granularity = 8192
45+
`
46+
return conn.Exec(ctx, ddl)
47+
}
48+
49+
func BulkInsert(ctx context.Context, conn ch.Conn, rows []TxCodeRow) error {
50+
if len(rows) == 0 {
51+
return nil
52+
}
53+
batch, err := conn.PrepareBatch(ctx, "INSERT INTO transactions_code_v1")
54+
if err != nil {
55+
return err
56+
}
57+
for _, r := range rows {
58+
if err := batch.Append(
59+
r.ID,
60+
r.RequestKey,
61+
r.ChainID,
62+
r.CreationTime,
63+
r.Height,
64+
r.Canonical,
65+
r.Sender,
66+
r.Gas,
67+
r.GasLimit,
68+
r.GasPrice,
69+
r.Code,
70+
); err != nil {
71+
return err
72+
}
73+
}
74+
return batch.Send()
75+
}
76+
77+
// MarshalCode normalises a raw JSON value into the compact string stored in
// the code column.
//
// Empty input yields "" and no error. Otherwise the value is decoded and
// re-encoded, which removes insignificant whitespace and emits object keys
// in sorted order. Two details keep the stored text faithful to the input:
//
//   - numbers are decoded as json.Number, so large integers and decimal
//     literals are written back exactly as they appeared instead of passing
//     through float64;
//   - HTML escaping is disabled, so '<', '>' and '&' are stored literally
//     rather than as \u003c, \u003e and \u0026, keeping them searchable as
//     plain substrings.
//
// An error is returned when input is not exactly one valid JSON value.
func MarshalCode(input json.RawMessage) (string, error) {
	if len(input) == 0 {
		return "", nil
	}

	dec := json.NewDecoder(bytes.NewReader(input))
	dec.UseNumber()

	var decoded any
	if err := dec.Decode(&decoded); err != nil {
		return "", fmt.Errorf("unmarshal code: %w", err)
	}
	// Decode stops after the first value; insist that nothing but
	// whitespace follows, as json.Unmarshal would.
	if _, err := dec.Token(); err != io.EOF {
		return "", fmt.Errorf("unmarshal code: unexpected data after top-level value")
	}

	var buf bytes.Buffer
	enc := json.NewEncoder(&buf)
	enc.SetEscapeHTML(false)
	if err := enc.Encode(decoded); err != nil {
		return "", fmt.Errorf("marshal code: %w", err)
	}

	// Encode terminates its output with a newline that is not part of the value.
	return string(bytes.TrimSuffix(buf.Bytes(), []byte("\n"))), nil
}
91+
92+
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"flag"
6+
"log"
7+
chpkg "go-backfill/clickhouse"
8+
"go-backfill/config"
9+
"go-backfill/repository"
10+
)
11+
12+
func main() {
13+
envFile := flag.String("env", ".env", "Path to the .env file")
14+
batch := flag.Int("batch", 5000, "Batch size")
15+
flag.Parse()
16+
17+
config.InitEnv(*envFile)
18+
chpkg.LoadCHEnv(*envFile)
19+
20+
env := config.GetConfig()
21+
if chpkg.GetCHConfig().ClickHouseURL == "" {
22+
log.Fatal("CLICKHOUSE_URL not set; aborting repair")
23+
}
24+
25+
pool := config.InitDatabase()
26+
defer pool.Close()
27+
28+
ch, err := chpkg.NewClient(chpkg.GetCHConfig())
29+
if err != nil {
30+
log.Fatalf("clickhouse connect: %v", err)
31+
}
32+
if err := chpkg.EnsureDDL(context.Background(), ch); err != nil {
33+
log.Fatalf("clickhouse DDL: %v", err)
34+
}
35+
36+
var lastTdID int64 = 0
37+
for {
38+
rows, err := repository.FetchUnindexedBatch(pool, lastTdID, *batch)
39+
if err != nil {
40+
log.Fatalf("fetch batch: %v", err)
41+
}
42+
if len(rows) == 0 {
43+
log.Println("repair complete")
44+
return
45+
}
46+
47+
payload := make([]chpkg.TxCodeRow, 0, len(rows))
48+
tdIDs := make([]int64, 0, len(rows))
49+
for _, r := range rows {
50+
code, _ := chpkg.MarshalCode(r.Code)
51+
payload = append(payload, chpkg.TxCodeRow{
52+
ID: uint64(r.TxID),
53+
RequestKey: r.RequestKey,
54+
ChainID: uint16(r.ChainID),
55+
CreationTime: uint64(parseUint64(r.CreationTime)),
56+
Height: uint64(r.Height),
57+
Canonical: boolToUint8(r.Canonical),
58+
Sender: r.Sender,
59+
Gas: r.Gas,
60+
GasLimit: r.GasLimit,
61+
GasPrice: r.GasPrice,
62+
Code: code,
63+
})
64+
tdIDs = append(tdIDs, r.TdID)
65+
lastTdID = r.TdID
66+
}
67+
68+
if err := chpkg.BulkInsert(context.Background(), ch, payload); err != nil {
69+
log.Printf("[CH][INSERT][ERROR] %v", err)
70+
continue
71+
}
72+
if err := repository.MarkIndexedByTdIDs(pool, tdIDs); err != nil {
73+
log.Printf("[PG][FLAG][ERROR] %v", err)
74+
}
75+
log.Printf("indexed %d rows up to td.id=%d", len(rows), lastTdID)
76+
if env.IsSingleChain {
77+
break
78+
}
79+
}
80+
}
81+
82+
// parseUint64 converts a string of ASCII decimal digits into a uint64.
//
// It returns 0 for the empty string, for any input that contains a byte
// other than '0'..'9' (signs, spaces, decimal points, ...), and for values
// that do not fit into 64 bits. Returning 0 keeps the single-value
// signature while avoiding the arbitrary numbers that unchecked digit
// arithmetic would produce for such input.
func parseUint64(s string) uint64 {
	const maxUint64 = ^uint64(0)

	var out uint64
	for i := 0; i < len(s); i++ {
		c := s[i]
		if c < '0' || c > '9' {
			return 0
		}
		digit := uint64(c - '0')
		// out*10 + digit would wrap around exactly when this holds.
		if out > (maxUint64-digit)/10 {
			return 0
		}
		out = out*10 + digit
	}
	return out
}
89+
90+
// boolToUint8 maps true to 1 and false to 0.
func boolToUint8(b bool) uint8 {
	var flag uint8
	if b {
		flag = 1
	}
	return flag
}
91+
92+

backfill/go.mod

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,32 @@ module go-backfill
33
go 1.23.3
44

55
require (
6-
github.com/aws/aws-sdk-go v1.55.5 // indirect
6+
github.com/ClickHouse/clickhouse-go/v2 v2.40.1
7+
github.com/jackc/pgx/v5 v5.7.1
8+
github.com/joho/godotenv v1.5.1
9+
github.com/lib/pq v1.10.9
10+
golang.org/x/sync v0.16.0
11+
)
12+
13+
require (
14+
github.com/ClickHouse/ch-go v0.67.0 // indirect
15+
github.com/andybalholm/brotli v1.2.0 // indirect
16+
github.com/go-faster/city v1.0.1 // indirect
17+
github.com/go-faster/errors v0.7.1 // indirect
18+
github.com/google/uuid v1.6.0 // indirect
719
github.com/jackc/pgpassfile v1.0.0 // indirect
820
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
9-
github.com/jackc/pgx/v5 v5.7.1 // indirect
1021
github.com/jackc/puddle/v2 v2.2.2 // indirect
11-
github.com/jmespath/go-jmespath v0.4.0 // indirect
12-
github.com/joho/godotenv v1.5.1 // indirect
13-
github.com/lib/pq v1.10.9 // indirect
14-
golang.org/x/crypto v0.27.0 // indirect
15-
golang.org/x/sync v0.8.0 // indirect
16-
golang.org/x/text v0.18.0 // indirect
22+
github.com/klauspost/compress v1.18.0 // indirect
23+
github.com/paulmach/orb v0.11.1 // indirect
24+
github.com/pierrec/lz4/v4 v4.1.22 // indirect
25+
github.com/rogpeppe/go-internal v1.14.1 // indirect
26+
github.com/segmentio/asm v1.2.0 // indirect
27+
github.com/shopspring/decimal v1.4.0 // indirect
28+
go.opentelemetry.io/otel v1.37.0 // indirect
29+
go.opentelemetry.io/otel/trace v1.37.0 // indirect
30+
golang.org/x/crypto v0.40.0 // indirect
31+
golang.org/x/sys v0.34.0 // indirect
32+
golang.org/x/text v0.27.0 // indirect
33+
gopkg.in/yaml.v3 v3.0.1 // indirect
1734
)

0 commit comments

Comments
 (0)