Skip to content
This repository was archived by the owner on Jun 17, 2024. It is now read-only.

Commit 3a93662

Browse files
Randomize selection of orderer nodes with retry (hyperledger#2951)
In the Submit() API method, the list of available orderers has been randomized to support improved load balancing. Retry logic has been added such that if the selected orderer fails to return a success code, then the next orderer in the list is tried. If no orderers succeed, then return an error (containing details from each orderer) to the client. Signed-off-by: andrew-coleman <[email protected]>
1 parent 210d20f commit 3a93662

File tree

2 files changed

+132
-20
lines changed

2 files changed

+132
-20
lines changed

internal/pkg/gateway/api.go

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ package gateway
99
import (
1010
"context"
1111
"fmt"
12+
"math/rand"
1213
"sync"
1314

1415
"github.com/golang/protobuf/proto"
@@ -281,31 +282,44 @@ func (gs *Server) Submit(ctx context.Context, request *gp.SubmitRequest) (*gp.Su
281282
return nil, status.Errorf(codes.Unavailable, "no orderer nodes available")
282283
}
283284

284-
orderer := orderers[0] // send to first orderer for now
285+
// try each orderer in random order
286+
var errDetails []proto.Message
287+
for _, index := range rand.Perm(len(orderers)) {
288+
orderer := orderers[index]
289+
err := gs.broadcast(ctx, orderer, txn)
290+
if err == nil {
291+
return &gp.SubmitResponse{}, nil
292+
}
293+
logger.Warnw("Error sending transaction to orderer", "TxID", request.TransactionId, "endpoint", orderer.address, "err", err)
294+
errDetails = append(errDetails, endpointError(orderer.endpointConfig, err))
295+
}
285296

297+
return nil, rpcError(codes.Aborted, "no orderers could successfully process transaction", errDetails...)
298+
}
299+
300+
func (gs *Server) broadcast(ctx context.Context, orderer *orderer, txn *common.Envelope) error {
286301
broadcast, err := orderer.client.Broadcast(ctx)
287302
if err != nil {
288-
return nil, wrappedRpcError(err, "failed to create BroadcastClient", endpointError(orderer.endpointConfig, err))
303+
return fmt.Errorf("failed to create BroadcastClient: %w", err)
289304
}
290305
logger.Info("Submitting txn to orderer")
291306
if err := broadcast.Send(txn); err != nil {
292-
return nil, wrappedRpcError(err, "failed to send transaction to orderer", endpointError(orderer.endpointConfig, err))
307+
return fmt.Errorf("failed to send transaction to orderer: %w", err)
293308
}
294309

295310
response, err := broadcast.Recv()
296311
if err != nil {
297-
return nil, wrappedRpcError(err, "failed to receive response from orderer", endpointError(orderer.endpointConfig, err))
312+
return fmt.Errorf("failed to receive response from orderer: %w", err)
298313
}
299314

300315
if response == nil {
301-
return nil, status.Error(codes.Aborted, "received nil response from orderer")
316+
return fmt.Errorf("received nil response from orderer")
302317
}
303318

304319
if response.Status != common.Status_SUCCESS {
305-
return nil, status.Errorf(codes.Aborted, "received unsuccessful response from orderer: %s", common.Status_name[int32(response.Status)])
320+
return fmt.Errorf("received unsuccessful response from orderer: %s", common.Status_name[int32(response.Status)])
306321
}
307-
308-
return &gp.SubmitResponse{}, nil
322+
return nil
309323
}
310324

311325
// CommitStatus returns the validation code for a specific transaction on a specific channel. If the transaction is

internal/pkg/gateway/api_test.go

Lines changed: 110 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -736,11 +736,11 @@ func TestSubmit(t *testing.T) {
736736
proposalResponseStatus: 200,
737737
ordererBroadcastError: status.Error(codes.FailedPrecondition, "Orderer not listening!"),
738738
},
739-
errString: "rpc error: code = FailedPrecondition desc = failed to create BroadcastClient: Orderer not listening!",
739+
errString: "rpc error: code = Aborted desc = no orderers could successfully process transaction",
740740
errDetails: []*pb.EndpointError{{
741741
Address: "orderer:7050",
742742
MspId: "msp1",
743-
Message: "rpc error: code = FailedPrecondition desc = Orderer not listening!",
743+
Message: "failed to create BroadcastClient: rpc error: code = FailedPrecondition desc = Orderer not listening!",
744744
}},
745745
},
746746
{
@@ -752,11 +752,11 @@ func TestSubmit(t *testing.T) {
752752
proposalResponseStatus: 200,
753753
ordererSendError: status.Error(codes.Internal, "Orderer says no!"),
754754
},
755-
errString: "rpc error: code = Internal desc = failed to send transaction to orderer: Orderer says no!",
755+
errString: "rpc error: code = Aborted desc = no orderers could successfully process transaction",
756756
errDetails: []*pb.EndpointError{{
757757
Address: "orderer:7050",
758758
MspId: "msp1",
759-
Message: "rpc error: code = Internal desc = Orderer says no!",
759+
Message: "failed to send transaction to orderer: rpc error: code = Internal desc = Orderer says no!",
760760
}},
761761
},
762762
{
@@ -768,11 +768,11 @@ func TestSubmit(t *testing.T) {
768768
proposalResponseStatus: 200,
769769
ordererRecvError: status.Error(codes.FailedPrecondition, "Orderer not happy!"),
770770
},
771-
errString: "rpc error: code = FailedPrecondition desc = failed to receive response from orderer: Orderer not happy!",
771+
errString: "rpc error: code = Aborted desc = no orderers could successfully process transaction",
772772
errDetails: []*pb.EndpointError{{
773773
Address: "orderer:7050",
774774
MspId: "msp1",
775-
Message: "rpc error: code = FailedPrecondition desc = Orderer not happy!",
775+
Message: "failed to receive response from orderer: rpc error: code = FailedPrecondition desc = Orderer not happy!",
776776
}},
777777
},
778778
{
@@ -789,7 +789,12 @@ func TestSubmit(t *testing.T) {
789789
return abc
790790
}
791791
},
792-
errString: "rpc error: code = Aborted desc = received nil response from orderer",
792+
errString: "rpc error: code = Aborted desc = no orderers could successfully process transaction",
793+
errDetails: []*pb.EndpointError{{
794+
Address: "orderer:7050",
795+
MspId: "msp1",
796+
Message: "received nil response from orderer",
797+
}},
793798
},
794799
{
795800
name: "orderer returns unsuccessful response",
@@ -808,7 +813,12 @@ func TestSubmit(t *testing.T) {
808813
return abc
809814
}
810815
},
811-
errString: "rpc error: code = Aborted desc = received unsuccessful response from orderer: " + cp.Status_name[int32(cp.Status_BAD_REQUEST)],
816+
errString: "rpc error: code = Aborted desc = no orderers could successfully process transaction",
817+
errDetails: []*pb.EndpointError{{
818+
Address: "orderer:7050",
819+
MspId: "msp1",
820+
Message: "received unsuccessful response from orderer: " + cp.Status_name[int32(cp.Status_BAD_REQUEST)],
821+
}},
812822
},
813823
{
814824
name: "dialing orderer endpoint fails",
@@ -825,6 +835,96 @@ func TestSubmit(t *testing.T) {
825835
},
826836
errString: "rpc error: code = Unavailable desc = no orderer nodes available",
827837
},
838+
{
839+
name: "orderer retry",
840+
plan: endorsementPlan{
841+
"g1": {{endorser: localhostMock}},
842+
},
843+
config: &dp.ConfigResult{
844+
Orderers: map[string]*dp.Endpoints{
845+
"msp1": {
846+
Endpoint: []*dp.Endpoint{
847+
{Host: "orderer1", Port: 7050},
848+
{Host: "orderer2", Port: 7050},
849+
{Host: "orderer3", Port: 7050},
850+
},
851+
},
852+
},
853+
Msps: map[string]*msp.FabricMSPConfig{
854+
"msp1": {
855+
TlsRootCerts: [][]byte{},
856+
},
857+
},
858+
},
859+
postSetup: func(t *testing.T, def *preparedTest) {
860+
abc := &mocks.ABClient{}
861+
abbc := &mocks.ABBClient{}
862+
abbc.SendReturnsOnCall(0, status.Error(codes.FailedPrecondition, "First orderer error"))
863+
abbc.SendReturnsOnCall(1, status.Error(codes.FailedPrecondition, "Second orderer error"))
864+
abbc.SendReturnsOnCall(2, nil) // third time lucky
865+
abbc.RecvReturns(&ab.BroadcastResponse{
866+
Info: "success",
867+
Status: cp.Status(200),
868+
}, nil)
869+
abc.BroadcastReturns(abbc, nil)
870+
def.server.registry.endpointFactory = &endpointFactory{
871+
timeout: 5 * time.Second,
872+
connectEndorser: func(conn *grpc.ClientConn) peer.EndorserClient {
873+
return &mocks.EndorserClient{}
874+
},
875+
connectOrderer: func(_ *grpc.ClientConn) ab.AtomicBroadcastClient {
876+
return abc
877+
},
878+
dialer: func(ctx context.Context, target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
879+
return nil, nil
880+
},
881+
}
882+
},
883+
},
884+
{
885+
name: "multiple orderers all fail",
886+
plan: endorsementPlan{
887+
"g1": {{endorser: localhostMock}},
888+
},
889+
config: &dp.ConfigResult{
890+
Orderers: map[string]*dp.Endpoints{
891+
"msp1": {
892+
Endpoint: []*dp.Endpoint{
893+
{Host: "orderer1", Port: 7050},
894+
{Host: "orderer2", Port: 7050},
895+
{Host: "orderer3", Port: 7050},
896+
},
897+
},
898+
},
899+
Msps: map[string]*msp.FabricMSPConfig{
900+
"msp1": {
901+
TlsRootCerts: [][]byte{},
902+
},
903+
},
904+
},
905+
endpointDefinition: &endpointDef{
906+
proposalResponseStatus: 200,
907+
ordererBroadcastError: status.Error(codes.FailedPrecondition, "Orderer not listening!"),
908+
},
909+
errString: "rpc error: code = Aborted desc = no orderers could successfully process transaction",
910+
errDetails: []*pb.EndpointError{
911+
{
912+
Address: "orderer1:7050",
913+
MspId: "msp1",
914+
Message: "failed to create BroadcastClient: rpc error: code = FailedPrecondition desc = Orderer not listening!",
915+
},
916+
{
917+
Address: "orderer2:7050",
918+
MspId: "msp1",
919+
Message: "failed to create BroadcastClient: rpc error: code = FailedPrecondition desc = Orderer not listening!",
920+
},
921+
{
922+
Address: "orderer3:7050",
923+
MspId: "msp1",
924+
Message: "failed to create BroadcastClient: rpc error: code = FailedPrecondition desc = Orderer not listening!",
925+
},
926+
},
927+
},
828928
}
829929
for _, tt := range tests {
830930
t.Run(tt.name, func(t *testing.T) {
@@ -1506,10 +1606,8 @@ func checkError(t *testing.T, err error, errString string, details []*pb.Endpoin
15061606
s, ok := status.FromError(err)
15071607
require.True(t, ok, "Expected a gRPC status error")
15081608
require.Len(t, s.Details(), len(details))
1509-
for i, detail := range details {
1510-
require.Equal(t, detail.Message, s.Details()[i].(*pb.EndpointError).Message)
1511-
require.Equal(t, detail.MspId, s.Details()[i].(*pb.EndpointError).MspId)
1512-
require.Equal(t, detail.Address, s.Details()[i].(*pb.EndpointError).Address)
1609+
for _, detail := range s.Details() {
1610+
require.Contains(t, details, detail)
15131611
}
15141612
}
15151613

0 commit comments

Comments
 (0)