Skip to content

Commit 0c162f7

Browse files
authored
Merge pull request #10 from shuwenwei/0425
add parameter 'session-pool-size' for iotdb
2 parents 38abbdf + da3eb52 commit 0c162f7

File tree

1 file changed

+90
-25
lines changed

1 file changed

+90
-25
lines changed

cmd/tsbs_run_queries_iotdb/main.go

Lines changed: 90 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,13 @@ var (
2222
usingGroupByApi bool // if using group by api when executing query
2323
singleDatabase bool // if using single database, e.g. only one database: root.db. root.db.cpu, root.db.mem belongs to this databse
2424
useAlignedTimeseries bool // using aligned timeseries if set true.
25+
sessionPoolSize int
2526
)
2627

2728
// Global vars:
2829
var (
29-
runner *query.BenchmarkRunner
30+
runner *query.BenchmarkRunner
31+
sessionPool client.SessionPool
3032
)
3133

3234
// Parse args:
@@ -41,6 +43,7 @@ func init() {
4143
pflag.Bool("use-groupby", false, "Whether to use group by api")
4244
pflag.Bool("single-database", false, "Whether to use single database")
4345
pflag.Bool("aligned-timeseries", false, "Whether to use aligned time series")
46+
pflag.Uint("session-pool-size", 0, "Session pool size")
4447

4548
pflag.Parse()
4649

@@ -62,9 +65,10 @@ func init() {
6265
usingGroupByApi = viper.GetBool("use-groupby")
6366
singleDatabase = viper.GetBool("single-database")
6467
useAlignedTimeseries = viper.GetBool("aligned-timeseries")
68+
sessionPoolSize = viper.GetInt("session-pool-size")
6569
timeoutInMs = 0
6670

67-
log.Printf("tsbs_run_queries_iotdb target: %s:%s. Loading with %d workers.\n", host, port, workers)
71+
log.Printf("tsbs_run_queries_iotdb target: %s:%s. Loading with %d workers. session-pool-size: %d\n", host, port, workers, sessionPoolSize)
6872
if workers < 5 {
6973
log.Println("Insertion throughput is strongly related to the number of threads. Use more workers for better performance.")
7074
}
@@ -76,6 +80,16 @@ func init() {
7680
Password: password,
7781
}
7882

83+
if sessionPoolSize > 0 {
84+
poolConfig := &client.PoolConfig{
85+
Host: host,
86+
Port: port,
87+
UserName: user,
88+
Password: password,
89+
}
90+
sessionPool = client.NewSessionPool(poolConfig, sessionPoolSize, 60000, 60000, false)
91+
}
92+
7993
runner = query.NewBenchmarkRunner(config)
8094
}
8195

@@ -92,16 +106,33 @@ type processor struct {
92106
func newProcessor() query.Processor { return &processor{} }
93107

94108
func (p *processor) Init(workerNumber int) {
95-
p.session = client.NewSession(&clientConfig)
96109
p.printResponses = runner.DoPrintResponses()
97-
if err := p.session.Open(false, int(timeoutInMs)); err != nil {
98-
errMsg := fmt.Sprintf("query processor init error, session is not open: %v\n", err)
99-
errMsg = errMsg + fmt.Sprintf("timeout setting: %d ms", timeoutInMs)
100-
log.Fatal(errMsg)
101-
}
102-
_, err := p.session.ExecuteStatement("flush")
103-
if err != nil {
104-
log.Fatal(fmt.Sprintf("flush meets error: %v\n", err))
110+
111+
if sessionPoolSize <= 0 {
112+
p.session = client.NewSession(&clientConfig)
113+
if err := p.session.Open(false, int(timeoutInMs)); err != nil {
114+
errMsg := fmt.Sprintf("query processor init error, session is not open: %v\n", err)
115+
errMsg = errMsg + fmt.Sprintf("timeout setting: %d ms", timeoutInMs)
116+
log.Fatal(errMsg)
117+
}
118+
if workerNumber == 0 {
119+
_, err := p.session.ExecuteStatement("flush")
120+
if err != nil {
121+
log.Fatal(fmt.Sprintf("flush meets error: %v\n", err))
122+
}
123+
}
124+
} else {
125+
session, err := sessionPool.GetSession()
126+
if err != nil {
127+
log.Fatal(fmt.Sprintf("flush meets error: %v\n", err))
128+
}
129+
if workerNumber == 0 {
130+
_, err = session.ExecuteStatement("flush")
131+
if err != nil {
132+
log.Fatal(fmt.Sprintf("flush meets error: %v\n", err))
133+
}
134+
}
135+
sessionPool.PutBack(session)
105136
}
106137
}
107138

@@ -116,16 +147,30 @@ func (p *processor) ProcessQuery(q query.Query, _ bool) ([]*query.Stat, error) {
116147
var legalNodes = true
117148
var err error
118149

119-
start := time.Now().UnixNano()
150+
start := time.Now()
120151
if startTimeInMills > 0 {
121152
if usingGroupByApi {
153+
idx := strings.LastIndex(aggregatePaths[0], ".")
154+
device := aggregatePaths[0][:idx]
155+
measurement := aggregatePaths[0][idx+1:]
122156
splits := strings.Split(aggregatePaths[0], ".")
123157
db := splits[0] + "." + splits[1]
124-
device := strings.Join(splits[:len(splits)-1], ".")
125-
measurement := splits[len(splits)-1]
126-
dataSet, err = p.session.ExecuteGroupByQueryIntervalQuery(&db, device, measurement,
127-
common.TAggregationType_MAX_VALUE, 1,
128-
&startTimeInMills, &endTimeInMills, &interval, &timeoutInMs, &useAlignedTimeseries)
158+
var err error
159+
if sessionPoolSize > 0 {
160+
session, err := sessionPool.GetSession()
161+
if err == nil {
162+
dataSet, err = session.ExecuteGroupByQueryIntervalQuery(&db, device, measurement,
163+
common.TAggregationType_MAX_VALUE, 1,
164+
&startTimeInMills, &endTimeInMills, &interval, &timeoutInMs, &useAlignedTimeseries)
165+
} else {
166+
log.Printf("Get session meets error.\n")
167+
}
168+
sessionPool.PutBack(session)
169+
} else {
170+
dataSet, err = p.session.ExecuteGroupByQueryIntervalQuery(&db, device, measurement,
171+
common.TAggregationType_MAX_VALUE, 1,
172+
&startTimeInMills, &endTimeInMills, &interval, &timeoutInMs, &useAlignedTimeseries)
173+
}
129174

130175
if err != nil {
131176
fmt.Printf("ExecuteGroupByQueryIntervalQuery meets error, "+
@@ -142,9 +187,21 @@ func (p *processor) ProcessQuery(q query.Query, _ bool) ([]*query.Stat, error) {
142187
}
143188
}
144189
} else {
145-
dataSet, err = p.session.ExecuteAggregationQueryWithLegalNodes(aggregatePaths,
146-
[]common.TAggregationType{common.TAggregationType_MAX_VALUE},
147-
&startTimeInMills, &endTimeInMills, &interval, &timeoutInMs, &legalNodes)
190+
if sessionPoolSize > 0 {
191+
session, err := sessionPool.GetSession()
192+
if err == nil {
193+
dataSet, err = session.ExecuteAggregationQueryWithLegalNodes(aggregatePaths,
194+
[]common.TAggregationType{common.TAggregationType_MAX_VALUE},
195+
&startTimeInMills, &endTimeInMills, &interval, &timeoutInMs, &legalNodes)
196+
} else {
197+
log.Printf("Get session meets error.\n")
198+
}
199+
sessionPool.PutBack(session)
200+
} else {
201+
dataSet, err = p.session.ExecuteAggregationQueryWithLegalNodes(aggregatePaths,
202+
[]common.TAggregationType{common.TAggregationType_MAX_VALUE},
203+
&startTimeInMills, &endTimeInMills, &interval, &timeoutInMs, &legalNodes)
204+
}
148205

149206
if err != nil {
150207
fmt.Printf("ExecuteAggregationQueryWithLegalNodes meets error, "+
@@ -161,19 +218,27 @@ func (p *processor) ProcessQuery(q query.Query, _ bool) ([]*query.Stat, error) {
161218
}
162219
}
163220
} else {
164-
dataSet, err = p.session.ExecuteQueryStatement(sql, &timeoutInMs)
221+
if sessionPoolSize > 0 {
222+
session, err := sessionPool.GetSession()
223+
if err == nil {
224+
dataSet, err = session.ExecuteQueryStatement(sql, &timeoutInMs)
225+
} else {
226+
log.Printf("Get session meets error.\n")
227+
}
228+
sessionPool.PutBack(session)
229+
} else {
230+
dataSet, err = p.session.ExecuteQueryStatement(sql, &timeoutInMs)
231+
}
165232
}
166233

167234
if err != nil {
168235
log.Printf("An error occurred while executing query SQL: %s\n", iotdbQ.SqlQuery)
169236
return nil, err
170237
}
171238

172-
took := time.Now().UnixNano() - start
173-
174-
lag := float64(took) / float64(time.Millisecond) // in milliseconds
239+
took := float64(time.Since(start).Nanoseconds()) / 1e6
175240
stat := query.GetStat()
176-
stat.Init(q.HumanLabelName(), lag)
241+
stat.Init(q.HumanLabelName(), took)
177242
return []*query.Stat{stat}, err
178243
}
179244

0 commit comments

Comments
 (0)