Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 37 additions & 8 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1482,11 +1482,33 @@ func (ac *addrConn) tryAllAddrs(ctx context.Context, addrs []resolver.Address, c
// new transport.
func (ac *addrConn) createTransport(ctx context.Context, addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error {
addr.ServerName = ac.cc.getServerName(addr)

var healthCheckDoneCh <-chan struct{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of having two channels (where one is a copy of another, and we depend on nil checks to figure out if the associated event is something that will happen and therefore we should wait for it), what do you think about the following approach, which uses the grpcsync.Event type defined here:

type Event struct {

The grpcsync.Event is simply a wrapper around a channel and a boolean. So, you can check if the event has fired with one of its methods and you can wait for an event to fire with another of its methods.

  • Make startHealthCheck return two grpcsync.Events
    • One for the health check goroutine started event
      • startHealthCheck will fire this event right before starting the health check goroutine
    • Another for the health check goroutine completed event
      • startHealthCheck will fire this event as a deferred statement from inside the health check goroutine
  • In createTransport
    • In a defer, we can use the first of the above two events to check whether the health check goroutine was started; if it was not started at all, we can cancel the hctx
    • In onClose, we will wait for the health check goroutine completed event if and only if the health check goroutine was started.

Let me know what you think about this approach. Thanks.

hctx, hcancel := context.WithCancel(ctx)
defer func() {
// If healthCheckDoneCh is nil, then a health check has not been
// started. Therefore, the health check context can be canceled because
// it is not in use.
if healthCheckDoneCh == nil {
hcancel()
}
}()

onClose := func(r transport.GoAwayReason) {
var healthCheckCompleteCh <-chan struct{}

ac.mu.Lock()
defer ac.mu.Unlock()
defer func() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a small note stating where healthCheckComplete is set and why we wait on healthCheckComplete instead of healthCheckDone?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added some comments. Hope they're helpful.

ac.mu.Unlock()
// If healthCheckCompleteCh is not nil, then hcancel() has been
// called and healthCheckCompleteCh is a copy of healthCheckDoneCh,
// as it was when ac.mu was held. Now wait for the health check to
// complete.
if healthCheckCompleteCh != nil {
<-healthCheckCompleteCh
}
}()

// adjust params based on GoAwayReason
ac.adjustParams(r)
if ctx.Err() != nil {
Expand All @@ -1497,6 +1519,7 @@ func (ac *addrConn) createTransport(ctx context.Context, addr resolver.Address,
return
}
hcancel()
healthCheckCompleteCh = healthCheckDoneCh
if ac.transport == nil {
// We're still connecting to this address, which could error. Do
// not update the connectivity state or resolve; these will happen
Expand All @@ -1522,7 +1545,6 @@ func (ac *addrConn) createTransport(ctx context.Context, addr resolver.Address,
logger.Infof("Creating new client transport to %q: %v", addr, err)
}
// newTr is either nil, or closed.
hcancel()
channelz.Warningf(logger, ac.channelz, "grpc: addrConn.createTransport failed to connect to %s. Err: %v", addr, err)
return err
}
Expand Down Expand Up @@ -1556,13 +1578,17 @@ func (ac *addrConn) createTransport(ctx context.Context, addr resolver.Address,
}
ac.curAddr = addr
ac.transport = newTr
ac.startHealthCheck(hctx) // Will set state to READY if appropriate.
healthCheckDoneCh = ac.startHealthCheck(hctx) // Will set state to READY if appropriate.
return nil
}

// startHealthCheck starts the health checking stream (RPC) to watch the health
// stats of this connection if health checking is requested and configured.
//
// A channel is returned that will be closed once the health check goroutine
// exits after ctx has been canceled, or nil if the health check requirements
// aren't met and no goroutine has been started.
//
// LB channel health checking is enabled when all requirements below are met:
// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption
// 2. internal.HealthCheckFunc is set by importing the grpc/health package
Expand All @@ -1572,7 +1598,7 @@ func (ac *addrConn) createTransport(ctx context.Context, addr resolver.Address,
// It sets addrConn to READY if the health checking stream is not started.
//
// Caller must hold ac.mu.
func (ac *addrConn) startHealthCheck(ctx context.Context) {
func (ac *addrConn) startHealthCheck(ctx context.Context) <-chan struct{} {
var healthcheckManagingState bool
defer func() {
if !healthcheckManagingState {
Expand All @@ -1581,22 +1607,22 @@ func (ac *addrConn) startHealthCheck(ctx context.Context) {
}()

if ac.cc.dopts.disableHealthCheck {
return
return nil
}
healthCheckConfig := ac.cc.healthCheckConfig()
if healthCheckConfig == nil {
return
return nil
}
if !ac.scopts.HealthCheckEnabled {
return
return nil
}
healthCheckFunc := internal.HealthCheckFunc
if healthCheckFunc == nil {
// The health package is not imported to set health check function.
//
// TODO: add a link to the health check doc in the error message.
channelz.Error(logger, ac.channelz, "Health check is requested but health check function is not set.")
return
return nil
}

healthcheckManagingState = true
Expand All @@ -1621,7 +1647,9 @@ func (ac *addrConn) startHealthCheck(ctx context.Context) {
ac.updateConnectivityState(s, lastErr)
}
// Start the health checking stream.
done := make(chan struct{})
go func() {
defer close(done)
err := healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName)
if err != nil {
if status.Code(err) == codes.Unimplemented {
Expand All @@ -1631,6 +1659,7 @@ func (ac *addrConn) startHealthCheck(ctx context.Context) {
}
}
}()
return done
}

func (ac *addrConn) resetConnectBackoff() {
Expand Down