Skip to content

enable to set thrift http response buf limit #294

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func logoutAndClose(conn *connection, sessionID int64) {
func TestConnection(t *testing.T) {
hostAddress := HostAddress{Host: address, Port: port}
conn := newConnection(hostAddress)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil, false, nil)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil, false, nil, 0)
if err != nil {
t.Fatalf("fail to open connection, address: %s, port: %d, %s", address, port, err.Error())
}
Expand Down Expand Up @@ -122,7 +122,7 @@ func TestConnection(t *testing.T) {
func TestConnectionIPv6(t *testing.T) {
hostAddress := HostAddress{Host: addressIPv6, Port: port}
conn := newConnection(hostAddress)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil, false, nil)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil, false, nil, 0)
if err != nil {
t.Fatalf("fail to open connection, address: %s, port: %d, %s", address, port, err.Error())
}
Expand Down Expand Up @@ -254,7 +254,7 @@ func TestAuthentication(t *testing.T) {
hostAddress := HostAddress{Host: address, Port: port}

conn := newConnection(hostAddress)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil, false, nil)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil, false, nil, 0)
if err != nil {
t.Fatalf("fail to open connection, address: %s, port: %d, %s", address, port, err.Error())
}
Expand Down Expand Up @@ -1421,7 +1421,7 @@ func prepareSpace(spaceName string) error {
conn := newConnection(hostAddress)
testPoolConfig := GetDefaultConf()

err := conn.open(hostAddress, testPoolConfig.TimeOut, nil, false, nil)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil, false, nil, 0)
if err != nil {
return fmt.Errorf("fail to open connection, address: %s, port: %d, %s", address, port, err.Error())
}
Expand Down Expand Up @@ -1458,7 +1458,7 @@ func dropSpace(spaceName string) error {
conn := newConnection(hostAddress)
testPoolConfig := GetDefaultConf()

err := conn.open(hostAddress, testPoolConfig.TimeOut, nil, false, nil)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil, false, nil, 0)
if err != nil {
return fmt.Errorf("fail to open connection, address: %s, port: %d, %s", address, port, err.Error())
}
Expand Down
16 changes: 16 additions & 0 deletions configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ type PoolConfig struct {
UseHTTP2 bool
// HttpHeader is the http headers for the connection when using HTTP2
HttpHeader http.Header
// HttpResponseBufLimit is the response buffer limit.
// If the response is larger than this, the buffer will be renewed.
HttpResponseBufLimit int
}

// validateConf validates config
Expand All @@ -54,6 +57,10 @@ func (conf *PoolConfig) validateConf(log Logger) {
conf.MinConnPoolSize = 0
log.Warn("Invalid MinConnPoolSize value, the default value of 0 has been applied")
}
if conf.HttpResponseBufLimit < 0 {
conf.HttpResponseBufLimit = 0
log.Warn("Invalid HttpResponseBufLimit value, the default value of 0 has been applied")
}
}

// GetDefaultConf returns the default config
Expand Down Expand Up @@ -138,6 +145,9 @@ type SessionPoolConf struct {
useHTTP2 bool
// httpHeader is the http headers for the connection
httpHeader http.Header
// HttpResponseBufLimit is the response buffer limit.
// If the response is larger than this, the buffer will be renewed.
HttpResponseBufLimit int
}

type SessionPoolConfOption func(*SessionPoolConf)
Expand Down Expand Up @@ -214,6 +224,12 @@ func WithHttpHeader(header http.Header) SessionPoolConfOption {
}
}

func WithHttpResponseBufLimit(limit int) SessionPoolConfOption {
return func(conf *SessionPoolConf) {
conf.HttpResponseBufLimit = limit
}
}

func (conf *SessionPoolConf) checkMandatoryFields() error {
// Check mandatory fields
if conf.username == "" {
Expand Down
28 changes: 19 additions & 9 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ import (
)

type connection struct {
severAddress HostAddress
timeout time.Duration
returnedAt time.Time // the connection was created or returned.
sslConfig *tls.Config
useHTTP2 bool
httpHeader http.Header
graph *graph.GraphServiceClient
severAddress HostAddress
timeout time.Duration
returnedAt time.Time // the connection was created or returned.
sslConfig *tls.Config
useHTTP2 bool
httpHeader http.Header
httpResponseBufLimit int
graph *graph.GraphServiceClient
}

func newConnection(severAddress HostAddress) *connection {
Expand All @@ -46,12 +47,14 @@ func newConnection(severAddress HostAddress) *connection {
// open opens a transport for the connection
// if sslConfig is not nil, an SSL transport will be created
func (cn *connection) open(hostAddress HostAddress, timeout time.Duration, sslConfig *tls.Config,
useHTTP2 bool, httpHeader http.Header) error {
useHTTP2 bool, httpHeader http.Header, httpRespBufferLimit int) error {
ip := hostAddress.Host
port := hostAddress.Port
newAdd := net.JoinHostPort(ip, strconv.Itoa(port))
cn.timeout = timeout
cn.useHTTP2 = useHTTP2
cn.httpHeader = httpHeader
cn.httpResponseBufLimit = cn.httpResponseBufLimit

var (
err error
Expand Down Expand Up @@ -103,6 +106,13 @@ func (cn *connection) open(hostAddress HostAddress, timeout time.Duration, sslCo
}
}
}
if httpRespBufferLimit != 0 {
client, ok := transport.(*thrift.HTTPClient)
if !ok {
return fmt.Errorf("failed to get thrift http client")
}
client.SetResponseBufferLimit(int64(httpRespBufferLimit))
}
} else {
bufferSize := 128 << 10

Expand Down Expand Up @@ -150,7 +160,7 @@ func (cn *connection) verifyClientVersion() error {
// When the timeout occurs, the connection will be reopened to avoid the impact of the message.
func (cn *connection) reopen() error {
cn.close()
return cn.open(cn.severAddress, cn.timeout, cn.sslConfig, cn.useHTTP2, cn.httpHeader)
return cn.open(cn.severAddress, cn.timeout, cn.sslConfig, cn.useHTTP2, cn.httpHeader, cn.httpResponseBufLimit)
}

// Authenticate
Expand Down
6 changes: 3 additions & 3 deletions connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (pool *ConnectionPool) initPool() error {

// Open connection to host
if err := newConn.open(newConn.severAddress, pool.conf.TimeOut, pool.sslConfig,
pool.conf.UseHTTP2, pool.conf.HttpHeader); err != nil {
pool.conf.UseHTTP2, pool.conf.HttpHeader, pool.conf.HttpResponseBufLimit); err != nil {
// If initialization failed, clean idle queue
idleLen := pool.idleConnectionQueue.Len()
for i := 0; i < idleLen; i++ {
Expand Down Expand Up @@ -246,7 +246,7 @@ func (pool *ConnectionPool) newConnToHost() (*connection, error) {
newConn := newConnection(host)
// Open connection to host
if err := newConn.open(newConn.severAddress, pool.conf.TimeOut, pool.sslConfig,
pool.conf.UseHTTP2, pool.conf.HttpHeader); err != nil {
pool.conf.UseHTTP2, pool.conf.HttpHeader, pool.conf.HttpResponseBufLimit); err != nil {
return nil, err
}
// Add connection to active queue
Expand Down Expand Up @@ -371,7 +371,7 @@ func pingAddress(address HostAddress, timeout time.Duration, sslConfig *tls.Conf
useHTTP2 bool, httpHeader http.Header) error {
newConn := newConnection(address)
// Open connection to host
if err := newConn.open(newConn.severAddress, timeout, sslConfig, useHTTP2, httpHeader); err != nil {
if err := newConn.open(newConn.severAddress, timeout, sslConfig, useHTTP2, httpHeader, 0); err != nil {
return err
}
defer newConn.close()
Expand Down
2 changes: 1 addition & 1 deletion session_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func (pool *SessionPool) newSession() (*pureSession, error) {

// open a new connection
if err := cn.open(cn.severAddress, pool.conf.timeOut, pool.conf.sslConfig,
pool.conf.useHTTP2, pool.conf.httpHeader); err != nil {
pool.conf.useHTTP2, pool.conf.httpHeader, pool.conf.HttpResponseBufLimit); err != nil {
return nil, fmt.Errorf("failed to create a net.Conn-backed Transport,: %s", err.Error())
}

Expand Down