Skip to content
Open
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 36 additions & 33 deletions tests/gocase/unit/wait/wait_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,50 +235,53 @@ func TestWaitCommand(t *testing.T) {

// When WAIT is executed, it should block future commands in the buffer until the number of replicas that have reached the target sequence is reached.
func TestWaitBlockExecutingCommand(t *testing.T) {
// Start master server
masterSrv := util.StartServer(t, map[string]string{})
defer masterSrv.Close()
// Start master server
masterSrv := util.StartServer(t, map[string]string{})
defer masterSrv.Close()

tcpClient := masterSrv.NewTCPClient()
defer func() { require.NoError(t, tcpClient.Close()) }()
tcpClient := masterSrv.NewTCPClient()
defer func() { require.NoError(t, tcpClient.Close()) }()

// should be blocked after k1 is set
require.NoError(t, tcpClient.WriteArgs("SET", "k1", "v1"))
require.NoError(t, tcpClient.WriteArgs("WAIT", "1", "0"))
// Create a normal client to check server state
masterRdb := masterSrv.NewClient()
defer func() { require.NoError(t, masterRdb.Close()) }()

// sleep for some time, so the commands are not read by a single read callback
// if the server does not block the command while waiting
time.Sleep(1 * time.Second)
// should be blocked after k1 is set
require.NoError(t, tcpClient.WriteArgs("SET", "k1", "v1"))
require.NoError(t, tcpClient.WriteArgs("WAIT", "1", "0"))

// should be blocked after k1 is set to v1
require.NoError(t, tcpClient.WriteArgs("SET", "k1", "v2"))
require.NoError(t, tcpClient.WriteArgs("WAIT", "1", "0"))
// FIX: Instead of sleeping, we wait until the server confirms the client is blocked.
// This makes the test deterministic and faster.
require.Eventually(t, func() bool {
info := masterRdb.Info(context.Background(), "clients").Val()
return strings.Contains(info, "blocked_clients:1")
}, 5*time.Second, 100*time.Millisecond)

time.Sleep(1 * time.Second)
// should be blocked after k1 is set to v1
require.NoError(t, tcpClient.WriteArgs("SET", "k1", "v2"))
require.NoError(t, tcpClient.WriteArgs("WAIT", "1", "0"))

require.NoError(t, tcpClient.WriteArgs("SET", "k1", "v3"))
// No need to sleep here; we know the previous WAIT is holding the connection.
require.NoError(t, tcpClient.WriteArgs("SET", "k1", "v3"))

masterRdb := masterSrv.NewClient()
defer func() { require.NoError(t, masterRdb.Close()) }()

// only the first command should be executed
require.Equal(t, "v1", masterRdb.Get(context.Background(), "k1").Val())
// only the first command should be executed
require.Equal(t, "v1", masterRdb.Get(context.Background(), "k1").Val())

// Start slave server
slaveSrv := util.StartServer(t, map[string]string{})
defer slaveSrv.Close()
// Start slave server
slaveSrv := util.StartServer(t, map[string]string{})
defer slaveSrv.Close()

slaveRdb := slaveSrv.NewClient()
defer func() { require.NoError(t, slaveRdb.Close()) }()
slaveRdb := slaveSrv.NewClient()
defer func() { require.NoError(t, slaveRdb.Close()) }()

// Set up replication
util.SlaveOf(t, slaveRdb, masterSrv)
util.WaitForOffsetSync(t, masterRdb, slaveRdb, 5*time.Second)
// Set up replication
util.SlaveOf(t, slaveRdb, masterSrv)
util.WaitForOffsetSync(t, masterRdb, slaveRdb, 5*time.Second)

// the remaining command should be executed after replication
require.Eventually(t, func() bool {
return slaveRdb.Get(context.Background(), "k1").Val() == "v3"
}, 5*time.Second, 100*time.Millisecond)
// the remaining command should be executed after replication
require.Eventually(t, func() bool {
return slaveRdb.Get(context.Background(), "k1").Val() == "v3"
}, 5*time.Second, 100*time.Millisecond)
}

// if a command is blocked by WAIT, it should continue to execute after the WAIT command is completed.
Expand Down
Loading