Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions client/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ import (
"github.com/cloudwego/kitex/pkg/http"
"github.com/cloudwego/kitex/pkg/loadbalance"
"github.com/cloudwego/kitex/pkg/proxy"
connpool2 "github.com/cloudwego/kitex/pkg/remote/connpool"
"github.com/cloudwego/kitex/pkg/remote/trans/gonet"
"github.com/cloudwego/kitex/pkg/remote/trans/netpoll"
"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/grpc"
"github.com/cloudwego/kitex/pkg/retry"
"github.com/cloudwego/kitex/pkg/rpcinfo"
Expand Down Expand Up @@ -764,6 +767,20 @@ func TestTailOption(t *testing.T) {
test.Assert(t, opts.RemoteOpt.Dialer != nil)
}

func TestGonetOption(t *testing.T) {
// gonet
opt := client.NewOptions([]Option{WithDialer(gonet.NewDialer()), WithLongConnection(connpool.IdleConfig{MaxIdlePerAddress: 10})})
d := opt.RemoteOpt.ConnPool.(*connpool2.LongPool)
pcfg := d.Config()
test.Assert(t, pcfg.Enable)

// netpoll
opt = client.NewOptions([]Option{WithDialer(netpoll.NewDialer()), WithLongConnection(connpool.IdleConfig{MaxIdlePerAddress: 10})})
d = opt.RemoteOpt.ConnPool.(*connpool2.LongPool)
pcfg = d.Config()
test.Assert(t, !pcfg.Enable)
}

func checkOneOptionDebugInfo(t *testing.T, opt Option, expectStr string) error {
o := &Options{}
o.Apply([]Option{opt})
Expand Down
22 changes: 20 additions & 2 deletions internal/client/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cloudwego/localsession/backup"

"github.com/cloudwego/kitex/internal/configutil"
internalRemote "github.com/cloudwego/kitex/internal/remote"
"github.com/cloudwego/kitex/internal/stream"
"github.com/cloudwego/kitex/pkg/acl"
"github.com/cloudwego/kitex/pkg/circuitbreak"
Expand Down Expand Up @@ -292,22 +293,27 @@ func (o *Options) initRemoteOpt() {
}
o.RemoteOpt.TTHeaderStreamingProvider = ttstream.NewClientProvider(o.TTHeaderStreamingOptions.TransportOptions...)
}

_, setConnPoolProactiveCheck := o.RemoteOpt.Dialer.(internalRemote.IsGonetDialer)
if o.RemoteOpt.ConnPool == nil {
if o.PoolCfg != nil {
if *o.PoolCfg == zero {
o.RemoteOpt.ConnPool = connpool.NewShortPool(o.Svr.ServiceName)
} else {
o.RemoteOpt.ConnPool = connpool.NewLongPool(o.Svr.ServiceName, *o.PoolCfg)
cfg := newDefaultLongPoolCfg(o.Svr.ServiceName, *o.PoolCfg, setConnPoolProactiveCheck)
o.RemoteOpt.ConnPool = connpool.NewLongPoolWithConfig(cfg)
}
} else {
o.RemoteOpt.ConnPool = connpool.NewLongPool(
cfg := newDefaultLongPoolCfg(
o.Svr.ServiceName,
connpool2.IdleConfig{
MaxIdlePerAddress: 10,
MaxIdleGlobal: 100,
MaxIdleTimeout: time.Minute,
},
setConnPoolProactiveCheck,
)
o.RemoteOpt.ConnPool = connpool.NewLongPoolWithConfig(cfg)
}
}
}
Expand All @@ -319,3 +325,15 @@ func (o *Options) InitRetryContainer() {
o.CloseCallbacks = append(o.CloseCallbacks, o.UnaryOptions.RetryContainer.Close)
}
}

func newDefaultLongPoolCfg(serviceName string, idleCfg connpool2.IdleConfig, enableProactiveCheck bool) connpool.LongPoolConfig {
return connpool.LongPoolConfig{
ServiceName: serviceName,
IdleConfig: idleCfg,
ProactiveCheckConfig: connpool.ProactiveCheckConfig{
Enable: enableProactiveCheck,
CheckFunc: internalRemote.ConnectionStateCheck,
Interval: connpool.DefaultProactiveConnCheckInterval,
},
}
}
70 changes: 70 additions & 0 deletions internal/remote/conn_check.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
//go:build !windows

/*
* Copyright 2025 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package remote

import (
"fmt"
"net"
"syscall"

"golang.org/x/sys/unix"
)

// ConnectionStateCheck uses unix.Poll to detect the connection state
// Since the connections are stored in the pool, we treat any POLLIN event as connection close and set the connection state to closed.
func ConnectionStateCheck(conns ...net.Conn) error {
pollFds := make([]unix.PollFd, 0, len(conns))

for _, conn := range conns {
sysConn, ok := conn.(syscall.Conn)
if !ok {
return fmt.Errorf("conn is not a syscall.Conn, got %T", conn)
}
rawConn, err := sysConn.SyscallConn()
if err != nil {
return err
}
var fd int
err = rawConn.Control(func(fileDescriptor uintptr) {
fd = int(fileDescriptor)
})
if err != nil {
return err
}
pollFds = append(pollFds, unix.PollFd{Fd: int32(fd), Events: unix.POLLIN})
}

n, err := unix.Poll(pollFds, 0)
if err != nil {
return err
}
if n == 0 {
return nil
}
for i := 0; i < len(pollFds); i++ {
if pollFds[i].Revents&unix.POLLIN != 0 {
// the connection should not receive any data, POLLIN means FIN or RST
// set the state
if s, ok := conns[i].(SetConnState); ok {
s.SetConnState(true)
}
}
}
return nil
}
86 changes: 86 additions & 0 deletions internal/remote/conn_check_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
//go:build !windows

/*
* Copyright 2025 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package remote

import (
"errors"
"net"
"strings"
"sync/atomic"
"syscall"
"testing"
"time"

"github.com/cloudwego/kitex/internal/test"
)

var _ SetConnState = &mockConn{}

type mockConn struct {
net.Conn
closed atomic.Bool
}

func (m *mockConn) SetConnState(c bool) {
m.closed.Store(c)
}

func (m *mockConn) SyscallConn() (syscall.RawConn, error) {
if sc, ok := m.Conn.(syscall.Conn); ok {
return sc.SyscallConn()
}
return nil, errors.New("not syscall.Conn")
}

func TestConnectionStateCheck(t *testing.T) {
// wrong connection type
err := ConnectionStateCheck(net.Pipe())
test.Assert(t, err != nil)
test.Assert(t, strings.Contains(err.Error(), "conn is not a syscall.Conn"))

ln, err := net.Listen("tcp", "127.0.0.1:0") // 本地端口自动分配
test.Assert(t, err == nil, err)
defer ln.Close()

done := make(chan net.Conn)
go func() {
conn, e := ln.Accept()
test.Assert(t, e == nil)
done <- conn
}()

clientConn, err := net.Dial("tcp", ln.Addr().String())
test.Assert(t, err == nil, err)

serverConn := <-done
serverConnWithState := &mockConn{Conn: serverConn}
// check, not closed
err = ConnectionStateCheck(serverConnWithState)
test.Assert(t, err == nil, err)
test.Assert(t, !serverConnWithState.closed.Load())

// close conn
clientConn.Close()
time.Sleep(100 * time.Millisecond)

// check, closed
err = ConnectionStateCheck(serverConnWithState)
test.Assert(t, err == nil, err)
test.Assert(t, serverConnWithState.closed.Load())
}
29 changes: 29 additions & 0 deletions internal/remote/conn_check_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
//go:build windows
// +build windows

/*
* Copyright 2025 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package remote

import (
"net"
)

// FIXME: windows not supported
func ConnectionStateCheck(conns ...net.Conn) error {
return nil
}
27 changes: 27 additions & 0 deletions internal/remote/gonet.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2025 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package remote

// SetConnState only used to set the state to connection in gonet
type SetConnState interface {
SetConnState(inactive bool)
}

// IsGonetDialer returns if the dialer is gonet dialer
type IsGonetDialer interface {
IsGonetDialer() bool
}
Loading