Skip to content

Commit 1ce36eb

Browse files
committed
[deliver] Fix flaky test
Signed-off-by: Liran Funaro <liran.funaro@gmail.com>
1 parent c02accc commit 1ce36eb

2 files changed

Lines changed: 61 additions & 27 deletions

File tree

utils/deliver/client_test.go

Lines changed: 41 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,7 @@ func TestBroadcastDeliver(t *testing.T) {
5151

5252
ctx, cancel := context.WithTimeout(t.Context(), time.Minute)
5353
t.Cleanup(cancel)
54-
stream, err := test.NewBroadcastStream(ctx, &conf)
55-
require.NoError(t, err)
56-
t.Cleanup(stream.Close)
54+
5755
outputBlocksChan := make(chan *common.Block, 100)
5856
go func() {
5957
p := &Parameters{
@@ -78,59 +76,62 @@ func TestBroadcastDeliver(t *testing.T) {
7876
require.Len(t, b.Data.Data, 1)
7977

8078
t.Log("All good")
81-
submit(t, stream, outputBlocks, expectedSubmit{
79+
submit(t, &conf, outputBlocks, expectedSubmit{
8280
success: 3,
8381
})
8482

8583
t.Log("One server down")
8684
servers[2].Servers[0].Stop()
87-
time.Sleep(3 * time.Second)
88-
submit(t, stream, outputBlocks, expectedSubmit{
85+
waitUntilGrpcServerIsDown(ctx, t, &servers[2].Configs[0].Endpoint)
86+
submit(t, &conf, outputBlocks, expectedSubmit{
8987
success: 3,
9088
})
9189

9290
t.Log("Two servers down")
9391
servers[2].Servers[1].Stop()
94-
time.Sleep(3 * time.Second)
95-
submit(t, stream, outputBlocks, expectedSubmit{
92+
waitUntilGrpcServerIsDown(ctx, t, &servers[2].Configs[1].Endpoint)
93+
submit(t, &conf, outputBlocks, expectedSubmit{
9694
success: 2,
9795
unavailable: 1,
9896
})
9997

10098
t.Log("One incorrect server")
10199
fakeServer := test.RunGrpcServerForTest(ctx, t, servers[2].Configs[0], nil)
102100
waitUntilGrpcServerIsReady(ctx, t, &servers[2].Configs[0].Endpoint)
103-
104-
submit(t, stream, outputBlocks, expectedSubmit{
101+
submit(t, &conf, outputBlocks, expectedSubmit{
105102
success: 2,
106103
unimplemented: 1,
107104
})
108105

109106
t.Log("All good again")
110107
fakeServer.Stop()
108+
waitUntilGrpcServerIsDown(ctx, t, &servers[2].Configs[0].Endpoint)
111109
servers[2].Servers[0] = test.RunGrpcServerForTest(ctx, t, servers[2].Configs[0], ordererService.RegisterService)
112110
waitUntilGrpcServerIsReady(ctx, t, &servers[2].Configs[0].Endpoint)
113-
submit(t, stream, outputBlocks, expectedSubmit{
111+
submit(t, &conf, outputBlocks, expectedSubmit{
114112
success: 3,
115113
})
116114

117115
t.Log("Insufficient quorum")
118-
servers[0].Servers[0].Stop()
119-
servers[0].Servers[1].Stop()
120-
servers[1].Servers[0].Stop()
121-
servers[1].Servers[1].Stop()
122-
time.Sleep(3 * time.Second)
123-
submit(t, stream, outputBlocks, expectedSubmit{
116+
for i := range 2 {
117+
for j := range 2 {
118+
servers[i].Servers[j].Stop()
119+
}
120+
}
121+
for i := range 2 {
122+
for j := range 2 {
123+
waitUntilGrpcServerIsDown(ctx, t, &servers[i].Configs[j].Endpoint)
124+
}
125+
}
126+
submit(t, &conf, outputBlocks, expectedSubmit{
124127
success: 1,
125128
unavailable: 2,
126129
})
127130

128131
t.Log("Update endpoints")
129132
conf.Connection.Endpoints = allEndpoints[6:]
130-
err = client.UpdateConnections(&conf.Connection)
131-
require.NoError(t, err)
132-
require.NoError(t, stream.ConnectionManager.Update(&conf.Connection))
133-
submit(t, stream, outputBlocks, expectedSubmit{
133+
require.NoError(t, client.UpdateConnections(&conf.Connection))
134+
submit(t, &conf, outputBlocks, expectedSubmit{
134135
success: 3,
135136
})
136137
}
@@ -143,22 +144,29 @@ type expectedSubmit struct {
143144

144145
func submit(
145146
t *testing.T,
146-
stream *test.BroadcastStream,
147+
conf *ordererconn.Config,
147148
outputBlocks channel.Reader[*common.Block],
148149
expected expectedSubmit,
149150
) {
150151
t.Helper()
151152
txb := workload.TxBuilder{ChannelID: channelForTest}
152153
tx := txb.MakeTx(&protoblocktx.Tx{})
153154

154-
err := stream.SendBatch(workload.MapToEnvelopeBatch(0, []*protoloadgen.TX{tx}))
155+
// We create a new stream for each request to ensure GRPC does not cache the latest state.
156+
ctx, cancel := context.WithTimeout(t.Context(), time.Minute)
157+
defer cancel()
158+
stream, err := test.NewBroadcastStream(ctx, conf)
159+
require.NoError(t, err)
160+
defer stream.Close()
161+
162+
err = stream.SendBatch(workload.MapToEnvelopeBatch(0, []*protoloadgen.TX{tx}))
155163
if err != nil {
156164
t.Logf("Response error:\n%s", err)
157165
}
158166
if expected.unavailable > 0 {
159-
// Unimplemented sometimes not responding with error, so we cannot enforce it.
160167
require.Error(t, err)
161168
} else if expected.unimplemented == 0 {
169+
// An unimplemented service sometimes does not respond with an error, so we cannot enforce one.
162170
require.NoError(t, err)
163171
}
164172

@@ -226,6 +234,13 @@ func waitUntilGrpcServerIsReady(ctx context.Context, t *testing.T, endpoint *con
226234
defer connection.CloseConnectionsLog(newConn)
227235
test.WaitUntilGrpcServerIsReady(ctx, t, newConn)
228236
t.Logf("%v is ready", endpoint)
229-
// Wait a while to allow the GRPC connection enough time to make a new attempt.
230-
time.Sleep(time.Second)
237+
}
238+
239+
func waitUntilGrpcServerIsDown(ctx context.Context, t *testing.T, endpoint *connection.Endpoint) {
240+
t.Helper()
241+
newConn, err := connection.Connect(connection.NewInsecureDialConfig(endpoint))
242+
require.NoError(t, err)
243+
defer connection.CloseConnectionsLog(newConn)
244+
test.WaitUntilGrpcServerIsDown(ctx, t, newConn)
245+
t.Logf("%v is down", endpoint)
231246
}

utils/test/utils.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,12 +206,31 @@ func WaitUntilGrpcServerIsReady(
206206
return
207207
}
208208
healthClient := healthgrpc.NewHealthClient(conn)
209-
res, err := healthClient.Check(ctx, &healthgrpc.HealthCheckRequest{}, grpc.WaitForReady(true))
209+
res, err := healthClient.Check(ctx, nil, grpc.WaitForReady(true))
210210
assert.NotEqual(t, codes.Canceled, status.Code(err))
211211
require.NoError(t, err)
212212
require.Equal(t, healthgrpc.HealthCheckResponse_SERVING, res.Status)
213213
}
214214

215+
// WaitUntilGrpcServerIsDown uses the health check API to wait until a service is down.
216+
func WaitUntilGrpcServerIsDown(
217+
ctx context.Context,
218+
t *testing.T,
219+
conn grpc.ClientConnInterface,
220+
) {
221+
t.Helper()
222+
if conn == nil {
223+
return
224+
}
225+
healthClient := healthgrpc.NewHealthClient(conn)
226+
require.EventuallyWithT(t, func(ct *assert.CollectT) {
227+
checkCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
228+
defer cancel()
229+
_, err := healthClient.Check(checkCtx, nil)
230+
require.Error(ct, err)
231+
}, time.Minute, 50*time.Millisecond)
232+
}
233+
215234
// StatusRetriever provides an implementation to retrieve the status of given transaction identifiers.
216235
type StatusRetriever interface {
217236
GetTransactionsStatus(context.Context, *protoblocktx.QueryStatus, ...grpc.CallOption) (

0 commit comments

Comments
 (0)