Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,12 +336,12 @@ func _newPipe(ctx context.Context, connFn func(context.Context) (net.Conn, error
}
}
if !nobg {
if p.onInvalidations != nil || option.AlwaysPipelining {
p.background()
}
if p.timeout > 0 && p.pinggap > 0 {
p.backgroundPing()
}
if p.onInvalidations != nil || option.AlwaysPipelining {
p.background()
}
}
if option.ConnLifetime > 0 {
p.lftm = option.ConnLifetime
Expand Down Expand Up @@ -652,9 +652,16 @@ func (p *pipe) _backgroundRead() (err error) {

func (p *pipe) backgroundPing() {
var prev, recv int32
var mu sync.Mutex

mu.Lock()
defer mu.Unlock()

prev = p.loadRecvs()
p.pingTimer = time.AfterFunc(p.pinggap, func() {
mu.Lock()
defer mu.Unlock()

var err error
recv = p.loadRecvs()
defer func() {
Expand Down
102 changes: 102 additions & 0 deletions pipe_backgroundping_race_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package rueidis

import (
"sync"
"testing"
"time"
)

// TestPipe_BackgroundPing_NoDataRace tests that backgroundPing doesn't have data races
// when timer callbacks fire repeatedly. This is the simplest test to catch the race.
// This test will fail with -race if the data race exists in backgroundPing.
func TestPipe_BackgroundPing_NoDataRace(t *testing.T) {
t.Parallel()

if testing.Short() {
t.Skip("Skipping race detection test in short mode")
}

// Use the existing setup function with a short keep-alive interval
option := ClientOption{
ConnWriteTimeout: 100 * time.Millisecond,
}
option.Dialer.KeepAlive = 20 * time.Millisecond // Short interval for rapid timer firings

p, mock, cancel, closeConn := setup(t, option)
defer cancel()
_ = closeConn
_ = mock
_ = p

// Simply let the pipe exist with background ping active
// The background ping timer will fire multiple times
// If there's a race in the prev variable or timer access, -race will catch it
time.Sleep(300 * time.Millisecond) // Let timer fire ~15 times

// The race detector will report any concurrent access to 'prev' variable
}

// TestPipe_BackgroundPing_ConcurrentClients tests backgroundPing with multiple
// concurrent clients, each with their own background ping timers.
func TestPipe_BackgroundPing_ConcurrentClients(t *testing.T) {
t.Parallel()

if testing.Short() {
t.Skip("Skipping concurrent race test in short mode")
}

numClients := 10
var wg sync.WaitGroup

for i := 0; i < numClients; i++ {
wg.Add(1)
go func(clientID int) {
defer wg.Done()

// Use the existing setup function with a short keep-alive interval
option := ClientOption{
ConnWriteTimeout: 100 * time.Millisecond,
}
option.Dialer.KeepAlive = 30 * time.Millisecond

_, _, cancel, _ := setup(t, option)
defer cancel()

// Let the pipe exist with background ping active
time.Sleep(200 * time.Millisecond) // Let timer fire multiple times
}(i)
}

wg.Wait()
}

// TestPipe_BackgroundPing_RapidConnectDisconnect reproduces the scenario
// where pipes are created and destroyed rapidly (like crash recovery).
func TestPipe_BackgroundPing_RapidConnectDisconnect(t *testing.T) {
t.Parallel()

if testing.Short() {
t.Skip("Skipping rapid connect/disconnect test in short mode")
}

for iteration := 0; iteration < 20; iteration++ {
// Use the existing setup function with a short keep-alive interval
option := ClientOption{
ConnWriteTimeout: 100 * time.Millisecond,
}
option.Dialer.KeepAlive = 40 * time.Millisecond

p, _, cancel, _ := setup(t, option)

// Brief operation period
time.Sleep(50 * time.Millisecond)

// Immediate close (like consumer crash)
cancel()

// Brief pause to let background goroutines settle
time.Sleep(5 * time.Millisecond)

_ = p
}
}
Loading