Skip to content

Commit c3a3cf2

Browse files
committed
refactor: move raft transport initialization closer to cluster setup
1 parent eae9a91 commit c3a3cf2

File tree

2 files changed

+44
-39
lines changed

2 files changed

+44
-39
lines changed

server/etcdserver/bootstrap.go

+41-10
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"encoding/json"
1919
"errors"
2020
"fmt"
21+
stats "go.etcd.io/etcd/server/v3/etcdserver/api/v2stats"
2122
"io"
2223
"net/http"
2324
"os"
@@ -104,24 +105,54 @@ func bootstrap(cfg config.ServerConfig) (b *bootstrappedServer, err error) {
104105
backend.Close()
105106
return nil, err
106107
}
108+
sstats := stats.NewServerStats(cfg.Name, cluster.cl.String())
109+
lstats := stats.NewLeaderStats(cfg.Logger, cluster.nodeID.String())
110+
111+
raftTransport := &rafthttp.Transport{
112+
Logger: cfg.Logger,
113+
TLSInfo: cfg.PeerTLSInfo,
114+
DialTimeout: cfg.PeerDialTimeout(),
115+
ID: cluster.nodeID,
116+
URLs: cfg.PeerURLs,
117+
ClusterID: cluster.cl.ID(),
118+
ServerStats: sstats,
119+
LeaderStats: lstats,
120+
ErrorC: make(chan error, 1), // placeholder, can override later
121+
}
122+
if err = raftTransport.Start(); err != nil {
123+
return nil, err
124+
}
125+
for _, m := range cluster.remotes {
126+
if m.ID != cluster.nodeID {
127+
raftTransport.AddRemote(m.ID, m.PeerURLs)
128+
}
129+
}
130+
for _, m := range cluster.cl.Members() {
131+
if m.ID != cluster.nodeID {
132+
raftTransport.AddPeer(m.ID, m.PeerURLs)
133+
}
134+
}
107135

108136
cfg.Logger.Info("bootstrapping raft")
109137
raft := bootstrapRaft(cfg, cluster, s.wal)
138+
110139
return &bootstrappedServer{
111-
prt: prt,
112-
ss: ss,
113-
storage: s,
114-
cluster: cluster,
115-
raft: raft,
140+
prt: prt,
141+
ss: ss,
142+
storage: s,
143+
cluster: cluster,
144+
raft: raft,
145+
raftTransport: raftTransport,
116146
}, nil
117147
}
118148

119149
type bootstrappedServer struct {
120-
storage *bootstrappedStorage
121-
cluster *bootstrappedCluster
122-
raft *bootstrappedRaft
123-
prt http.RoundTripper
124-
ss *snap.Snapshotter
150+
storage *bootstrappedStorage
151+
cluster *bootstrappedCluster
152+
raft *bootstrappedRaft
153+
prt http.RoundTripper
154+
ss *snap.Snapshotter
155+
raftTransport *rafthttp.Transport
125156
}
126157

127158
func (s *bootstrappedServer) Close() {

server/etcdserver/server.go

+3-29
Original file line numberDiff line numberDiff line change
@@ -418,36 +418,10 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
418418
// the hook being called during the initialization process.
419419
srv.be.SetTxPostLockInsideApplyHook(srv.getTxPostLockInsideApplyHook())
420420

421-
// TODO: move transport initialization near the definition of remote
422-
tr := &rafthttp.Transport{
423-
Logger: cfg.Logger,
424-
TLSInfo: cfg.PeerTLSInfo,
425-
DialTimeout: cfg.PeerDialTimeout(),
426-
ID: b.cluster.nodeID,
427-
URLs: cfg.PeerURLs,
428-
ClusterID: b.cluster.cl.ID(),
429-
Raft: srv,
430-
Snapshotter: b.ss,
431-
ServerStats: sstats,
432-
LeaderStats: lstats,
433-
ErrorC: srv.errorc,
434-
}
435-
if err = tr.Start(); err != nil {
436-
return nil, err
437-
}
438-
// add all remotes into transport
439-
for _, m := range b.cluster.remotes {
440-
if m.ID != b.cluster.nodeID {
441-
tr.AddRemote(m.ID, m.PeerURLs)
442-
}
443-
}
444-
for _, m := range b.cluster.cl.Members() {
445-
if m.ID != b.cluster.nodeID {
446-
tr.AddPeer(m.ID, m.PeerURLs)
447-
}
448-
}
421+
tr := b.raftTransport
422+
tr.Raft = srv
449423
srv.r.transport = tr
450-
424+
tr.ErrorC = srv.errorc
451425
return srv, nil
452426
}
453427

0 commit comments

Comments
 (0)