Skip to content

Commit 8e7f751

Browse files
committed
feat(db): clickhouse failover support
1 parent a6b4236 commit 8e7f751

File tree

6 files changed

+261
-297
lines changed

6 files changed

+261
-297
lines changed

backend/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,7 @@ require (
234234
github.com/sanity-io/litter v1.5.5 // indirect
235235
github.com/segmentio/asm v1.2.0 // indirect
236236
github.com/sergi/go-diff v1.2.0 // indirect
237+
github.com/sethvargo/go-envconfig v1.1.0 // indirect
237238
github.com/sethvargo/go-retry v0.2.4 // indirect
238239
github.com/shirou/gopsutil v3.21.11+incompatible // indirect
239240
github.com/spaolacci/murmur3 v1.1.0 // indirect

backend/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -862,6 +862,8 @@ github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr
862862
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
863863
github.com/sergi/go-diff v1.2.0 h1:XU+rvMAioB0UC3q1MFrIQy4Vo5/4VsRDQQXHsEya6xQ=
864864
github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
865+
github.com/sethvargo/go-envconfig v1.1.0 h1:cWZiJxeTm7AlCvzGXrEXaSTCNgip5oJepekh/BOQuog=
866+
github.com/sethvargo/go-envconfig v1.1.0/go.mod h1:JLd0KFWQYzyENqnEPWWZ49i4vzZo/6nRidxI8YvGiHw=
865867
github.com/sethvargo/go-retry v0.2.4 h1:T+jHEQy/zKJf5s95UkguisicE0zuF9y7+/vgz08Ocec=
866868
github.com/sethvargo/go-retry v0.2.4/go.mod h1:1afjQuvh7s4gflMObvjLPaWgluLLyhA1wmVZ6KLpICw=
867869
github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=

backend/pkg/commons/db/clickhouse.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"crypto/tls"
66
"fmt"
7+
"net"
78
"reflect"
89
"sync"
910
"time"
@@ -25,7 +26,15 @@ func MustInitClickhouseNative(writer *types.DatabaseConfig) ch.Conn {
2526
if writer.MaxOpenConns < writer.MaxIdleConns {
2627
writer.MaxIdleConns = writer.MaxOpenConns
2728
}
28-
log.Infof("initializing clickhouse native writer db connection to %v:%v/%v with %v/%v conn limit", writer.Host, writer.Port, writer.Name, writer.MaxIdleConns, writer.MaxOpenConns)
29+
var hosts []string
30+
hosts = append(hosts, net.JoinHostPort(writer.Host, writer.Port))
31+
if len(writer.Failovers) > 0 {
32+
for _, f := range writer.Failovers {
33+
hosts = append(hosts, net.JoinHostPort(f.Host, f.Port))
34+
}
35+
}
36+
37+
log.Infof("initializing clickhouse native writer db connection to %v/%v with %v/%v conn limit", hosts, writer.Name, writer.MaxIdleConns, writer.MaxOpenConns)
2938
dbWriter, err := ch.Open(&ch.Options{
3039
MaxOpenConns: writer.MaxOpenConns,
3140
MaxIdleConns: writer.MaxIdleConns,
@@ -34,7 +43,8 @@ func MustInitClickhouseNative(writer *types.DatabaseConfig) ch.Conn {
3443
Compression: &ch.Compression{
3544
Method: ch.CompressionLZ4,
3645
},
37-
Addr: []string{fmt.Sprintf("%s:%s", writer.Host, writer.Port)},
46+
Addr: hosts,
47+
ConnOpenStrategy: ch.ConnOpenInOrder,
3848
Auth: ch.Auth{
3949
Username: writer.Username,
4050
Password: writer.Password,

backend/pkg/commons/db/db.go

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -102,23 +102,31 @@ func MustInitDB(writer *types.DatabaseConfig, reader *types.DatabaseConfig, driv
102102
reader.MaxIdleConns = reader.MaxOpenConns
103103
}
104104

105-
var sslParam string
105+
var extraParams string
106106
if driverName == "clickhouse" {
107-
sslParam = "secure=false"
107+
extraParams = "secure=false"
108108
if writer.SSL {
109-
sslParam = "secure=true"
109+
extraParams = "secure=true"
110110
}
111111
// debug
112112
// sslParam += "&debug=true"
113113
} else {
114-
sslParam = "sslmode=disable"
114+
extraParams = "sslmode=disable"
115115
if writer.SSL {
116-
sslParam = "sslmode=require"
116+
extraParams = "sslmode=require"
117117
}
118118
}
119+
var hosts string
120+
hosts = net.JoinHostPort(writer.Host, writer.Port)
121+
if len(writer.Failovers) > 0 {
122+
for _, failover := range writer.Failovers {
123+
hosts += "," + net.JoinHostPort(failover.Host, failover.Port)
124+
}
125+
extraParams += "&connection_open_strategy=in_order"
126+
}
119127

120-
log.Debugf("connecting to %s database %s:%s/%s as writer with %d/%d max open/idle connections", databaseBrand, writer.Host, writer.Port, writer.Name, writer.MaxOpenConns, writer.MaxIdleConns)
121-
dbConnWriter, err := sqlx.Open(driverName, fmt.Sprintf("%s://%s:%s@%s/%s?%s", databaseBrand, writer.Username, writer.Password, net.JoinHostPort(writer.Host, writer.Port), writer.Name, sslParam))
128+
log.Debugf("connecting to %s database %s/%s as writer with %d/%d max open/idle connections", databaseBrand, hosts, writer.Name, writer.MaxOpenConns, writer.MaxIdleConns)
129+
dbConnWriter, err := sqlx.Open(driverName, fmt.Sprintf("%s://%s:%s@%s/%s?%s", databaseBrand, writer.Username, writer.Password, hosts, writer.Name, extraParams))
122130
if err != nil {
123131
log.Fatal(err, "error getting Connection Writer database", 0)
124132
}
@@ -134,21 +142,29 @@ func MustInitDB(writer *types.DatabaseConfig, reader *types.DatabaseConfig, driv
134142
}
135143

136144
if driverName == "clickhouse" {
137-
sslParam = "secure=false"
145+
extraParams = "secure=false"
138146
if writer.SSL {
139-
sslParam = "secure=true"
147+
extraParams = "secure=true"
140148
}
141149
// debug
142150
// sslParam += "&debug=true"
143151
} else {
144-
sslParam = "sslmode=disable"
152+
extraParams = "sslmode=disable"
145153
if writer.SSL {
146-
sslParam = "sslmode=require"
154+
extraParams = "sslmode=require"
155+
}
156+
}
157+
158+
hosts = net.JoinHostPort(reader.Host, reader.Port)
159+
if len(reader.Failovers) > 0 {
160+
for _, failover := range reader.Failovers {
161+
hosts += "," + net.JoinHostPort(failover.Host, failover.Port)
147162
}
163+
extraParams += "&connection_open_strategy=in_order"
148164
}
149165

150-
log.Debugf("connecting to %s database %s:%s/%s as reader with %d/%d max open/idle connections", databaseBrand, reader.Host, reader.Port, reader.Name, reader.MaxOpenConns, reader.MaxIdleConns)
151-
dbConnReader, err := sqlx.Open(driverName, fmt.Sprintf("%s://%s:%s@%s/%s?%s", databaseBrand, reader.Username, reader.Password, net.JoinHostPort(reader.Host, reader.Port), reader.Name, sslParam))
166+
log.Debugf("connecting to %s database %s/%s as reader with %d/%d max open/idle connections", databaseBrand, hosts, reader.Name, reader.MaxOpenConns, reader.MaxIdleConns)
167+
dbConnReader, err := sqlx.Open(driverName, fmt.Sprintf("%s://%s:%s@%s/%s?%s", databaseBrand, reader.Username, reader.Password, hosts, reader.Name, extraParams))
152168
if err != nil {
153169
log.Fatal(err, "error getting Connection Reader database", 0)
154170
}

0 commit comments

Comments
 (0)