diff --git a/client/v3/ctx.go b/client/v3/ctx.go
index 38cee6c27e4a..6ae1947aa0f9 100644
--- a/client/v3/ctx.go
+++ b/client/v3/ctx.go
@@ -25,6 +25,8 @@ import (
 
 // WithRequireLeader requires client requests to only succeed
 // when the cluster has a leader.
+// The leader check is performed by interceptors created by
+// newUnaryInterceptor and newStreamInterceptor.
 func WithRequireLeader(ctx context.Context) context.Context {
 	md, ok := metadata.FromOutgoingContext(ctx)
 	if !ok { // no outgoing metadata ctx key, create one
diff --git a/client/v3/lease.go b/client/v3/lease.go
index ab5c559a5487..8f20b5660e43 100644
--- a/client/v3/lease.go
+++ b/client/v3/lease.go
@@ -376,7 +376,7 @@ func (l *lessor) keepAliveCtxCloser(ctx context.Context, id LeaseID, donec <-cha
 func (l *lessor) closeRequireLeader() {
 	l.mu.Lock()
 	defer l.mu.Unlock()
-	for _, ka := range l.keepAlives {
+	for leaseID, ka := range l.keepAlives {
 		reqIdxs := 0
 		// find all required leader channels, close, mark as nil
 		for i, ctx := range ka.ctxs {
@@ -395,6 +395,14 @@
 		if reqIdxs == 0 {
 			continue
 		}
+		// All ctxs for this lease ID explicitly require a leader and there is now
+		// no leader, so remove this lease ID entirely; otherwise we would waste
+		// bandwidth sending keepAlive requests for this lease ID without having
+		// any channel left to receive the responses.
+		if reqIdxs == len(ka.chs) {
+			delete(l.keepAlives, leaseID)
+			continue
+		}
 		// remove all channels that required a leader from keepalive
 		newChs := make([]chan<- *LeaseKeepAliveResponse, len(ka.chs)-reqIdxs)
 		newCtxs := make([]context.Context, len(newChs))
@@ -403,7 +411,7 @@
 			if ka.chs[i] == nil {
 				continue
 			}
-			newChs[newIdx], newCtxs[newIdx] = ka.chs[i], ka.ctxs[newIdx]
+			newChs[newIdx], newCtxs[newIdx] = ka.chs[i], ka.ctxs[i]
 			newIdx++
 		}
 		ka.chs, ka.ctxs = newChs, newCtxs
diff --git a/tests/integration/clientv3/lease/lease_test.go b/tests/integration/clientv3/lease/lease_test.go
index 9ee6694018e7..8ca60e71036b 100644
--- a/tests/integration/clientv3/lease/lease_test.go
+++ b/tests/integration/clientv3/lease/lease_test.go
@@ -664,6 +664,123 @@ func TestLeaseKeepAliveLoopExit(t *testing.T) {
 	require.ErrorAsf(t, err, &keepAliveHaltedErr, "expected %T, got %v(%T)", clientv3.ErrKeepAliveHalted{}, err, err)
 }
 
+// waitForLeaseKeepAliveChannelClose drains any pending responses and waits for the channel to close.
+func waitForLeaseKeepAliveChannelClose(t *testing.T, ch <-chan *clientv3.LeaseKeepAliveResponse) {
+	t.Helper()
+	for {
+		select {
+		case _, ok := <-ch:
+			if !ok {
+				return
+			}
+		case <-time.After(5 * time.Second):
+			t.Fatal("keepAlive with require leader took too long to close")
+		}
+	}
+}
+
+// TestLeaseWithRequireLeader verifies that keep-alive channels created with WithRequireLeader
+// close when the cluster has no leader, and checks lease expiration behavior after the leader is restored.
+func TestLeaseWithRequireLeader(t *testing.T) {
+	integration.BeforeTest(t)
+
+	clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 2, UseBridge: true})
+	defer clus.Terminate(t)
+
+	c := clus.Client(0)
+	leaseTTL := 9 * time.Second
+
+	// Create a lease with two keepAlives, both using WithRequireLeader.
+	// When the leader is lost, both are removed from the keepAlives map and the lease expires.
+	t.Run("lease expires when all keepAlives require leader", func(t *testing.T) {
+		leaseRsp, err := c.Grant(t.Context(), int64(leaseTTL.Seconds()))
+		require.NoError(t, err)
+
+		ka1, kerr1 := c.KeepAlive(clientv3.WithRequireLeader(t.Context()), leaseRsp.ID)
+		require.NoError(t, kerr1)
+		ka2, kerr2 := c.KeepAlive(clientv3.WithRequireLeader(t.Context()), leaseRsp.ID)
+		require.NoError(t, kerr2)
+
+		// Wait for the first keepAlive responses.
+		for _, ch := range []<-chan *clientv3.LeaseKeepAliveResponse{ka1, ka2} {
+			select {
+			case <-ch:
+			case <-time.After(5 * time.Second):
+				t.Fatal("first keep-alive timed out")
+			}
+		}
+
+		clus.Members[1].Stop(t)
+
+		waitForLeaseKeepAliveChannelClose(t, ka1)
+		waitForLeaseKeepAliveChannelClose(t, ka2)
+
+		require.NoError(t, clus.Members[1].Restart(t))
+		clus.WaitLeader(t)
+
+		// Add a 1-second buffer to ensure the lease has fully expired.
+		time.Sleep(leaseTTL + time.Second)
+
+		ttl, err := c.TimeToLive(t.Context(), leaseRsp.ID)
+		require.NoError(t, err)
+		require.Equalf(t, int64(-1), ttl.TTL,
+			"lease should have expired (TTL=-1), got TTL=%d", ttl.TTL)
+	})
+
+	// Create a lease with two keepAlives: one with WithRequireLeader, one without.
+	// When the leader is lost, only the WithRequireLeader keepAlive is removed and the lease stays alive.
+	t.Run("lease stays alive with mixed keepAlives", func(t *testing.T) {
+		leaseRsp, err := c.Grant(t.Context(), int64(leaseTTL.Seconds()))
+		require.NoError(t, err)
+
+		kaReqLeader, kerr1 := c.KeepAlive(clientv3.WithRequireLeader(t.Context()), leaseRsp.ID)
+		require.NoError(t, kerr1)
+		kaNoLeader, kerr2 := c.KeepAlive(t.Context(), leaseRsp.ID)
+		require.NoError(t, kerr2)
+
+		// Wait for the first keepAlive responses.
+		for _, ch := range []<-chan *clientv3.LeaseKeepAliveResponse{kaReqLeader, kaNoLeader} {
+			select {
+			case <-ch:
+			case <-time.After(5 * time.Second):
+				t.Fatal("first keep-alive timed out")
+			}
+		}
+
+		clus.Members[1].Stop(t)
+		waitForLeaseKeepAliveChannelClose(t, kaReqLeader)
+
+		// Drain buffered responses so we don't mistake old responses for fresh ones after the leader is restored.
+	drainLoop:
+		for {
+			select {
+			case _, ok := <-kaNoLeader:
+				if !ok {
+					t.Fatal("kaNoLeader channel unexpectedly closed while draining")
+				}
+			default:
+				break drainLoop
+			}
+		}
+
+		require.NoError(t, clus.Members[1].Restart(t))
+		clus.WaitLeader(t)
+
+		select {
+		case resp, ok := <-kaNoLeader:
+			require.Truef(t, ok, "keepAlive channel should still be open")
+			require.NotNilf(t, resp, "expected keepAlive response after leader restore")
+		case <-time.After(5 * time.Second):
+			t.Fatal("keepAlive response timed out after leader restored")
+		}
+
+		ttl, err := c.TimeToLive(t.Context(), leaseRsp.ID)
+		require.NoError(t, err)
+		require.Positivef(t, ttl.TTL,
+			"lease should still be alive (TTL>0), got TTL=%d", ttl.TTL)
+	})
+}
+
 // TestV3LeaseFailureOverlap issues Grant and KeepAlive requests to a cluster
 // before, during, and after quorum loss to confirm Grant/KeepAlive tolerates
 // transient cluster failure.
@@ -718,58 +835,3 @@ func TestV3LeaseFailureOverlap(t *testing.T) {
 	mkReqs(4)
 	wg.Wait()
 }
-
-// TestLeaseWithRequireLeader checks keep-alive channel close when no leader.
-func TestLeaseWithRequireLeader(t *testing.T) { - integration.BeforeTest(t) - - clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 2, UseBridge: true}) - defer clus.Terminate(t) - - c := clus.Client(0) - lid1, err1 := c.Grant(t.Context(), 60) - require.NoError(t, err1) - lid2, err2 := c.Grant(t.Context(), 60) - require.NoError(t, err2) - // kaReqLeader close if the leader is lost - kaReqLeader, kerr1 := c.KeepAlive(clientv3.WithRequireLeader(t.Context()), lid1.ID) - require.NoError(t, kerr1) - // kaWait will wait even if the leader is lost - kaWait, kerr2 := c.KeepAlive(t.Context(), lid2.ID) - require.NoError(t, kerr2) - - select { - case <-kaReqLeader: - case <-time.After(5 * time.Second): - t.Fatalf("require leader first keep-alive timed out") - } - select { - case <-kaWait: - case <-time.After(5 * time.Second): - t.Fatalf("leader not required first keep-alive timed out") - } - - clus.Members[1].Stop(t) - // kaReqLeader may issue multiple requests while waiting for the first - // response from proxy server; drain any stray keepalive responses - time.Sleep(100 * time.Millisecond) - for { - <-kaReqLeader - if len(kaReqLeader) == 0 { - break - } - } - - select { - case resp, ok := <-kaReqLeader: - require.Falsef(t, ok, "expected closed require leader, got response %+v", resp) - case <-time.After(5 * time.Second): - t.Fatal("keepalive with require leader took too long to close") - } - select { - case _, ok := <-kaWait: - require.Truef(t, ok, "got closed channel with no require leader, expected non-closed") - case <-time.After(10 * time.Millisecond): - // wait some to detect any closes happening soon after kaReqLeader closing - } -}
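
For reference, a minimal client-side sketch of the behavior this change affects: a keep-alive opened with WithRequireLeader stops delivering responses and its channel closes once the client observes leader loss, and with this patch the lessor also stops refreshing a lease whose keep-alive channels all require a leader, so that lease is left to expire. The endpoint, TTL value, and error handling below are illustrative assumptions, not part of this diff.

```go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
	// NOTE: endpoint and TTL are assumptions for illustration only.
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	ctx := context.Background()

	// Grant a lease and keep it alive only while the cluster has a leader.
	lease, err := cli.Grant(ctx, 10)
	if err != nil {
		log.Fatal(err)
	}
	ka, err := cli.KeepAlive(clientv3.WithRequireLeader(ctx), lease.ID)
	if err != nil {
		log.Fatal(err)
	}

	// The channel closes once the client observes leader loss. Because every
	// keep-alive for this lease requires a leader, the lessor now also drops
	// the lease from its keepAlives map, so it is no longer refreshed and
	// expires after its TTL unless a new keep-alive is started.
	for resp := range ka {
		fmt.Printf("refreshed lease %x, TTL=%d\n", resp.ID, resp.TTL)
	}
	fmt.Println("keep-alive channel closed: cluster has no leader; lease will be left to expire")
}
```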