Skip to content
Open
21 changes: 19 additions & 2 deletions balancer/rls/control_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,11 @@ func (cc *controlChannel) monitorConnectivityState() {
cc.connectivityStateCh.Load()
cc.logger.Infof("Connectivity state is READY")

// Track whether we've seen TRANSIENT_FAILURE since the last READY state.
// We only want to reset backoff when recovering from an actual failure,
// not when transitioning through benign states like IDLE.
seenTransientFailure := false

for {
s, ok := <-cc.connectivityStateCh.Get()
if !ok {
Expand All @@ -197,9 +202,21 @@ func (cc *controlChannel) monitorConnectivityState() {
if s == connectivity.Shutdown {
return
}

// Track if we've entered TRANSIENT_FAILURE state
if s == connectivity.TransientFailure {
seenTransientFailure = true
}

// Only reset backoff if we're returning to READY after a failure
if s == connectivity.Ready {
cc.logger.Infof("Control channel back to READY")
cc.backToReadyFunc()
if seenTransientFailure {
cc.logger.Infof("Control channel back to READY after TRANSIENT_FAILURE")
cc.backToReadyFunc()
seenTransientFailure = false
} else {
cc.logger.Infof("Control channel back to READY (no prior failure)")
}
}

cc.logger.Infof("Connectivity state is %s", s)
Expand Down
93 changes: 93 additions & 0 deletions balancer/rls/control_channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@ import (
"fmt"
"os"
"regexp"
"sync"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal"
rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
Expand Down Expand Up @@ -463,3 +465,94 @@ func (s) TestNewControlChannelUnsupportedCredsBundle(t *testing.T) {
t.Fatal("newControlChannel succeeded when expected to fail")
}
}

// TestControlChannelConnectivityStateTransitions verifies that the control
// channel only resets backoff when recovering from TRANSIENT_FAILURE, not
// when going through benign state changes like READY → IDLE → READY.
func (s) TestControlChannelConnectivityStateTransitions(t *testing.T) {
tests := []struct {
name string
states []connectivity.State
wantCallbackCount int
}{
{
name: "READY → TRANSIENT_FAILURE → READY triggers callback",
states: []connectivity.State{
connectivity.TransientFailure,
connectivity.Ready,
},
wantCallbackCount: 1,
},
{
name: "READY → IDLE → READY does not trigger callback",
states: []connectivity.State{
connectivity.Idle,
connectivity.Ready,
},
wantCallbackCount: 0,
},
{
name: "Multiple failures trigger callback each time",
states: []connectivity.State{
connectivity.TransientFailure,
connectivity.Ready,
connectivity.TransientFailure,
connectivity.Ready,
},
wantCallbackCount: 2,
},
{
name: "IDLE between failures doesn't affect callback",
states: []connectivity.State{
connectivity.TransientFailure,
connectivity.Idle,
connectivity.Ready,
},
wantCallbackCount: 1,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there might be some way to improve the test. Maybe we can use WaitGroups, but I will defer to @easwars for his opinion on this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be able to test this in an e2e style, we would need to make it possible for the test to see the connectivity state changes on the control channel, but without adding hooks into the test code. I propose the following:

  • Add a new variable that can be overridden by the test. This variable will point to a function that returns the subscriber to be passed to the SubscribeToConnectivityStateChanges API:
var newConnectivityStateSubscriber = connStateSubscriber
  • Set the above variable to the following piece of code in control_channel.go
// connStateSubscriber is the default connectivity state subscriber factory:
// it returns the subscriber it is given, unchanged.
func connStateSubscriber(sub grpcsync.Subscriber) grpcsync.Subscriber {
       return sub
}
  • When calling the SubscribeToConnectivityStateChanges API, use the above function as follows:
       ctrlCh.unsubscribe = internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(ctrlCh.cc, newConnectivityStateSubscriber(ctrlCh))
  • In the test, create an implementation of the grpcsync.Subscriber interface that contains a delegate. This makes it possible for the test to delegate to the subscriber set by the non-test code, but make the connectivity state changes available to the test:
// wrappingConnectivityStateSubscriber is a test-only grpcsync.Subscriber that
// pairs a delegate subscriber with a channel of connectivity states.
type wrappingConnectivityStateSubscriber struct {
       // delegate is the wrapped subscriber.
       delegate    grpcsync.Subscriber
       // connStateCh carries connectivity states for the test to read.
       connStateCh chan connectivity.State
}

// OnMessage forwards msg to the delegate subscriber and then publishes it on
// connStateCh. msg must be a connectivity.State; the unchecked type assertion
// panics otherwise.
func (w *wrappingConnectivityStateSubscriber) OnMessage(msg any) {
       // Delegate first, so the wrapped subscriber has handled the state before
       // the test can observe it on the channel.
       w.delegate.OnMessage(msg)
       // This send blocks until connStateCh has room or a receiver is ready.
       w.connStateCh <- msg.(connectivity.State)
}
  • Modify TestControlChannelConnectivityStateMonitoring or add a new test where we do the override. You would probably want to use a `testutils` helper here:
       // Override the connectivity state subscriber.
       wrappedSubscriber := &wrappingConnectivityStateSubscriber{connStateCh: make(chan connectivity.State, 1)}
       origConnectivityStateSubscriber := newConnectivityStateSubscriber
       newConnectivityStateSubscriber = func(delegate grpcsync.Subscriber) grpcsync.Subscriber {
               wrappedSubscriber.delegate = delegate
               return wrappedSubscriber
       }
       defer func() { newConnectivityStateSubscriber = origConnectivityStateSubscriber }()
  • In the test body, we need to check the appropriate state transitions in a bunch of places:
	// Make sure an RLS request is sent out.
	verifyRLSRequest(t, rlsReqCh, true)

	// Verify that the control channel moves to READY.
	wantStates := []connectivity.State{
		connectivity.Connecting,
		connectivity.Ready,
	}
	for _, wantState := range wantStates {
		select {
		case gotState := <-wrappedSubscriber.connStateCh:
			if gotState != wantState {
				t.Fatalf("Unexpected connectivity state: got %v, want %v", gotState, wantState)
			}
		case <-ctx.Done():
			t.Fatalf("Timeout waiting for RLS control channel to become %q", wantState)
		}
	}

	// Stop the RLS server.
	lis.Stop()

	// Verify that the control channel moves to IDLE.
	wantStates = []connectivity.State{
		connectivity.Idle,
	}
	for _, wantState := range wantStates {
		select {
		case gotState := <-wrappedSubscriber.connStateCh:
			if gotState != wantState {
				t.Fatalf("Unexpected connectivity state: got %v, want %v", gotState, wantState)
			}
		case <-ctx.Done():
			t.Fatalf("Timeout waiting for RLS control channel to become %q", wantState)
		}
	}

	// Make another RPC similar to the first one. Since the above cache entry
	// would have expired by now, this should trigger another RLS request. And
	// since the RLS server is down, RLS request will fail and the cache entry
	// will enter backoff, and we have overridden the default backoff strategy to
	// return a value which will keep this entry in backoff for the whole duration
	// of the test.
	makeTestRPCAndVerifyError(ctx, t, cc, codes.Unavailable, nil)

	// Verify that the control channel moves to TRANSIENT_FAILURE.
	wantStates = []connectivity.State{
		connectivity.Connecting,
		connectivity.TransientFailure,
	}
	for _, wantState := range wantStates {
		select {
		case gotState := <-wrappedSubscriber.connStateCh:
			if gotState != wantState {
				t.Fatalf("Unexpected connectivity state: got %v, want %v", gotState, wantState)
			}
		case <-ctx.Done():
			t.Fatalf("Timeout waiting for RLS control channel to become %q", wantState)
		}
	}

	// Restart the RLS server.
	lis.Restart()

The above will test the READY --> TF --> READY transition.

For the READY --> IDLE --> READY transition, we need to restart the RLS server once the control channel goes IDLE, and then wait for it to go READY before attempting another RPC and verifying that backoffs are not reset.

Let me know what you think about this approach.

Thanks

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did this option not work out? I still see two new tests being added instead of modifying the existing test, `TestControlChannelConnectivityStateMonitoring`, which is more e2e-style than the ones currently being added in this PR. Please let me know if you are running into any issues when trying to modify the existing test as per my suggestion. Thanks.

// Start an RLS server
rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil)

// Setup callback to count invocations
callbackCount := 0
var mu sync.Mutex
callback := func() {
mu.Lock()
callbackCount++
mu.Unlock()
}

// Create control channel
ctrlCh, err := newControlChannel(rlsServer.Address, "", defaultTestTimeout, balancer.BuildOptions{}, callback)
if err != nil {
t.Fatalf("Failed to create control channel: %v", err)
}
defer ctrlCh.close()

// Give the channel time to reach initial READY state
time.Sleep(100 * time.Millisecond)

// Inject the test state sequence
for _, state := range tt.states {
ctrlCh.OnMessage(state)
// Give time for the monitoring goroutine to process the state
time.Sleep(50 * time.Millisecond)
}

// Give extra time for any pending callbacks
time.Sleep(100 * time.Millisecond)

mu.Lock()
gotCallbackCount := callbackCount
mu.Unlock()

if gotCallbackCount != tt.wantCallbackCount {
t.Errorf("Got %d callback invocations, want %d", gotCallbackCount, tt.wantCallbackCount)
}
})
}
}
Loading