Skip to content

Commit 317271b

Browse files
authored
pickfirst: Register a health listener when used as a leaf policy (#7832)
1 parent 5565631 commit 317271b

File tree

2 files changed

+384
-37
lines changed

2 files changed

+384
-37
lines changed

balancer/pickfirst/pickfirstleaf/pickfirstleaf.go

+136-37
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ func init() {
5454
balancer.Register(pickfirstBuilder{})
5555
}
5656

57+
// enableHealthListenerKeyType is a unique key type used in resolver attributes
58+
// to indicate whether the health listener usage is enabled.
59+
// The key is written by EnableHealthListener and looked up in
// UpdateClientConnState, where any non-nil value stored under it turns the
// health listener on.
type enableHealthListenerKeyType struct{}
60+
5761
var (
5862
logger = grpclog.Component("pick-first-leaf-lb")
5963
// Name is the name of the pick_first_leaf balancer.
@@ -109,10 +113,8 @@ func (pickfirstBuilder) Build(cc balancer.ClientConn, bo balancer.BuildOptions)
109113
target: bo.Target.String(),
110114
metricsRecorder: bo.MetricsRecorder, // ClientConn will always create a Metrics Recorder.
111115

112-
addressList: addressList{},
113116
subConns: resolver.NewAddressMap(),
114117
state: connectivity.Connecting,
115-
mu: sync.Mutex{},
116118
cancelConnectionTimer: func() {},
117119
}
118120
b.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(logPrefix, b))
@@ -131,6 +133,13 @@ func (pickfirstBuilder) ParseConfig(js json.RawMessage) (serviceconfig.LoadBalan
131133
return cfg, nil
132134
}
133135

136+
// EnableHealthListener updates the state to configure pickfirst for using a
137+
// generic health listener.
138+
func EnableHealthListener(state resolver.State) resolver.State {
139+
state.Attributes = state.Attributes.WithValue(enableHealthListenerKeyType{}, true)
140+
return state
141+
}
142+
134143
type pfConfig struct {
135144
serviceconfig.LoadBalancingConfig `json:"-"`
136145

@@ -148,15 +157,19 @@ type scData struct {
148157
subConn balancer.SubConn
149158
addr resolver.Address
150159

151-
state connectivity.State
160+
rawConnectivityState connectivity.State
161+
// The effective connectivity state based on raw connectivity, health state
162+
// and after following sticky TransientFailure behaviour defined in A62.
163+
effectiveState connectivity.State
152164
lastErr error
153165
connectionFailedInFirstPass bool
154166
}
155167

156168
func (b *pickfirstBalancer) newSCData(addr resolver.Address) (*scData, error) {
157169
sd := &scData{
158-
state: connectivity.Idle,
159-
addr: addr,
170+
rawConnectivityState: connectivity.Idle,
171+
effectiveState: connectivity.Idle,
172+
addr: addr,
160173
}
161174
sc, err := b.cc.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{
162175
StateListener: func(state balancer.SubConnState) {
@@ -181,14 +194,17 @@ type pickfirstBalancer struct {
181194
// The mutex is used to ensure synchronization of updates triggered
182195
// from the idle picker and the already serialized resolver,
183196
// SubConn state updates.
184-
mu sync.Mutex
197+
mu sync.Mutex
198+
// State reported to the channel based on SubConn states and resolver
199+
// updates.
185200
state connectivity.State
186201
// scData for active SubConns mapped by address.
187202
subConns *resolver.AddressMap
188203
addressList addressList
189204
firstPass bool
190205
numTF int
191206
cancelConnectionTimer func()
207+
healthCheckingEnabled bool
192208
}
193209

194210
// ResolverError is called by the ClientConn when the name resolver produces
@@ -214,7 +230,7 @@ func (b *pickfirstBalancer) resolverErrorLocked(err error) {
214230
return
215231
}
216232

217-
b.cc.UpdateState(balancer.State{
233+
b.updateBalancerState(balancer.State{
218234
ConnectivityState: connectivity.TransientFailure,
219235
Picker: &picker{err: fmt.Errorf("name resolver error: %v", err)},
220236
})
@@ -227,12 +243,12 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState
227243
if len(state.ResolverState.Addresses) == 0 && len(state.ResolverState.Endpoints) == 0 {
228244
// Cleanup state pertaining to the previous resolver state.
229245
// Treat an empty address list like an error by calling b.ResolverError.
230-
b.state = connectivity.TransientFailure
231246
b.closeSubConnsLocked()
232247
b.addressList.updateAddrs(nil)
233248
b.resolverErrorLocked(errors.New("produced zero addresses"))
234249
return balancer.ErrBadResolverState
235250
}
251+
b.healthCheckingEnabled = state.ResolverState.Attributes.Value(enableHealthListenerKeyType{}) != nil
236252
cfg, ok := state.BalancerConfig.(pfConfig)
237253
if state.BalancerConfig != nil && !ok {
238254
return fmt.Errorf("pickfirst: received illegal BalancerConfig (type %T): %v: %w", state.BalancerConfig, state.BalancerConfig, balancer.ErrBadResolverState)
@@ -279,12 +295,15 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState
279295
newAddrs = deDupAddresses(newAddrs)
280296
newAddrs = interleaveAddresses(newAddrs)
281297

282-
// If the previous ready SubConn exists in new address list,
283-
// keep this connection and don't create new SubConns.
284298
prevAddr := b.addressList.currentAddress()
299+
prevSCData, found := b.subConns.Get(prevAddr)
285300
prevAddrsCount := b.addressList.size()
301+
isPrevRawConnectivityStateReady := found && prevSCData.(*scData).rawConnectivityState == connectivity.Ready
286302
b.addressList.updateAddrs(newAddrs)
287-
if b.state == connectivity.Ready && b.addressList.seekTo(prevAddr) {
303+
304+
// If the previous ready SubConn exists in new address list,
305+
// keep this connection and don't create new SubConns.
306+
if isPrevRawConnectivityStateReady && b.addressList.seekTo(prevAddr) {
288307
return nil
289308
}
290309

@@ -296,10 +315,9 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState
296315
// we should still enter CONNECTING because the sticky TF behaviour
297316
// mentioned in A62 applies only when the TRANSIENT_FAILURE is reported
298317
// due to connectivity failures.
299-
if b.state == connectivity.Ready || b.state == connectivity.Connecting || prevAddrsCount == 0 {
318+
if isPrevRawConnectivityStateReady || b.state == connectivity.Connecting || prevAddrsCount == 0 {
300319
// Start connection attempt at first address.
301-
b.state = connectivity.Connecting
302-
b.cc.UpdateState(balancer.State{
320+
b.forceUpdateConcludedStateLocked(balancer.State{
303321
ConnectivityState: connectivity.Connecting,
304322
Picker: &picker{err: balancer.ErrNoSubConnAvailable},
305323
})
@@ -501,7 +519,7 @@ func (b *pickfirstBalancer) requestConnectionLocked() {
501519
}
502520

503521
scd := sd.(*scData)
504-
switch scd.state {
522+
switch scd.rawConnectivityState {
505523
case connectivity.Idle:
506524
scd.subConn.Connect()
507525
b.scheduleNextConnectionLocked()
@@ -519,7 +537,7 @@ func (b *pickfirstBalancer) requestConnectionLocked() {
519537
b.scheduleNextConnectionLocked()
520538
return
521539
default:
522-
b.logger.Errorf("SubConn with unexpected state %v present in SubConns map.", scd.state)
540+
b.logger.Errorf("SubConn with unexpected state %v present in SubConns map.", scd.rawConnectivityState)
523541
return
524542

525543
}
@@ -562,16 +580,17 @@ func (b *pickfirstBalancer) scheduleNextConnectionLocked() {
562580
func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.SubConnState) {
563581
b.mu.Lock()
564582
defer b.mu.Unlock()
565-
oldState := sd.state
566-
sd.state = newState.ConnectivityState
583+
oldState := sd.rawConnectivityState
584+
sd.rawConnectivityState = newState.ConnectivityState
567585
// Previously relevant SubConns can still callback with state updates.
568586
// To prevent pickers from returning these obsolete SubConns, this logic
569587
// is included to check if the current list of active SubConns includes this
570588
// SubConn.
571-
if activeSD, found := b.subConns.Get(sd.addr); !found || activeSD != sd {
589+
if !b.isActiveSCData(sd) {
572590
return
573591
}
574592
if newState.ConnectivityState == connectivity.Shutdown {
593+
sd.effectiveState = connectivity.Shutdown
575594
return
576595
}
577596

@@ -590,10 +609,30 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.Sub
590609
b.logger.Errorf("Address %q not found address list in %v", sd.addr, b.addressList.addresses)
591610
return
592611
}
593-
b.state = connectivity.Ready
594-
b.cc.UpdateState(balancer.State{
595-
ConnectivityState: connectivity.Ready,
596-
Picker: &picker{result: balancer.PickResult{SubConn: sd.subConn}},
612+
if !b.healthCheckingEnabled {
613+
if b.logger.V(2) {
614+
b.logger.Infof("SubConn %p reported connectivity state READY and the health listener is disabled. Transitioning SubConn to READY.", sd.subConn)
615+
}
616+
617+
sd.effectiveState = connectivity.Ready
618+
b.updateBalancerState(balancer.State{
619+
ConnectivityState: connectivity.Ready,
620+
Picker: &picker{result: balancer.PickResult{SubConn: sd.subConn}},
621+
})
622+
return
623+
}
624+
if b.logger.V(2) {
625+
b.logger.Infof("SubConn %p reported connectivity state READY. Registering health listener.", sd.subConn)
626+
}
627+
// Send a CONNECTING update to take the SubConn out of sticky-TF if
628+
// required.
629+
sd.effectiveState = connectivity.Connecting
630+
b.updateBalancerState(balancer.State{
631+
ConnectivityState: connectivity.Connecting,
632+
Picker: &picker{err: balancer.ErrNoSubConnAvailable},
633+
})
634+
sd.subConn.RegisterHealthListener(func(scs balancer.SubConnState) {
635+
b.updateSubConnHealthState(sd, scs)
597636
})
598637
return
599638
}
@@ -604,11 +643,13 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.Sub
604643
// a transport is successfully created, but the connection fails
605644
// before the SubConn can send the notification for READY. We treat
606645
// this as a successful connection and transition to IDLE.
607-
if (b.state == connectivity.Ready && newState.ConnectivityState != connectivity.Ready) || (oldState == connectivity.Connecting && newState.ConnectivityState == connectivity.Idle) {
646+
// TODO: https://github.com/grpc/grpc-go/issues/7862 - Remove the second
647+
// part of the if condition below once the issue is fixed.
648+
if oldState == connectivity.Ready || (oldState == connectivity.Connecting && newState.ConnectivityState == connectivity.Idle) {
608649
// Once a transport fails, the balancer enters IDLE and starts from
609650
// the first address when the picker is used.
610651
b.shutdownRemainingLocked(sd)
611-
b.state = connectivity.Idle
652+
sd.effectiveState = newState.ConnectivityState
612653
// READY SubConn interspliced in between CONNECTING and IDLE, need to
613654
// account for that.
614655
if oldState == connectivity.Connecting {
@@ -619,7 +660,7 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.Sub
619660
}
620661
disconnectionsMetric.Record(b.metricsRecorder, 1, b.target)
621662
b.addressList.reset()
622-
b.cc.UpdateState(balancer.State{
663+
b.updateBalancerState(balancer.State{
623664
ConnectivityState: connectivity.Idle,
624665
Picker: &idlePicker{exitIdle: sync.OnceFunc(b.ExitIdle)},
625666
})
@@ -629,19 +670,19 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.Sub
629670
if b.firstPass {
630671
switch newState.ConnectivityState {
631672
case connectivity.Connecting:
632-
// The balancer can be in either IDLE, CONNECTING or
633-
// TRANSIENT_FAILURE. If it's in TRANSIENT_FAILURE, stay in
673+
// The effective state can be in either IDLE, CONNECTING or
674+
// TRANSIENT_FAILURE. If it's TRANSIENT_FAILURE, stay in
634675
// TRANSIENT_FAILURE until it's READY. See A62.
635-
// If the balancer is already in CONNECTING, no update is needed.
636-
if b.state == connectivity.Idle {
637-
b.state = connectivity.Connecting
638-
b.cc.UpdateState(balancer.State{
676+
if sd.effectiveState != connectivity.TransientFailure {
677+
sd.effectiveState = connectivity.Connecting
678+
b.updateBalancerState(balancer.State{
639679
ConnectivityState: connectivity.Connecting,
640680
Picker: &picker{err: balancer.ErrNoSubConnAvailable},
641681
})
642682
}
643683
case connectivity.TransientFailure:
644684
sd.lastErr = newState.ConnectionError
685+
sd.effectiveState = connectivity.TransientFailure
645686
// Since we're re-using common SubConns while handling resolver
646687
// updates, we could receive an out of turn TRANSIENT_FAILURE from
647688
// a pass over the previous address list. Happy Eyeballs will also
@@ -668,7 +709,7 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.Sub
668709
b.numTF = (b.numTF + 1) % b.subConns.Len()
669710
sd.lastErr = newState.ConnectionError
670711
if b.numTF%b.subConns.Len() == 0 {
671-
b.cc.UpdateState(balancer.State{
712+
b.updateBalancerState(balancer.State{
672713
ConnectivityState: connectivity.TransientFailure,
673714
Picker: &picker{err: newState.ConnectionError},
674715
})
@@ -698,21 +739,79 @@ func (b *pickfirstBalancer) endFirstPassIfPossibleLocked(lastErr error) {
698739
}
699740
}
700741
b.firstPass = false
701-
b.state = connectivity.TransientFailure
702-
703-
b.cc.UpdateState(balancer.State{
742+
b.updateBalancerState(balancer.State{
704743
ConnectivityState: connectivity.TransientFailure,
705744
Picker: &picker{err: lastErr},
706745
})
707746
// Start re-connecting all the SubConns that are already in IDLE.
708747
for _, v := range b.subConns.Values() {
709748
sd := v.(*scData)
710-
if sd.state == connectivity.Idle {
749+
if sd.rawConnectivityState == connectivity.Idle {
711750
sd.subConn.Connect()
712751
}
713752
}
714753
}
715754

755+
func (b *pickfirstBalancer) isActiveSCData(sd *scData) bool {
756+
activeSD, found := b.subConns.Get(sd.addr)
757+
return found && activeSD == sd
758+
}
759+
760+
func (b *pickfirstBalancer) updateSubConnHealthState(sd *scData, state balancer.SubConnState) {
761+
b.mu.Lock()
762+
defer b.mu.Unlock()
763+
// Previously relevant SubConns can still callback with state updates.
764+
// To prevent pickers from returning these obsolete SubConns, this logic
765+
// is included to check if the current list of active SubConns includes
766+
// this SubConn.
767+
if !b.isActiveSCData(sd) {
768+
return
769+
}
770+
sd.effectiveState = state.ConnectivityState
771+
switch state.ConnectivityState {
772+
case connectivity.Ready:
773+
b.updateBalancerState(balancer.State{
774+
ConnectivityState: connectivity.Ready,
775+
Picker: &picker{result: balancer.PickResult{SubConn: sd.subConn}},
776+
})
777+
case connectivity.TransientFailure:
778+
b.updateBalancerState(balancer.State{
779+
ConnectivityState: connectivity.TransientFailure,
780+
Picker: &picker{err: fmt.Errorf("pickfirst: health check failure: %v", state.ConnectionError)},
781+
})
782+
case connectivity.Connecting:
783+
b.updateBalancerState(balancer.State{
784+
ConnectivityState: connectivity.Connecting,
785+
Picker: &picker{err: balancer.ErrNoSubConnAvailable},
786+
})
787+
default:
788+
b.logger.Errorf("Got unexpected health update for SubConn %p: %v", state)
789+
}
790+
}
791+
792+
// updateBalancerState stores the state reported to the channel and calls
793+
// ClientConn.UpdateState(). As an optimization, it avoids sending duplicate
794+
// updates to the channel.
795+
func (b *pickfirstBalancer) updateBalancerState(newState balancer.State) {
796+
// In case of TransientFailures allow the picker to be updated to update
797+
// the connectivity error, in all other cases don't send duplicate state
798+
// updates.
799+
if newState.ConnectivityState == b.state && b.state != connectivity.TransientFailure {
800+
return
801+
}
802+
b.forceUpdateConcludedStateLocked(newState)
803+
}
804+
805+
// forceUpdateConcludedStateLocked stores the state reported to the channel and
806+
// calls ClientConn.UpdateState().
807+
// A separate function is defined to force update the ClientConn state since the
808+
// channel doesn't correctly assume that LB policies start in CONNECTING and
809+
// relies on LB policy to send an initial CONNECTING update.
810+
func (b *pickfirstBalancer) forceUpdateConcludedStateLocked(newState balancer.State) {
811+
b.state = newState.ConnectivityState
812+
b.cc.UpdateState(newState)
813+
}
814+
716815
type picker struct {
717816
result balancer.PickResult
718817
err error

0 commit comments

Comments
 (0)