2 changes: 2 additions & 0 deletions client/v3/ctx.go
@@ -25,6 +25,8 @@ import (

// WithRequireLeader requires client requests to only succeed
// when the cluster has a leader.
// The leader check is performed by interceptors created by
// newUnaryInterceptor and newStreamInterceptor.
func WithRequireLeader(ctx context.Context) context.Context {
md, ok := metadata.FromOutgoingContext(ctx)
if !ok { // no outgoing metadata ctx key, create one
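For context, a minimal client-side sketch of how WithRequireLeader is typically combined with KeepAlive; the endpoint, TTL, and logging here are illustrative assumptions, not part of this change. With WithRequireLeader on the keep-alive context, the keep-alive channel closes once the cluster loses its leader instead of retrying silently, which is the behavior the tests below exercise.

// Sketch only: endpoint and TTL values are assumptions.
package main

import (
    "context"
    "log"
    "time"

    clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   []string{"localhost:2379"}, // assumed local endpoint
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer cli.Close()

    lease, err := cli.Grant(context.Background(), 10) // 10s TTL, illustrative
    if err != nil {
        log.Fatal(err)
    }

    // The require-leader metadata set here is what the unary/stream
    // interceptors check; the keep-alive stream fails fast on leader loss.
    ch, err := cli.KeepAlive(clientv3.WithRequireLeader(context.Background()), lease.ID)
    if err != nil {
        log.Fatal(err)
    }
    for resp := range ch {
        log.Printf("kept lease %x alive, TTL=%d", resp.ID, resp.TTL)
    }
    log.Println("keep-alive channel closed (leader lost or lease expired)")
}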
12 changes: 10 additions & 2 deletions client/v3/lease.go
@@ -376,7 +376,7 @@ func (l *lessor) keepAliveCtxCloser(ctx context.Context, id LeaseID, donec <-cha
func (l *lessor) closeRequireLeader() {
l.mu.Lock()
defer l.mu.Unlock()
for _, ka := range l.keepAlives {
for leaseID, ka := range l.keepAlives {
reqIdxs := 0
// find all required leader channels, close, mark as nil
for i, ctx := range ka.ctxs {
@@ -395,6 +395,14 @@ func (l *lessor) closeRequireLeader() {
if reqIdxs == 0 {
continue
}
// All ctxs for this lease ID explicitly require a leader and there is
// currently no leader, so remove this lease ID; otherwise we would waste
// bandwidth sending keepAlive requests for it without having any channel
// left to receive the responses.
if reqIdxs == len(ka.chs) {
delete(l.keepAlives, leaseID)
continue
}
// remove all channels that required a leader from keepalive
newChs := make([]chan<- *LeaseKeepAliveResponse, len(ka.chs)-reqIdxs)
newCtxs := make([]context.Context, len(newChs))
@@ -403,7 +411,7 @@ func (l *lessor) closeRequireLeader() {
if ka.chs[i] == nil {
continue
}
newChs[newIdx], newCtxs[newIdx] = ka.chs[i], ka.ctxs[newIdx]
newChs[newIdx], newCtxs[newIdx] = ka.chs[i], ka.ctxs[i]
newIdx++
}
ka.chs, ka.ctxs = newChs, newCtxs
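As a standalone illustration of the two lease.go changes above (hypothetical names, not etcd code): when every channel of a lease required a leader, the whole map entry can be dropped, and when compacting the parallel chs/ctxs slices, both must be copied with the same source index i so each surviving channel keeps its own context.

// Hypothetical sketch of the compaction pattern in closeRequireLeader:
// chs and ctxs are parallel slices and must stay paired element-for-element.
package main

import (
    "context"
    "fmt"
)

type nameKey struct{}

func main() {
    // "" marks a channel slot that was closed because its ctx required a leader.
    chs := []string{"ch-a", "", "ch-c"}
    ctxs := []context.Context{
        context.WithValue(context.Background(), nameKey{}, "ctx-a"),
        context.WithValue(context.Background(), nameKey{}, "ctx-b"),
        context.WithValue(context.Background(), nameKey{}, "ctx-c"),
    }
    removed := 1

    // New early exit in the diff: if every channel required a leader,
    // the whole keepAlive entry is deleted from the map instead.
    if removed == len(chs) {
        fmt.Println("would delete the lease from keepAlives")
        return
    }

    newChs := make([]string, len(chs)-removed)
    newCtxs := make([]context.Context, len(newChs))
    newIdx := 0
    for i := range chs {
        if chs[i] == "" {
            continue
        }
        // The fix: use the source index i for ctxs as well (not newIdx),
        // so "ch-c" stays paired with "ctx-c" rather than "ctx-b".
        newChs[newIdx], newCtxs[newIdx] = chs[i], ctxs[i]
        newIdx++
    }
    fmt.Println(newChs, newCtxs[0].Value(nameKey{}), newCtxs[1].Value(nameKey{}))
    // Output: [ch-a ch-c] ctx-a ctx-c
}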
172 changes: 117 additions & 55 deletions tests/integration/clientv3/lease/lease_test.go
@@ -664,6 +664,123 @@ func TestLeaseKeepAliveLoopExit(t *testing.T) {
require.ErrorAsf(t, err, &keepAliveHaltedErr, "expected %T, got %v(%T)", clientv3.ErrKeepAliveHalted{}, err, err)
}

// waitForLeaseKeepAliveChannelClose drains any pending responses and waits for the channel to close.
func waitForLeaseKeepAliveChannelClose(t *testing.T, ch <-chan *clientv3.LeaseKeepAliveResponse) {
t.Helper()
for {
select {
case _, ok := <-ch:
if !ok {
return
}
case <-time.After(5 * time.Second):
t.Fatal("keepAlive with require leader took too long to close")
}
}
}

// TestLeaseWithRequireLeader checks that keep-alive channels requiring a leader
// close when there is no leader, and verifies lease expiration behavior after
// the leader is restored.
func TestLeaseWithRequireLeader(t *testing.T) {
integration.BeforeTest(t)

clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 2, UseBridge: true})
defer clus.Terminate(t)

c := clus.Client(0)
leaseTTL := 9 * time.Second

// Create a lease with two keepAlives, both using WithRequireLeader.
// When the leader is lost, both are removed from the keepAlives map and the lease expires.
t.Run("lease expires when all keepAlives require leader", func(t *testing.T) {
leaseRsp, err := c.Grant(t.Context(), int64(leaseTTL.Seconds()))
require.NoError(t, err)

ka1, kerr1 := c.KeepAlive(clientv3.WithRequireLeader(t.Context()), leaseRsp.ID)
require.NoError(t, kerr1)
ka2, kerr2 := c.KeepAlive(clientv3.WithRequireLeader(t.Context()), leaseRsp.ID)
require.NoError(t, kerr2)

// Wait for first keepAlive responses
for _, ch := range []<-chan *clientv3.LeaseKeepAliveResponse{ka1, ka2} {
select {
case <-ch:
case <-time.After(5 * time.Second):
t.Fatal("first keep-alive timed out")
}
}

clus.Members[1].Stop(t)

waitForLeaseKeepAliveChannelClose(t, ka1)
waitForLeaseKeepAliveChannelClose(t, ka2)

require.NoError(t, clus.Members[1].Restart(t))
clus.WaitLeader(t)

// Add a 1 second buffer to ensure the lease has fully expired
time.Sleep(leaseTTL + time.Second)

ttl, err := c.TimeToLive(t.Context(), leaseRsp.ID)
require.NoError(t, err)
require.Equalf(t, int64(-1), ttl.TTL,
"lease should have expired (TTL=-1), got TTL=%d", ttl.TTL)
})

// Create a lease with two keepAlives: one with WithRequireLeader, one without.
// When the leader is lost, only the WithRequireLeader one is removed and the lease stays alive.
t.Run("lease stays alive with mixed keepAlives", func(t *testing.T) {
leaseRsp, err := c.Grant(t.Context(), int64(leaseTTL.Seconds()))
require.NoError(t, err)

kaReqLeader, kerr1 := c.KeepAlive(clientv3.WithRequireLeader(t.Context()), leaseRsp.ID)
require.NoError(t, kerr1)
kaNoLeader, kerr2 := c.KeepAlive(t.Context(), leaseRsp.ID)
require.NoError(t, kerr2)

// Wait for first keepAlive responses
for _, ch := range []<-chan *clientv3.LeaseKeepAliveResponse{kaReqLeader, kaNoLeader} {
select {
case <-ch:
case <-time.After(5 * time.Second):
t.Fatal("first keep-alive timed out")
}
}

clus.Members[1].Stop(t)
waitForLeaseKeepAliveChannelClose(t, kaReqLeader)

// Drain any buffered responses so old responses are not mistaken for fresh ones after the leader is restored
drainLoop:
for {
select {
case _, ok := <-kaNoLeader:
if !ok {
t.Fatal("kaNoLeader channel unexpectedly closed while draining")
}
default:
break drainLoop
}
}

require.NoError(t, clus.Members[1].Restart(t))
clus.WaitLeader(t)

select {
case resp, ok := <-kaNoLeader:
require.Truef(t, ok, "keepAlive channel should still be open")
require.NotNilf(t, resp, "expected keepAlive response after leader restore")
case <-time.After(5 * time.Second):
t.Fatal("keepAlive response timed out after leader restored")
}

ttl, err := c.TimeToLive(t.Context(), leaseRsp.ID)
require.NoError(t, err)
require.Positivef(t, ttl.TTL,
"lease should still be alive (TTL>0), got TTL=%d", ttl.TTL)
})
}

// TestV3LeaseFailureOverlap issues Grant and KeepAlive requests to a cluster
// before, during, and after quorum loss to confirm Grant/KeepAlive tolerates
// transient cluster failure.
@@ -718,58 +835,3 @@ func TestV3LeaseFailureOverlap(t *testing.T) {
mkReqs(4)
wg.Wait()
}

// TestLeaseWithRequireLeader checks keep-alive channel close when no leader.
func TestLeaseWithRequireLeader(t *testing.T) {
integration.BeforeTest(t)

clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 2, UseBridge: true})
defer clus.Terminate(t)

c := clus.Client(0)
lid1, err1 := c.Grant(t.Context(), 60)
require.NoError(t, err1)
lid2, err2 := c.Grant(t.Context(), 60)
require.NoError(t, err2)
// kaReqLeader close if the leader is lost
kaReqLeader, kerr1 := c.KeepAlive(clientv3.WithRequireLeader(t.Context()), lid1.ID)
require.NoError(t, kerr1)
// kaWait will wait even if the leader is lost
kaWait, kerr2 := c.KeepAlive(t.Context(), lid2.ID)
require.NoError(t, kerr2)

select {
case <-kaReqLeader:
case <-time.After(5 * time.Second):
t.Fatalf("require leader first keep-alive timed out")
}
select {
case <-kaWait:
case <-time.After(5 * time.Second):
t.Fatalf("leader not required first keep-alive timed out")
}

clus.Members[1].Stop(t)
// kaReqLeader may issue multiple requests while waiting for the first
// response from proxy server; drain any stray keepalive responses
time.Sleep(100 * time.Millisecond)
for {
<-kaReqLeader
if len(kaReqLeader) == 0 {
break
}
}

select {
case resp, ok := <-kaReqLeader:
require.Falsef(t, ok, "expected closed require leader, got response %+v", resp)
case <-time.After(5 * time.Second):
t.Fatal("keepalive with require leader took too long to close")
}
select {
case _, ok := <-kaWait:
require.Truef(t, ok, "got closed channel with no require leader, expected non-closed")
case <-time.After(10 * time.Millisecond):
// wait some to detect any closes happening soon after kaReqLeader closing
}
}