diff --git a/server/etcdserver/bootstrap.go b/server/etcdserver/bootstrap.go index b25b7f6e1db..826f60888be 100644 --- a/server/etcdserver/bootstrap.go +++ b/server/etcdserver/bootstrap.go @@ -18,6 +18,7 @@ import ( "encoding/json" "errors" "fmt" + stats "go.etcd.io/etcd/server/v3/etcdserver/api/v2stats" "io" "net/http" "os" @@ -104,24 +105,54 @@ func bootstrap(cfg config.ServerConfig) (b *bootstrappedServer, err error) { backend.Close() return nil, err } + sstats := stats.NewServerStats(cfg.Name, cluster.cl.String()) + lstats := stats.NewLeaderStats(cfg.Logger, cluster.nodeID.String()) + + raftTransport := &rafthttp.Transport{ + Logger: cfg.Logger, + TLSInfo: cfg.PeerTLSInfo, + DialTimeout: cfg.PeerDialTimeout(), + ID: cluster.nodeID, + URLs: cfg.PeerURLs, + ClusterID: cluster.cl.ID(), + ServerStats: sstats, + LeaderStats: lstats, + ErrorC: make(chan error, 1), // placeholder, can override later + } + if err = raftTransport.Start(); err != nil { + return nil, err + } + for _, m := range cluster.remotes { + if m.ID != cluster.nodeID { + raftTransport.AddRemote(m.ID, m.PeerURLs) + } + } + for _, m := range cluster.cl.Members() { + if m.ID != cluster.nodeID { + raftTransport.AddPeer(m.ID, m.PeerURLs) + } + } cfg.Logger.Info("bootstrapping raft") raft := bootstrapRaft(cfg, cluster, s.wal) + return &bootstrappedServer{ - prt: prt, - ss: ss, - storage: s, - cluster: cluster, - raft: raft, + prt: prt, + ss: ss, + storage: s, + cluster: cluster, + raft: raft, + raftTransport: raftTransport, }, nil } type bootstrappedServer struct { - storage *bootstrappedStorage - cluster *bootstrappedCluster - raft *bootstrappedRaft - prt http.RoundTripper - ss *snap.Snapshotter + storage *bootstrappedStorage + cluster *bootstrappedCluster + raft *bootstrappedRaft + prt http.RoundTripper + ss *snap.Snapshotter + raftTransport *rafthttp.Transport } func (s *bootstrappedServer) Close() { diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 452100d7f5b..f1d9fb72c0a 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -418,36 +418,10 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { // the hook being called during the initialization process. srv.be.SetTxPostLockInsideApplyHook(srv.getTxPostLockInsideApplyHook()) - // TODO: move transport initialization near the definition of remote - tr := &rafthttp.Transport{ - Logger: cfg.Logger, - TLSInfo: cfg.PeerTLSInfo, - DialTimeout: cfg.PeerDialTimeout(), - ID: b.cluster.nodeID, - URLs: cfg.PeerURLs, - ClusterID: b.cluster.cl.ID(), - Raft: srv, - Snapshotter: b.ss, - ServerStats: sstats, - LeaderStats: lstats, - ErrorC: srv.errorc, - } - if err = tr.Start(); err != nil { - return nil, err - } - // add all remotes into transport - for _, m := range b.cluster.remotes { - if m.ID != b.cluster.nodeID { - tr.AddRemote(m.ID, m.PeerURLs) - } - } - for _, m := range b.cluster.cl.Members() { - if m.ID != b.cluster.nodeID { - tr.AddPeer(m.ID, m.PeerURLs) - } - } + tr := b.raftTransport + tr.Raft = srv srv.r.transport = tr - + tr.ErrorC = srv.errorc return srv, nil }