132 changes: 57 additions & 75 deletions balancer/rls/control_channel.go
@@ -21,6 +21,7 @@ package rls
import (
"context"
"fmt"
"sync"
"time"

"google.golang.org/grpc"
@@ -29,7 +30,6 @@ import (
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/buffer"
internalgrpclog "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/pretty"
@@ -44,6 +44,16 @@ type adaptiveThrottler interface {
RegisterBackendResponse(throttled bool)
}
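// throttledLookupSketch is a minimal sketch (hypothetical, not part of this
// change) of how the adaptive throttler is meant to be consulted around an RLS
// lookup: skip the RPC entirely when the throttler says so, and report the
// outcome afterwards so the throttle window adapts. The ShouldThrottle method
// and the doLookup callback standing in for the RouteLookup RPC are
// assumptions for illustration only.
func throttledLookupSketch(t adaptiveThrottler, doLookup func() error) error {
	if t.ShouldThrottle() {
		// Fail locally without sending a request to the RLS server.
		return fmt.Errorf("rls: request throttled by adaptive throttler")
	}
	err := doLookup()
	t.RegisterBackendResponse(err != nil)
	return err
}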

// newConnectivityStateSubscriber can be overridden in tests to wrap the
// connectivity state subscriber.
var newConnectivityStateSubscriber = connStateSubscriber

// connStateSubscriber returns the subscriber unchanged. Tests override the
// newConnectivityStateSubscriber variable above to wrap the subscriber.
func connStateSubscriber(sub grpcsync.Subscriber) grpcsync.Subscriber {
return sub
}
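// wrappedSubscriberSketch is a minimal sketch (hypothetical, not part of this
// change) of how a test could use the newConnectivityStateSubscriber hook to
// observe the connectivity state updates delivered to the control channel; the
// type and channel below are assumptions for illustration only.
type wrappedSubscriberSketch struct {
	wrapped grpcsync.Subscriber
	states  chan connectivity.State
}

func (w *wrappedSubscriberSketch) OnMessage(msg any) {
	if st, ok := msg.(connectivity.State); ok {
		w.states <- st // record the transition for test assertions
	}
	w.wrapped.OnMessage(msg) // forward to the real subscriber
}

// In a test, the override would then look roughly like:
//
//	newConnectivityStateSubscriber = func(sub grpcsync.Subscriber) grpcsync.Subscriber {
//		return &wrappedSubscriberSketch{wrapped: sub, states: make(chan connectivity.State, 10)}
//	}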

// controlChannel is a wrapper around the gRPC channel to the RLS server
// specified in the service config.
type controlChannel struct {
@@ -57,24 +67,24 @@ type controlChannel struct {
// hammering the RLS service while it is overloaded or down.
throttler adaptiveThrottler

cc *grpc.ClientConn
client rlsgrpc.RouteLookupServiceClient
logger *internalgrpclog.PrefixLogger
connectivityStateCh *buffer.Unbounded
unsubscribe func()
monitorDoneCh chan struct{}
cc *grpc.ClientConn
client rlsgrpc.RouteLookupServiceClient
logger *internalgrpclog.PrefixLogger
unsubscribe func()

// All fields below are guarded by mu.
mu sync.Mutex
seenTransientFailure bool
}

// newControlChannel creates a controlChannel to rlsServerName and uses
// serviceConfig, if non-empty, as the default service config for the underlying
// gRPC channel.
func newControlChannel(rlsServerName, serviceConfig string, rpcTimeout time.Duration, bOpts balancer.BuildOptions, backToReadyFunc func()) (*controlChannel, error) {
ctrlCh := &controlChannel{
rpcTimeout: rpcTimeout,
backToReadyFunc: backToReadyFunc,
throttler: newAdaptiveThrottler(),
connectivityStateCh: buffer.NewUnbounded(),
monitorDoneCh: make(chan struct{}),
rpcTimeout: rpcTimeout,
backToReadyFunc: backToReadyFunc,
throttler: newAdaptiveThrottler(),
}
ctrlCh.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-control-channel %p] ", ctrlCh))

@@ -88,11 +98,10 @@ func newControlChannel(rlsServerName, serviceConfig string, rpcTimeout time.Dura
}
// Subscribe to connectivity state before connecting to avoid missing initial
// updates, which are only delivered to active subscribers.
ctrlCh.unsubscribe = internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(ctrlCh.cc, ctrlCh)
ctrlCh.unsubscribe = internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(ctrlCh.cc, newConnectivityStateSubscriber(ctrlCh))
ctrlCh.cc.Connect()
ctrlCh.client = rlsgrpc.NewRouteLookupServiceClient(ctrlCh.cc)
ctrlCh.logger.Infof("Control channel created to RLS server at: %v", rlsServerName)
go ctrlCh.monitorConnectivityState()
return ctrlCh, nil
}

@@ -101,7 +110,40 @@ func (cc *controlChannel) OnMessage(msg any) {
if !ok {
panic(fmt.Sprintf("Unexpected message type %T, wanted connectivity.State type", msg))
}
cc.connectivityStateCh.Put(st)

cc.mu.Lock()
defer cc.mu.Unlock()

switch st {
case connectivity.Ready:
// Only reset backoff when transitioning from TRANSIENT_FAILURE to READY.
// This indicates the RLS server has recovered from being unreachable, so
// we reset backoff state in all cache entries to allow pending RPCs to
// proceed immediately. We skip benign transitions like READY → IDLE → READY
// since those don't represent actual failures.
if cc.seenTransientFailure {
if cc.logger.V(2) {
cc.logger.Infof("Control channel back to READY after TRANSIENT_FAILURE")
}
cc.seenTransientFailure = false
if cc.backToReadyFunc != nil {
cc.backToReadyFunc()
}
} else {
if cc.logger.V(2) {
cc.logger.Infof("Control channel is READY")
}
}
case connectivity.TransientFailure:
// Track that we've entered TRANSIENT_FAILURE state so we know to reset
// backoffs when we recover to READY.
cc.logger.Warningf("Control channel is TRANSIENT_FAILURE")
cc.seenTransientFailure = true
default:
if cc.logger.V(2) {
cc.logger.Infof("Control channel connectivity state is %s", st)
}
}
}
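// resetBackoffsSketch is a minimal, self-contained sketch (hypothetical, not
// part of this change) of the work the LB policy's backToReadyFunc is expected
// to trigger: cancel every pending per-entry backoff timer, clear the backoff
// state, and send exactly one new picker no matter how many entries were
// reset, so queued wait-for-ready RPCs are re-processed only once. The entry
// shape and the sendNewPicker callback are assumptions for illustration only.
type backoffEntrySketch struct {
	backoffTimer *time.Timer // pending backoff timer, if any
	inBackoff    bool
}

func resetBackoffsSketch(entries []*backoffEntrySketch, sendNewPicker func()) {
	resetAny := false
	for _, e := range entries {
		if !e.inBackoff {
			continue
		}
		if e.backoffTimer != nil {
			e.backoffTimer.Stop() // cancel the pending backoff timer
		}
		e.inBackoff = false
		resetAny = true
	}
	if resetAny {
		sendNewPicker() // one picker update covers all reset entries
	}
}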

// dialOpts constructs the dial options for the control plane channel.
@@ -148,68 +190,8 @@ func (cc *controlChannel) dialOpts(bOpts balancer.BuildOptions, serviceConfig st
return dopts, nil
}
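// dialOptsSketch is a minimal sketch (hypothetical, not part of this change)
// of the kind of options such a helper typically assembles: reuse the parent
// channel's transport credentials when available, fall back to insecure
// credentials otherwise, and apply the default service config if one was
// given. The real implementation carries additional options (dialer,
// authority, etc.); the shape below is an assumption for illustration only.
func dialOptsSketch(bOpts balancer.BuildOptions, serviceConfig string) []grpc.DialOption {
	var dopts []grpc.DialOption
	if bOpts.DialCreds != nil {
		dopts = append(dopts, grpc.WithTransportCredentials(bOpts.DialCreds.Clone()))
	} else {
		dopts = append(dopts, grpc.WithTransportCredentials(insecure.NewCredentials()))
	}
	if serviceConfig != "" {
		dopts = append(dopts, grpc.WithDefaultServiceConfig(serviceConfig))
	}
	return dopts
}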

func (cc *controlChannel) monitorConnectivityState() {
cc.logger.Infof("Starting connectivity state monitoring goroutine")
defer close(cc.monitorDoneCh)

// Since we use two mechanisms to deal with the RLS server being down:
// - adaptive throttling for the channel as a whole
// - exponential backoff on a per-request basis
// we need a way to avoid double-penalizing requests by counting failures
// toward both mechanisms when the RLS server is unreachable.
//
// To accomplish this, we monitor the state of the control plane channel. If
// the state has been TRANSIENT_FAILURE since the last time it was in state
// READY, and it then transitions into state READY, we push on a channel
// which is being read by the LB policy.
//
// The LB policy will iterate through the cache to reset the backoff
// timeouts in all cache entries. Specifically, this means that it will
// reset the backoff state and cancel the pending backoff timer. Note that
// when cancelling the backoff timer, just like when the backoff timer fires
// normally, a new picker is returned to the channel, to force it to
// re-process any wait-for-ready RPCs that may still be queued if we failed
// them while we were in backoff. However, we should optimize this case by
// returning only one new picker, regardless of how many backoff timers are
// cancelled.

// Wait for the control channel to become READY for the first time.
for s, ok := <-cc.connectivityStateCh.Get(); s != connectivity.Ready; s, ok = <-cc.connectivityStateCh.Get() {
if !ok {
return
}

cc.connectivityStateCh.Load()
if s == connectivity.Shutdown {
return
}
}
cc.connectivityStateCh.Load()
cc.logger.Infof("Connectivity state is READY")

for {
s, ok := <-cc.connectivityStateCh.Get()
if !ok {
return
}
cc.connectivityStateCh.Load()

if s == connectivity.Shutdown {
return
}
if s == connectivity.Ready {
cc.logger.Infof("Control channel back to READY")
cc.backToReadyFunc()
}

cc.logger.Infof("Connectivity state is %s", s)
}
}

func (cc *controlChannel) close() {
cc.unsubscribe()
cc.connectivityStateCh.Close()
<-cc.monitorDoneCh
cc.cc.Close()
cc.logger.Infof("Shutdown")
}