Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 45 additions & 35 deletions utils/deliver/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

"github.com/hyperledger/fabric-protos-go-apiv2/common"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/hyperledger/fabric-x-committer/api/protoblocktx"
Expand Down Expand Up @@ -51,23 +52,15 @@ func TestBroadcastDeliver(t *testing.T) {

ctx, cancel := context.WithTimeout(t.Context(), time.Minute)
t.Cleanup(cancel)
stream, err := test.NewBroadcastStream(ctx, &conf)
require.NoError(t, err)
t.Cleanup(stream.Close)

outputBlocksChan := make(chan *common.Block, 100)
go func() {
p := &Parameters{
err = client.Deliver(ctx, &Parameters{
StartBlkNum: 0,
EndBlkNum: MaxBlockNum,
OutputBlock: outputBlocksChan,
}
// We set the client to stop retry after a second to quickly
// receive the broadcast errors.
// But for delivery, we want to continue indefinitely.
for ctx.Err() == nil {
err = client.Deliver(ctx, p)
t.Logf("Deliver ended with: %v", err)
}
})
assert.ErrorIs(t, err, context.Canceled)
}()
outputBlocks := channel.NewReader(ctx, outputBlocksChan)

Expand All @@ -78,59 +71,62 @@ func TestBroadcastDeliver(t *testing.T) {
require.Len(t, b.Data.Data, 1)

t.Log("All good")
submit(t, stream, outputBlocks, expectedSubmit{
submit(t, &conf, outputBlocks, expectedSubmit{
success: 3,
})

t.Log("One server down")
servers[2].Servers[0].Stop()
time.Sleep(3 * time.Second)
submit(t, stream, outputBlocks, expectedSubmit{
waitUntilGrpcServerIsDown(ctx, t, &servers[2].Configs[0].Endpoint)
submit(t, &conf, outputBlocks, expectedSubmit{
success: 3,
})

t.Log("Two servers down")
servers[2].Servers[1].Stop()
time.Sleep(3 * time.Second)
submit(t, stream, outputBlocks, expectedSubmit{
waitUntilGrpcServerIsDown(ctx, t, &servers[2].Configs[1].Endpoint)
submit(t, &conf, outputBlocks, expectedSubmit{
success: 2,
unavailable: 1,
})

t.Log("One incorrect server")
fakeServer := test.RunGrpcServerForTest(ctx, t, servers[2].Configs[0], nil)
waitUntilGrpcServerIsReady(ctx, t, &servers[2].Configs[0].Endpoint)

submit(t, stream, outputBlocks, expectedSubmit{
submit(t, &conf, outputBlocks, expectedSubmit{
success: 2,
unimplemented: 1,
})

t.Log("All good again")
fakeServer.Stop()
waitUntilGrpcServerIsDown(ctx, t, &servers[2].Configs[0].Endpoint)
servers[2].Servers[0] = test.RunGrpcServerForTest(ctx, t, servers[2].Configs[0], ordererService.RegisterService)
waitUntilGrpcServerIsReady(ctx, t, &servers[2].Configs[0].Endpoint)
submit(t, stream, outputBlocks, expectedSubmit{
submit(t, &conf, outputBlocks, expectedSubmit{
success: 3,
})

t.Log("Insufficient quorum")
servers[0].Servers[0].Stop()
servers[0].Servers[1].Stop()
servers[1].Servers[0].Stop()
servers[1].Servers[1].Stop()
time.Sleep(3 * time.Second)
submit(t, stream, outputBlocks, expectedSubmit{
for _, gs := range servers[:2] {
for _, s := range gs.Servers[:2] {
s.Stop()
}
}
for _, gs := range servers[:2] {
for _, c := range gs.Configs[:2] {
waitUntilGrpcServerIsDown(ctx, t, &c.Endpoint)
}
}
submit(t, &conf, outputBlocks, expectedSubmit{
success: 1,
unavailable: 2,
})

t.Log("Update endpoints")
conf.Connection.Endpoints = allEndpoints[6:]
err = client.UpdateConnections(&conf.Connection)
require.NoError(t, err)
require.NoError(t, stream.ConnectionManager.Update(&conf.Connection))
submit(t, stream, outputBlocks, expectedSubmit{
require.NoError(t, client.UpdateConnections(&conf.Connection))
submit(t, &conf, outputBlocks, expectedSubmit{
success: 3,
})
}
Expand All @@ -143,22 +139,29 @@ type expectedSubmit struct {

func submit(
t *testing.T,
stream *test.BroadcastStream,
conf *ordererconn.Config,
outputBlocks channel.Reader[*common.Block],
expected expectedSubmit,
) {
t.Helper()
txb := workload.TxBuilder{ChannelID: channelForTest}
tx := txb.MakeTx(&protoblocktx.Tx{})

err := stream.SendBatch(workload.MapToEnvelopeBatch(0, []*protoloadgen.TX{tx}))
// We create a new stream for each request to ensure GRPC does not cache the latest state.
ctx, cancel := context.WithTimeout(t.Context(), time.Minute)
defer cancel()
stream, err := test.NewBroadcastStream(ctx, conf)
require.NoError(t, err)
defer stream.Close()

err = stream.SendBatch(workload.MapToEnvelopeBatch(0, []*protoloadgen.TX{tx}))
if err != nil {
t.Logf("Response error:\n%s", err)
}
if expected.unavailable > 0 {
// Unimplemented sometimes not responding with error, so we cannot enforce it.
require.Error(t, err)
} else if expected.unimplemented == 0 {
// Unimplemented sometimes not responding with error, so we cannot enforce it.
require.NoError(t, err)
}

Expand Down Expand Up @@ -226,6 +229,13 @@ func waitUntilGrpcServerIsReady(ctx context.Context, t *testing.T, endpoint *con
defer connection.CloseConnectionsLog(newConn)
test.WaitUntilGrpcServerIsReady(ctx, t, newConn)
t.Logf("%v is ready", endpoint)
// Wait a while to allow the GRPC connection enough time to make a new attempt.
time.Sleep(time.Second)
}

func waitUntilGrpcServerIsDown(ctx context.Context, t *testing.T, endpoint *connection.Endpoint) {
t.Helper()
newConn, err := connection.Connect(connection.NewInsecureDialConfig(endpoint))
require.NoError(t, err)
defer connection.CloseConnectionsLog(newConn)
test.WaitUntilGrpcServerIsDown(ctx, t, newConn)
t.Logf("%v is down", endpoint)
}
21 changes: 20 additions & 1 deletion utils/test/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,12 +206,31 @@ func WaitUntilGrpcServerIsReady(
return
}
healthClient := healthgrpc.NewHealthClient(conn)
res, err := healthClient.Check(ctx, &healthgrpc.HealthCheckRequest{}, grpc.WaitForReady(true))
res, err := healthClient.Check(ctx, nil, grpc.WaitForReady(true))
assert.NotEqual(t, codes.Canceled, status.Code(err))
require.NoError(t, err)
require.Equal(t, healthgrpc.HealthCheckResponse_SERVING, res.Status)
}

// WaitUntilGrpcServerIsDown uses the health check API to check a service is down.
func WaitUntilGrpcServerIsDown(
ctx context.Context,
t *testing.T,
conn grpc.ClientConnInterface,
) {
t.Helper()
if conn == nil {
return
}
healthClient := healthgrpc.NewHealthClient(conn)
require.EventuallyWithT(t, func(ct *assert.CollectT) {
checkCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
_, err := healthClient.Check(checkCtx, nil)
require.Error(ct, err)
}, time.Minute, 50*time.Millisecond)
}

// StatusRetriever provides implementation retrieve status of given transaction identifiers.
type StatusRetriever interface {
GetTransactionsStatus(context.Context, *protoblocktx.QueryStatus, ...grpc.CallOption) (
Expand Down