Skip to content

Commit 42361b6

Browse files
committed
BEDS-140: feat(exporter): clickhouse cl dashboard data exporter
1 parent bb80db9 commit 42361b6

File tree

59 files changed

+8431
-4749
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

59 files changed

+8431
-4749
lines changed

backend/cmd/exporter/main.go

Lines changed: 53 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -60,14 +60,58 @@ func Run() {
6060
defer wg.Done()
6161
db.WriterDb, db.ReaderDb = db.MustInitDB(&cfg.WriterDatabase, &cfg.ReaderDatabase, "pgx", "postgres")
6262
}()
63+
wg.Add(1)
64+
go func() {
65+
defer wg.Done()
66+
db.AlloyWriter, db.AlloyReader = db.MustInitDB(&cfg.AlloyWriter, &cfg.AlloyReader, "pgx", "postgres")
67+
}()
68+
wg.Add(1)
69+
go func() {
70+
defer wg.Done()
71+
bt, err := db.InitBigtable(utils.Config.Bigtable.Project, utils.Config.Bigtable.Instance, fmt.Sprintf("%d", utils.Config.Chain.ClConfig.DepositChainID), utils.Config.RedisCacheEndpoint)
72+
if err != nil {
73+
log.Fatal(err, "error connecting to bigtable", 0)
74+
}
75+
db.BigtableClient = bt
76+
}()
77+
if utils.Config.TieredCacheProvider != "redis" {
78+
log.Fatal(fmt.Errorf("no cache provider set, please set TierdCacheProvider (example redis)"), "", 0)
79+
}
80+
if utils.Config.TieredCacheProvider == "redis" || len(utils.Config.RedisCacheEndpoint) != 0 {
81+
wg.Add(1)
82+
go func() {
83+
defer wg.Done()
84+
cache.MustInitTieredCache(utils.Config.RedisCacheEndpoint)
85+
log.Infof("tiered Cache initialized, latest finalized epoch: %v", cache.LatestFinalizedEpoch.Get())
86+
}()
87+
}
88+
wg.Add(1)
89+
go func() {
90+
defer wg.Done()
91+
// Initialize the persistent redis client
92+
rdc := redis.NewClient(&redis.Options{
93+
Addr: utils.Config.RedisSessionStoreEndpoint,
94+
ReadTimeout: time.Second * 20,
95+
})
96+
97+
if err := rdc.Ping(context.Background()).Err(); err != nil {
98+
log.Fatal(err, "error connecting to persistent redis store", 0)
99+
}
100+
db.PersistentRedisDbClient = rdc
101+
}()
63102
} else {
64103
log.Warnf("------- EXPORTER RUNNING IN V2 ONLY MODE ------")
65104
}
66105

67106
wg.Add(1)
68107
go func() {
69108
defer wg.Done()
70-
db.AlloyWriter, db.AlloyReader = db.MustInitDB(&cfg.AlloyWriter, &cfg.AlloyReader, "pgx", "postgres")
109+
db.ClickHouseWriter, db.ClickHouseReader = db.MustInitDB(&cfg.ClickHouse.WriterDatabase, &cfg.ClickHouse.ReaderDatabase, "clickhouse", "clickhouse")
110+
}()
111+
wg.Add(1)
112+
go func() {
113+
defer wg.Done()
114+
db.ClickHouseNativeWriter = db.MustInitClickhouseNative(&cfg.ClickHouse.WriterDatabase)
71115
}()
72116

73117
wg.Add(1)
@@ -101,57 +145,24 @@ func Run() {
101145
}
102146
}()
103147

104-
wg.Add(1)
105-
go func() {
106-
defer wg.Done()
107-
bt, err := db.InitBigtable(utils.Config.Bigtable.Project, utils.Config.Bigtable.Instance, fmt.Sprintf("%d", utils.Config.Chain.ClConfig.DepositChainID), utils.Config.RedisCacheEndpoint)
108-
if err != nil {
109-
log.Fatal(err, "error connecting to bigtable", 0)
110-
}
111-
db.BigtableClient = bt
112-
}()
113-
114-
if utils.Config.TieredCacheProvider == "redis" || len(utils.Config.RedisCacheEndpoint) != 0 {
115-
wg.Add(1)
116-
go func() {
117-
defer wg.Done()
118-
cache.MustInitTieredCache(utils.Config.RedisCacheEndpoint)
119-
log.Infof("tiered Cache initialized, latest finalized epoch: %v", cache.LatestFinalizedEpoch.Get())
120-
}()
121-
}
122-
123-
wg.Add(1)
124-
go func() {
125-
defer wg.Done()
126-
// Initialize the persistent redis client
127-
rdc := redis.NewClient(&redis.Options{
128-
Addr: utils.Config.RedisSessionStoreEndpoint,
129-
ReadTimeout: time.Second * 20,
130-
})
131-
132-
if err := rdc.Ping(context.Background()).Err(); err != nil {
133-
log.Fatal(err, "error connecting to persistent redis store", 0)
134-
}
135-
db.PersistentRedisDbClient = rdc
136-
}()
137-
138148
wg.Wait()
139149

140150
// enable light-weight db connection monitoring
141151
monitoring.Init(false)
142152
monitoring.Start()
143153

144-
if utils.Config.TieredCacheProvider != "redis" {
145-
log.Fatal(fmt.Errorf("no cache provider set, please set TierdCacheProvider (example redis)"), "", 0)
146-
}
147-
148154
if !cfg.JustV2 {
149155
defer db.ReaderDb.Close()
150156
defer db.WriterDb.Close()
157+
defer db.AlloyReader.Close()
158+
defer db.AlloyWriter.Close()
159+
defer db.BigtableClient.Close()
151160
}
152-
defer db.AlloyReader.Close()
153-
defer db.AlloyWriter.Close()
154-
defer db.BigtableClient.Close()
161+
defer db.ClickHouseReader.Close()
162+
defer db.ClickHouseWriter.Close()
163+
defer db.ClickHouseNativeWriter.Close()
164+
165+
wg.Wait()
155166

156167
context, err := modules.GetModuleContext()
157168
if err != nil {

backend/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ require (
6868
github.com/prysmaticlabs/go-ssz v0.0.0-20210121151755-f6208871c388
6969
github.com/rocket-pool/rocketpool-go v1.8.4-0.20241009143357-7b6894d57365
7070
github.com/rocket-pool/smartnode v1.14.1
71+
github.com/segmentio/encoding v0.4.0
7172
github.com/sethvargo/go-envconfig v1.1.0
7273
github.com/shopspring/decimal v1.4.0
7374
github.com/sirupsen/logrus v1.9.3

backend/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -914,6 +914,8 @@ github.com/sanity-io/litter v1.5.5/go.mod h1:9gzJgR2i4ZpjZHsKvUXIRQVk7P+yM3e+jAF
914914
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
915915
github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys=
916916
github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs=
917+
github.com/segmentio/encoding v0.4.0 h1:MEBYvRqiUB2nfR2criEXWqwdY6HJOUrCn5hboVOVmy8=
918+
github.com/segmentio/encoding v0.4.0/go.mod h1:/d03Cd8PoaDeceuhUUUQWjU0KhWjrmYrWPgtJHYZSnI=
917919
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
918920
github.com/sergi/go-diff v1.2.0 h1:XU+rvMAioB0UC3q1MFrIQy4Vo5/4VsRDQQXHsEya6xQ=
919921
github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=

backend/pkg/commons/db/clickhouse.go

Lines changed: 70 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,15 @@ import (
55
"crypto/tls"
66
"fmt"
77
"net"
8-
"reflect"
9-
"sync"
8+
"runtime"
109
"time"
1110

1211
ch "github.com/ClickHouse/clickhouse-go/v2"
1312
"github.com/gobitfly/beaconchain/pkg/commons/log"
13+
"github.com/gobitfly/beaconchain/pkg/commons/metrics"
1414
"github.com/gobitfly/beaconchain/pkg/commons/types"
15+
"github.com/gobitfly/beaconchain/pkg/commons/version"
16+
"golang.org/x/sync/errgroup"
1517
)
1618

1719
var ClickHouseNativeWriter ch.Conn
@@ -56,6 +58,18 @@ func MustInitClickhouseNative(writer *types.DatabaseConfig) ch.Conn {
5658
Debugf: func(s string, p ...interface{}) {
5759
log.Debugf("CH NATIVE WRITER: "+s, p...)
5860
},
61+
Settings: ch.Settings{
62+
"deduplicate_blocks_in_dependent_materialized_views": "1",
63+
"update_insert_deduplication_token_in_dependent_materialized_views": "1",
64+
},
65+
ClientInfo: ch.ClientInfo{
66+
Products: []struct {
67+
Name string
68+
Version string
69+
}{
70+
{Name: "beaconchain-explorer", Version: version.Version},
71+
},
72+
},
5973
})
6074
if err != nil {
6175
log.Fatal(err, "Error connecting to clickhouse native writer", 0)
@@ -74,24 +88,42 @@ func ClickHouseTestConnection(db ch.Conn, dataBaseName string) {
7488
log.Debugf("connected to clickhouse database %s with version %s", dataBaseName, v)
7589
}
7690

77-
func DumpToClickhouse(data interface{}, table string) error {
91+
type UltraFastClickhouseStruct interface {
92+
Get(string) any
93+
Extend(UltraFastClickhouseStruct) error
94+
}
95+
96+
func UltraFastDumpToClickhouse[T UltraFastClickhouseStruct](data T, target_table string, insert_uuid string) error {
7897
start := time.Now()
79-
columns, err := ConvertToColumnar(data)
98+
// add metrics
99+
defer func() {
100+
metrics.TaskDuration.WithLabelValues(fmt.Sprintf("clickhouse_dump_%s_overall", target_table)).Observe(time.Since(start).Seconds())
101+
}()
102+
now := time.Now()
103+
// get column order & names from clickhouse
104+
var columns []string
105+
err := ClickHouseReader.Select(&columns, "SELECT name FROM system.columns where table=$1 and database=currentDatabase() order by position;", target_table)
80106
if err != nil {
81107
return err
82108
}
83-
log.Debugf("converted to columnar in %s", time.Since(start))
84-
start = time.Now()
85-
// abort after 3 minutes
86-
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
109+
metrics.TaskDuration.WithLabelValues(fmt.Sprintf("clickhouse_dump_%s_get_columns", target_table)).Observe(time.Since(now).Seconds())
110+
now = time.Now()
111+
// prepare batch
112+
abortCtx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
87113
defer cancel()
88-
89-
batch, err := ClickHouseNativeWriter.PrepareBatch(ctx, `INSERT INTO `+table)
114+
ctx := ch.Context(abortCtx, ch.WithSettings(ch.Settings{
115+
"insert_deduplication_token": insert_uuid, // 重复数据插入时,会根据这个字段进行去重
116+
"insert_deduplicate": true,
117+
}), ch.WithLogs(func(l *ch.Log) {
118+
log.Debugf("CH NATIVE WRITER: %s", l.Text)
119+
}),
120+
)
121+
batch, err := ClickHouseNativeWriter.PrepareBatch(ctx, `INSERT INTO `+target_table)
90122
if err != nil {
91123
return err
92124
}
93-
log.Debugf("prepared batch in %s", time.Since(start))
94-
start = time.Now()
125+
metrics.TaskDuration.WithLabelValues(fmt.Sprintf("clickhouse_dump_%s_prepare_batch", target_table)).Observe(time.Since(now).Seconds())
126+
now = time.Now()
95127
defer func() {
96128
if batch.IsSent() {
97129
return
@@ -101,97 +133,38 @@ func DumpToClickhouse(data interface{}, table string) error {
101133
log.Warnf("failed to abort batch: %v", err)
102134
}
103135
}()
104-
for c := 0; c < len(columns); c++ {
105-
// type assert to correct type
106-
log.Debugf("appending column %d", c)
107-
switch columns[c].(type) {
108-
case []int64:
109-
err = batch.Column(c).Append(columns[c].([]int64))
110-
case []uint64:
111-
err = batch.Column(c).Append(columns[c].([]uint64))
112-
case []time.Time:
113-
// appending unix timestamps as int64 to a DateTime column is actually faster than appending time.Time directly
114-
// tho with how many columns we have it doesn't really matter
115-
err = batch.Column(c).Append(columns[c].([]time.Time))
116-
case []float64:
117-
err = batch.Column(c).Append(columns[c].([]float64))
118-
case []bool:
119-
err = batch.Column(c).Append(columns[c].([]bool))
120-
default:
121-
// warning: slow path. works but try to avoid this
122-
cType := reflect.TypeOf(columns[c])
123-
log.Warnf("fallback: column %d of type %s is not natively supported, falling back to reflection", c, cType)
124-
startSlow := time.Now()
125-
cValue := reflect.ValueOf(columns[c])
126-
length := cValue.Len()
127-
cSlice := reflect.MakeSlice(reflect.SliceOf(cType.Elem()), length, length)
128-
for i := 0; i < length; i++ {
129-
cSlice.Index(i).Set(cValue.Index(i))
130-
}
131-
err = batch.Column(c).Append(cSlice.Interface())
132-
log.Debugf("fallback: appended column %d in %s", c, time.Since(startSlow))
136+
var g errgroup.Group
137+
g.SetLimit(runtime.NumCPU())
138+
// iterate columns retrieved from clickhouse
139+
for i, n := range columns {
140+
// Capture the loop variable
141+
col_index := i
142+
col_name := n
143+
if col_name == "_inserted_at" {
144+
continue
133145
}
134-
if err != nil {
146+
// Start a new goroutine for each column
147+
g.Go(func() error {
148+
// get it from the struct
149+
column := data.Get(col_name)
150+
if column == nil {
151+
return fmt.Errorf("column %s not found in struct", col_name)
152+
}
153+
// Perform the type assertion and append operation
154+
err = batch.Column(col_index).Append(column)
155+
log.Debugf("appended column %s in %s", col_name, time.Since(now))
135156
return err
136-
}
157+
})
137158
}
138-
log.Debugf("appended all columns to batch in %s", time.Since(start))
139-
start = time.Now()
159+
if err := g.Wait(); err != nil {
160+
return err
161+
}
162+
metrics.TaskDuration.WithLabelValues(fmt.Sprintf("clickhouse_dump_%s_append_columns", target_table)).Observe(time.Since(now).Seconds())
163+
now = time.Now()
140164
err = batch.Send()
141165
if err != nil {
142166
return err
143167
}
144-
log.Debugf("sent batch in %s", time.Since(start))
168+
metrics.TaskDuration.WithLabelValues(fmt.Sprintf("clickhouse_dump_%s_send_batch", target_table)).Observe(time.Since(now).Seconds())
145169
return nil
146170
}
147-
148-
// ConvertToColumnar efficiently converts a slice of any struct type to a slice of slices, each representing a column.
149-
func ConvertToColumnar(data interface{}) ([]interface{}, error) {
150-
start := time.Now()
151-
v := reflect.ValueOf(data)
152-
if v.Kind() != reflect.Slice {
153-
return nil, fmt.Errorf("provided data is not a slice")
154-
}
155-
156-
if v.Len() == 0 {
157-
return nil, fmt.Errorf("slice is empty")
158-
}
159-
160-
elemType := v.Type().Elem()
161-
if elemType.Kind() != reflect.Struct {
162-
return nil, fmt.Errorf("slice elements are not structs")
163-
}
164-
165-
numFields := elemType.NumField()
166-
columns := make([]interface{}, numFields)
167-
colValues := make([]reflect.Value, numFields)
168-
169-
for i := 0; i < numFields; i++ {
170-
fieldType := elemType.Field(i).Type
171-
colSlice := reflect.MakeSlice(reflect.SliceOf(fieldType), v.Len(), v.Len())
172-
x := reflect.New(colSlice.Type())
173-
x.Elem().Set(colSlice)
174-
columns[i] = colSlice
175-
colValues[i] = colSlice.Slice(0, v.Len())
176-
}
177-
178-
var wg sync.WaitGroup
179-
wg.Add(numFields)
180-
181-
for j := 0; j < numFields; j++ {
182-
go func(j int) {
183-
defer wg.Done()
184-
for i := 0; i < v.Len(); i++ {
185-
structValue := v.Index(i)
186-
colValues[j].Index(i).Set(structValue.Field(j))
187-
}
188-
}(j)
189-
}
190-
wg.Wait()
191-
192-
for i, col := range colValues {
193-
columns[i] = col.Interface()
194-
}
195-
log.Infof("columnarized %d rows with %d columns in %s", v.Len(), numFields, time.Since(start))
196-
return columns, nil
197-
}

0 commit comments

Comments
 (0)