Skip to content

Commit 3936262

Browse files
committed
Merge branch 'master' of https://github.com/chrislusf/raft
2 parents 0b95ac8 + 2be29d9 commit 3936262

File tree

2 files changed

+13
-11
lines changed

2 files changed

+13
-11
lines changed

grpc_transporter.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,13 @@ var (
1919
// An GrpcTransporter is a default transport layer used to communicate between
2020
// multiple servers.
2121
type GrpcTransporter struct {
22-
grpcDialOption grpc.DialOption
22+
grpcDialOptions []grpc.DialOption
2323
}
2424

2525
// Creates a new HTTP transporter with the given path prefix.
26-
func NewGrpcTransporter(grpcDialOption grpc.DialOption) *GrpcTransporter {
26+
func NewGrpcTransporter(grpcDialOptions ...grpc.DialOption) *GrpcTransporter {
2727
t := &GrpcTransporter{
28-
grpcDialOption: grpcDialOption,
28+
grpcDialOptions: grpcDialOptions,
2929
}
3030
return t
3131
}
@@ -49,7 +49,7 @@ func NewGrpcServer(server Server) *GrpcServer {
4949
// Sends an AppendEntries RPC to a peer.
5050
func (t *GrpcTransporter) SendAppendEntriesRequest(server Server, peer *Peer, req *AppendEntriesRequest) (ret *AppendEntriesResponse) {
5151

52-
err := withRaftServerClient(peer.ConnectionString, t.grpcDialOption, func(client protobuf.RaftClient) error {
52+
err := withRaftServerClient(peer.ConnectionString, t.grpcDialOptions, func(client protobuf.RaftClient) error {
5353
ctx, cancel := context2.WithTimeout(context2.Background(), time.Duration(5*time.Second))
5454
defer cancel()
5555

@@ -83,7 +83,7 @@ func (t *GrpcTransporter) SendAppendEntriesRequest(server Server, peer *Peer, re
8383
// Sends a RequestVote RPC to a peer.
8484
func (t *GrpcTransporter) SendVoteRequest(server Server, peer *Peer, req *RequestVoteRequest) (ret *RequestVoteResponse) {
8585

86-
err := withRaftServerClient(peer.ConnectionString, t.grpcDialOption, func(client protobuf.RaftClient) error {
86+
err := withRaftServerClient(peer.ConnectionString, t.grpcDialOptions, func(client protobuf.RaftClient) error {
8787
ctx, cancel := context2.WithTimeout(context2.Background(), time.Duration(5*time.Second))
8888
defer cancel()
8989

@@ -117,7 +117,7 @@ func (t *GrpcTransporter) SendVoteRequest(server Server, peer *Peer, req *Reques
117117
// Sends a SnapshotRequest RPC to a peer.
118118
func (t *GrpcTransporter) SendSnapshotRequest(server Server, peer *Peer, req *SnapshotRequest) (ret *SnapshotResponse) {
119119

120-
err := withRaftServerClient(peer.ConnectionString, t.grpcDialOption, func(client protobuf.RaftClient) error {
120+
err := withRaftServerClient(peer.ConnectionString, t.grpcDialOptions, func(client protobuf.RaftClient) error {
121121
ctx, cancel := context2.WithTimeout(context2.Background(), time.Duration(5*time.Second))
122122
defer cancel()
123123

@@ -149,7 +149,7 @@ func (t *GrpcTransporter) SendSnapshotRequest(server Server, peer *Peer, req *Sn
149149
// Sends a SnapshotRequest RPC to a peer.
150150
func (t *GrpcTransporter) SendSnapshotRecoveryRequest(server Server, peer *Peer, req *SnapshotRecoveryRequest) (ret *SnapshotRecoveryResponse) {
151151

152-
err := withRaftServerClient(peer.ConnectionString, t.grpcDialOption, func(client protobuf.RaftClient) error {
152+
err := withRaftServerClient(peer.ConnectionString, t.grpcDialOptions, func(client protobuf.RaftClient) error {
153153
ctx, cancel := context2.WithTimeout(context2.Background(), time.Duration(5*time.Second))
154154
defer cancel()
155155

@@ -285,12 +285,12 @@ func (t *GrpcServer) OnSendSnapshotRecoveryRequest(ctx context2.Context, pbReq *
285285
}, nil
286286
}
287287

288-
func withRaftServerClient(raftServer string, grpcDialOption grpc.DialOption, fn func(protobuf.RaftClient) error) error {
288+
func withRaftServerClient(raftServer string, grpcDialOptions []grpc.DialOption, fn func(protobuf.RaftClient) error) error {
289289

290290
return withCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
291291
client := protobuf.NewRaftClient(grpcConnection)
292292
return fn(client)
293-
}, raftServer, grpcDialOption)
293+
}, raftServer, grpcDialOptions...)
294294

295295
}
296296

server.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -661,7 +661,7 @@ func (s *server) sendAsync(value interface{}) {
661661
}
662662

663663
func (s *server) checkQuorumActive(timeout time.Duration) bool {
664-
s.debugln("check.quorum.active")
664+
s.debugln("check.quorum.active with quorum size: ", s.QuorumSize(), " member count: ", s.MemberCount())
665665
act := 1
666666
now := time.Now()
667667
for _, peer := range s.peers {
@@ -1267,7 +1267,9 @@ func (s *server) TakeSnapshot() error {
12671267
// Attach snapshot to pending snapshot and save it to disk.
12681268
s.pendingSnapshot.Peers = peers
12691269
s.pendingSnapshot.State = state
1270-
s.saveSnapshot()
1270+
if err := s.saveSnapshot(); err != nil {
1271+
return err
1272+
}
12711273

12721274
// We keep some log entries after the snapshot.
12731275
// We do not want to send the whole snapshot to the slightly slow machines

0 commit comments

Comments
 (0)