Skip to content

Commit d859381

Browse files
fuliwenfuliwen
and
fuliwen
authored
session pool supports multiple nodes (#78)
Co-authored-by: fuliwen <[email protected]>
1 parent ec57c6c commit d859381

File tree

5 files changed

+94
-36
lines changed

5 files changed

+94
-36
lines changed

Diff for: README.md

+13
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ If there is no available connections and the pool reaches its max size, the all
8686
The PutBack method must be called after use
8787

8888
### New sessionPool
89+
standalone
8990

9091
```golang
9192

@@ -97,6 +98,18 @@ config := &client.PoolConfig{
9798
}
9899
sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
99100

101+
```
102+
cluster or doubleLive
103+
104+
```golang
105+
106+
config := &client.PoolConfig{
107+
UserName: user,
108+
Password: password,
109+
NodeUrls: strings.Split("127.0.0.1:6667,127.0.0.1:6668", ","),
110+
}
111+
sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
112+
100113
```
101114

102115
### Get session through sessionPool, putback after use

Diff for: README_ZH.md

+15
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ go run session_example.go
6969

7070
### 创建sessionPool
7171

72+
单实例
7273
```golang
7374

7475
config := &client.PoolConfig{
@@ -81,6 +82,20 @@ sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
8182

8283
```
8384

85+
分布式或双活
86+
87+
```golang
88+
89+
config := &client.PoolConfig{
90+
UserName: user,
91+
Password: password,
92+
NodeUrls: strings.Split("127.0.0.1:6667,127.0.0.1:6668", ","),
93+
}
94+
sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
95+
96+
```
97+
98+
8499
### 使用sessionPool获取session,使用完手动调用PutBack
85100

86101
例1:设置存储组

Diff for: client/session.go

+19-17
Original file line numberDiff line numberDiff line change
@@ -118,11 +118,12 @@ func (s *Session) Open(enableRPCCompression bool, connectionTimeoutInMs int) err
118118
}
119119

120120
type ClusterConfig struct {
121-
NodeUrls []string //ip:port
122-
UserName string
123-
Password string
124-
FetchSize int32
125-
TimeZone string
121+
NodeUrls []string //ip:port
122+
UserName string
123+
Password string
124+
FetchSize int32
125+
TimeZone string
126+
ConnectRetryMax int
126127
}
127128

128129
type ClusterSession struct {
@@ -975,12 +976,12 @@ func NewSession(config *Config) Session {
975976
return Session{config: config}
976977
}
977978

978-
func NewClusterSession(ClusterConfig *ClusterConfig) Session {
979+
func NewClusterSession(clusterConfig *ClusterConfig) Session {
979980
session := Session{}
980981
node := endPoint{}
981-
for i := 0; i < len(ClusterConfig.NodeUrls); i++ {
982-
node.Host = strings.Split(ClusterConfig.NodeUrls[i], ":")[0]
983-
node.Port = strings.Split(ClusterConfig.NodeUrls[i], ":")[1]
982+
for i := 0; i < len(clusterConfig.NodeUrls); i++ {
983+
node.Host = strings.Split(clusterConfig.NodeUrls[i], ":")[0]
984+
node.Port = strings.Split(clusterConfig.NodeUrls[i], ":")[1]
984985
endPointList.PushBack(node)
985986
}
986987
var err error
@@ -996,7 +997,7 @@ func NewClusterSession(ClusterConfig *ClusterConfig) Session {
996997
log.Println(err)
997998
} else {
998999
session.config = getConfig(e.Value.(endPoint).Host, e.Value.(endPoint).Port,
999-
ClusterConfig.UserName, ClusterConfig.Password, ClusterConfig.FetchSize, ClusterConfig.TimeZone)
1000+
clusterConfig.UserName, clusterConfig.Password, clusterConfig.FetchSize, clusterConfig.TimeZone, clusterConfig.ConnectRetryMax)
10001001
break
10011002
}
10021003
}
@@ -1052,14 +1053,15 @@ func (s *Session) initClusterConn(node endPoint) error {
10521053

10531054
}
10541055

1055-
func getConfig(host string, port string, userName string, passWord string, fetchSize int32, timeZone string) *Config {
1056+
func getConfig(host string, port string, userName string, passWord string, fetchSize int32, timeZone string, connectRetryMax int) *Config {
10561057
return &Config{
1057-
Host: host,
1058-
Port: port,
1059-
UserName: userName,
1060-
Password: passWord,
1061-
FetchSize: fetchSize,
1062-
TimeZone: timeZone,
1058+
Host: host,
1059+
Port: port,
1060+
UserName: userName,
1061+
Password: passWord,
1062+
FetchSize: fetchSize,
1063+
TimeZone: timeZone,
1064+
ConnectRetryMax: connectRetryMax,
10631065
}
10641066
}
10651067

Diff for: client/sessionpool.go

+25-6
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func (spool *SessionPool) GetSession() (session Session, err error) {
7878
if ok {
7979
return session, nil
8080
} else {
81-
log.Println("sessionpool has closed")
81+
log.Println("sessionPool has closed")
8282
return session, errPoolClosed
8383
}
8484
default:
@@ -93,11 +93,19 @@ func (spool *SessionPool) GetSession() (session Session, err error) {
9393
}
9494
}
9595

96-
func (spool *SessionPool) ConstructSession(config *PoolConfig) (Session, error) {
97-
session := NewSession(getSessionConfig(config))
98-
if err := session.Open(spool.enableCompression, spool.connectionTimeoutInMs); err != nil {
99-
log.Print(err)
100-
return session, err
96+
func (spool *SessionPool) ConstructSession(config *PoolConfig) (session Session, err error) {
97+
if len(config.NodeUrls) > 0 {
98+
session = NewClusterSession(getClusterSessionConfig(config))
99+
if err := session.OpenCluster(spool.enableCompression); err != nil {
100+
log.Print(err)
101+
return session, err
102+
}
103+
} else {
104+
session = NewSession(getSessionConfig(config))
105+
if err := session.Open(spool.enableCompression, spool.connectionTimeoutInMs); err != nil {
106+
log.Print(err)
107+
return session, err
108+
}
101109
}
102110
return session, nil
103111
}
@@ -114,6 +122,17 @@ func getSessionConfig(config *PoolConfig) *Config {
114122
}
115123
}
116124

125+
func getClusterSessionConfig(config *PoolConfig) *ClusterConfig {
126+
return &ClusterConfig{
127+
NodeUrls: config.NodeUrls,
128+
UserName: config.UserName,
129+
Password: config.Password,
130+
FetchSize: config.FetchSize,
131+
TimeZone: config.TimeZone,
132+
ConnectRetryMax: config.ConnectRetryMax,
133+
}
134+
}
135+
117136
func (spool *SessionPool) PutBack(session Session) {
118137
if session.trans.IsOpen() {
119138
spool.ch <- session

Diff for: example/session_pool/session_pool_example.go

+22-13
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"fmt"
2525
"log"
2626
"math/rand"
27+
"strings"
2728
"sync"
2829
"time"
2930

@@ -60,8 +61,8 @@ func main() {
6061
wg.Add(1)
6162
go func() {
6263
defer wg.Done()
63-
setStorageGroup(fmt.Sprintf("root.ln%d", j))
64-
deleteStorageGroup(fmt.Sprintf("root.ln%d", j))
64+
setStorageGroup(fmt.Sprintf("root.ln-%d", j))
65+
deleteStorageGroup(fmt.Sprintf("root.ln-%d", j))
6566

6667
}()
6768

@@ -134,17 +135,6 @@ func main() {
134135
insertAlignedTablets()
135136
deleteTimeseries("root.ln.device1.*")
136137
executeQueryStatement("show timeseries root.**")
137-
for i := 0; i < 10000; i++ {
138-
var j = i
139-
wg.Add(1)
140-
go func() {
141-
defer wg.Done()
142-
setStorageGroup(fmt.Sprintf("root.ln%d", j))
143-
deleteStorageGroup(fmt.Sprintf("root.ln%d", j))
144-
145-
}()
146-
147-
}
148138
wg.Wait()
149139

150140
}
@@ -773,3 +763,22 @@ func checkError(status *rpc.TSStatus, err error) {
773763
}
774764
}
775765
}
766+
767+
// If your IotDB is a cluster version or doubleLive, you can use the following code for session pool connection
768+
func useSessionPool() {
769+
770+
config := &client.PoolConfig{
771+
UserName: user,
772+
Password: password,
773+
NodeUrls: strings.Split("127.0.0.1:6667,127.0.0.1:6668", ","),
774+
}
775+
sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
776+
defer sessionPool.Close()
777+
session, err := sessionPool.GetSession()
778+
defer sessionPool.PutBack(session)
779+
if err != nil {
780+
log.Print(err)
781+
return
782+
}
783+
784+
}

0 commit comments

Comments
 (0)