Skip to content

Commit 3d90c89

Browse files
committed
Enhancements in go client library.
1 parent 60672b5 commit 3d90c89

25 files changed

+1287
-347
lines changed

cmd/dbscanserv/prime/replicate.go

+13-22
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ package prime
2121

2222
import (
2323
"errors"
24-
"runtime"
2524
"time"
2625

2726
"github.com/paypal/junodb/third_party/forked/golang/glog"
@@ -34,7 +33,7 @@ import (
3433

3534
var (
3635
secConfig *sec.Config
37-
processor []*cli.Processor
36+
processor *cli.Processor
3837
inChan = make(chan *proto.OperationalMessage, 1000)
3938
outChan = make(chan bool, 1000)
4039
)
@@ -57,39 +56,31 @@ func InitReplicator(proxyAddr string, numConns int) {
5756
}
5857

5958
if numConns <= 0 {
60-
numConns = 1
59+
numConns = 2
6160
}
6261
if numConns > 4 {
6362
numConns = 4
6463
}
6564

66-
processor = make([]*cli.Processor, numConns)
67-
for i := 0; i < numConns; i++ {
65+
processor = cli.NewProcessor(
66+
io.ServiceEndpoint{Addr: proxyAddr, SSLEnabled: secConfig != nil},
67+
"dbscan",
68+
numConns, // connPoolSize
69+
time.Duration(500*time.Millisecond), // ConnectTimeout
70+
time.Duration(1000*time.Millisecond), // ResponseTimeout
71+
nil) // GetTLSConfig
6872

69-
processor[i] = cli.NewProcessor(
70-
io.ServiceEndpoint{Addr: proxyAddr, SSLEnabled: secConfig != nil},
71-
"dbscan",
72-
time.Duration(500*time.Millisecond), // ConnectTimeout
73-
time.Duration(1000*time.Millisecond), // RequestTimeout
74-
time.Duration(60*time.Second)) // ConnectRecycleTimeout
75-
76-
processor[i].Start()
77-
78-
runtime.SetFinalizer(processor[i], func(p *cli.Processor) {
79-
p.Close()
80-
})
81-
82-
go processRequest(i)
83-
}
73+
processor.Start()
74+
go processRequest()
8475
}
8576

86-
func processRequest(k int) {
77+
func processRequest() {
8778
count := uint64(0)
8879
for {
8980
select {
9081
case op := <-inChan:
9182
for i := 0; i < 3; i++ {
92-
reply, err := processor[k].ProcessRequest(op)
83+
reply, err := processor.ProcessRequest(op)
9384
if err != nil {
9485
if i < 2 {
9586
time.Sleep(10 * time.Millisecond)

cmd/dbscanserv/prime/result.go

+4-8
Original file line numberDiff line numberDiff line change
@@ -88,14 +88,10 @@ func RepairKey(key []byte, display bool) bool {
8888
if !found {
8989

9090
clientCfg := client.Config{
91-
Appname: "dbscan",
92-
Namespace: ns,
93-
RetryCount: 1,
94-
ConnectTimeout: util.Duration{500 * time.Millisecond},
95-
ReadTimeout: util.Duration{500 * time.Millisecond},
96-
WriteTimeout: util.Duration{500 * time.Millisecond},
97-
RequestTimeout: util.Duration{1000 * time.Millisecond},
98-
ConnRecycleTimeout: util.Duration{60 * time.Second},
91+
Appname: "dbscan",
92+
Namespace: ns,
93+
ConnectTimeout: util.Duration{500 * time.Millisecond},
94+
ResponseTimeout: util.Duration{1000 * time.Millisecond},
9995
}
10096
clientCfg.Server.Addr = proxyAddr
10197

cmd/tools/cmd/cfg/rtcfg.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"github.com/paypal/junodb/pkg/client"
3232
"github.com/paypal/junodb/pkg/cmd"
3333
"github.com/paypal/junodb/pkg/etcd"
34+
"github.com/paypal/junodb/pkg/util"
3435
)
3536

3637
const (
@@ -78,7 +79,10 @@ func (c *cmdRuntimeConfig) Parse(args []string) (err error) {
7879
return
7980
}
8081
}
81-
c.clientConfig.SetDefault()
82+
c.clientConfig.DefaultTimeToLive = 1800
83+
c.clientConfig.ConnPoolSize = 1
84+
c.clientConfig.ConnectTimeout = util.Duration{1000 * time.Millisecond}
85+
c.clientConfig.ResponseTimeout = util.Duration{1000 * time.Millisecond}
8286

8387
if cfg, e := c.config.GetConfig("Juno"); e == nil {
8488
cfg.WriteTo(&c.clientConfig)

cmd/tools/cmd/cli/cli.go

+4-7
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,10 @@ const (
4747
)
4848

4949
var defaultConfig = client.Config{
50-
RetryCount: 1,
51-
DefaultTimeToLive: 1800,
52-
ConnectTimeout: util.Duration{100 * time.Millisecond},
53-
ReadTimeout: util.Duration{500 * time.Millisecond},
54-
WriteTimeout: util.Duration{500 * time.Millisecond},
55-
RequestTimeout: util.Duration{1000 * time.Millisecond},
56-
ConnRecycleTimeout: util.Duration{9 * time.Second},
50+
DefaultTimeToLive: 1800,
51+
ConnPoolSize: 1,
52+
ConnectTimeout: util.Duration{100 * time.Millisecond},
53+
ResponseTimeout: util.Duration{1000 * time.Millisecond},
5754
}
5855

5956
type (

cmd/tools/cmd/cli/sscli.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ import (
3737
)
3838

3939
const (
40-
connectTimeout = 100 * time.Millisecond
41-
requestTimeout = 1000 * time.Millisecond
40+
connectTimeout = 100 * time.Millisecond
41+
responseTimeout = 1000 * time.Millisecond
4242
)
4343

4444
type (
@@ -173,9 +173,10 @@ func (s *shardOptionsT) getShardId(key []byte) uint16 {
173173

174174
func newProcessor(cfg *client.Config) *cli.Processor {
175175
processor := cli.NewProcessor(cfg.Server, cfg.Appname,
176+
1, // connPoolSize
176177
cfg.ConnectTimeout.Duration,
177-
cfg.RequestTimeout.Duration,
178-
cfg.ConnRecycleTimeout.Duration)
178+
cfg.ResponseTimeout.Duration,
179+
nil) // GetTLSConfig
179180
processor.Start()
180181
return processor
181182
}

internal/cli/conn.go

+107
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
//
2+
// Copyright 2023 PayPal Inc.
3+
//
4+
// Licensed to the Apache Software Foundation (ASF) under one or more
5+
// contributor license agreements. See the NOTICE file distributed with
6+
// this work for additional information regarding copyright ownership.
7+
// The ASF licenses this file to You under the Apache License, Version 2.0
8+
// (the "License"); you may not use this file except in compliance with
9+
// the License. You may obtain a copy of the License at
10+
//
11+
// http://www.apache.org/licenses/LICENSE-2.0
12+
//
13+
// Unless required by applicable law or agreed to in writing, software
14+
// distributed under the License is distributed on an "AS IS" BASIS,
15+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
// See the License for the specific language governing permissions and
17+
// limitations under the License.
18+
//
19+
20+
package cli
21+
22+
import (
23+
"crypto/tls"
24+
"errors"
25+
"fmt"
26+
"net"
27+
"time"
28+
29+
"github.com/paypal/junodb/pkg/logging"
30+
"github.com/paypal/junodb/pkg/logging/cal"
31+
)
32+
33+
func TLSInitialized() bool {
34+
return false
35+
}
36+
37+
func Dial(addr string, timeout time.Duration, getTLSConfig func() *tls.Config) (conn net.Conn, err error) {
38+
var tlsConn *tls.Conn
39+
40+
if getTLSConfig == nil {
41+
return nil, errors.New("Unable to get TLS config")
42+
}
43+
timeStart := time.Now()
44+
tlsCfg := getTLSConfig()
45+
if tlsCfg == nil {
46+
err = errors.New("Unable to get TLS config")
47+
} else {
48+
dialer := &net.Dialer{Timeout: timeout}
49+
tlsConn, err = tls.DialWithDialer(dialer, "tcp", addr, tlsCfg)
50+
conn = tlsConn
51+
if tlsConn == nil && err == nil {
52+
err = errors.New("Connect failed.")
53+
}
54+
}
55+
56+
if !cal.IsEnabled() {
57+
return conn, err
58+
}
59+
60+
// Cal logging
61+
status := cal.StatusSuccess
62+
b := logging.NewKVBuffer()
63+
if err != nil {
64+
status = cal.StatusError
65+
b.Add([]byte("err"), err.Error())
66+
} else {
67+
b.Add([]byte("ssl"), getConnectionState(tlsConn))
68+
}
69+
70+
cal.AtomicTransaction(cal.TxnTypeConnect, addr, status,
71+
time.Since(timeStart), b.Bytes())
72+
73+
return conn, err
74+
}
75+
76+
func getConnectionState(c *tls.Conn) string {
77+
if c == nil {
78+
return ""
79+
}
80+
81+
st := c.ConnectionState()
82+
rid := 0
83+
if st.DidResume {
84+
rid = 1
85+
}
86+
msg := fmt.Sprintf("GoTLS:%s:%s:ssl_r=%d", getVersionName(st.Version),
87+
tls.CipherSuiteName(st.CipherSuite), rid)
88+
89+
return msg
90+
}
91+
92+
func getVersionName(ver uint16) string {
93+
switch ver {
94+
case tls.VersionSSL30:
95+
return "SSLv3"
96+
case tls.VersionTLS10:
97+
return "TLSv1"
98+
case tls.VersionTLS11:
99+
return "TLSv1.1"
100+
case tls.VersionTLS12:
101+
return "TLSv1.2"
102+
case tls.VersionTLS13:
103+
return "TLSv1.3"
104+
default:
105+
return ""
106+
}
107+
}

0 commit comments

Comments
 (0)