Skip to content

add notes for grpc #17

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ etcd is written in Go and uses the [Raft][] consensus algorithm to manage a high
- [server: make applier use ReadTx() in Txn() instead of ConcurrentReadTx()](https://github.com/etcd-io/etcd/pull/12896)
- [raft: postpone MsgReadIndex until first commit in the term](https://github.com/etcd-io/etcd/pull/12762)
- [etcdserver: resend ReadIndex request on empty apply request](https://github.com/etcd-io/etcd/pull/12795)
- [clientv3: Replace balancer with upstream grpc solution](https://github.com/etcd-io/etcd/pull/12671)
- [raft: implement fast log rejection](https://github.com/etcd-io/etcd/pull/11964)
- [Lease Checkpoints fix](https://github.com/etcd-io/etcd/pull/13508)
- [mvcc: fully concurrent read](https://github.com/etcd-io/etcd/pull/10523)
Expand Down
1 change: 1 addition & 0 deletions client/v3/internal/resolver/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const (

// EtcdManualResolver is a Resolver (and resolver.Builder) that can be updated
// using SetEndpoints.
// TODO(zhengliang): 不明白
type EtcdManualResolver struct {
*manual.Resolver
endpoints []string
Expand Down
1 change: 1 addition & 0 deletions client/v3/maintenance.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ func (m *maintenance) Defragment(ctx context.Context, endpoint string) (*Defragm
}

func (m *maintenance) Status(ctx context.Context, endpoint string) (*StatusResponse, error) {
m.lg.Info("*** maintenance endpoint", zap.String("endpoint", endpoint))
remote, cancel, err := m.dial(endpoint)
if err != nil {
return nil, toErr(ctx, err)
Expand Down
3 changes: 2 additions & 1 deletion etcdctl/ctlv3/command/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/bgentry/speakeasy"
"go.etcd.io/etcd/client/pkg/v3/srv"
"go.etcd.io/etcd/client/pkg/v3/transport"
"go.etcd.io/etcd/client/v3"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/pkg/v3/cobrautl"
"go.etcd.io/etcd/pkg/v3/flags"

Expand Down Expand Up @@ -112,6 +112,7 @@ func (*discardValue) String() string { return "" }
func (*discardValue) Set(string) error { return nil }
func (*discardValue) Type() string { return "" }

// 根据 cobra.Command 初始化 client config
func clientConfigFromCmd(cmd *cobra.Command) *clientConfig {
lg, err := zap.NewProduction()
if err != nil {
Expand Down
13 changes: 11 additions & 2 deletions server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,15 +126,15 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
"configuring peer listeners",
zap.Strings("listen-peer-urls", e.cfg.getLPURLs()),
)
if e.Peers, err = configurePeerListeners(cfg); err != nil {
if e.Peers, err = configurePeerListeners(cfg); err != nil { // peers
return e, err
}

e.cfg.logger.Info(
"configuring client listeners",
zap.Strings("listen-client-urls", e.cfg.getLCURLs()),
)
if e.sctxs, err = configureClientListeners(cfg); err != nil {
if e.sctxs, err = configureClientListeners(cfg); err != nil { //clients
return e, err
}

Expand Down Expand Up @@ -470,6 +470,7 @@ func (e *Etcd) Err() <-chan error {
return e.errc
}

// 配置 peer listeners
func configurePeerListeners(cfg *Config) (peers []*peerListener, err error) {
if err = updateCipherSuites(&cfg.PeerTLSInfo, cfg.CipherSuites); err != nil {
return nil, err
Expand Down Expand Up @@ -531,6 +532,7 @@ func configurePeerListeners(cfg *Config) (peers []*peerListener, err error) {
}

// configure peer handlers after rafthttp.Transport started
// 服务 peers
func (e *Etcd) servePeers() (err error) {
ph := etcdhttp.NewPeerHandler(e.GetLogger(), e.Server)
var peerTLScfg *tls.Config
Expand Down Expand Up @@ -590,6 +592,7 @@ func (e *Etcd) servePeers() (err error) {
return nil
}

// 配置 client listeners
func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) {
if err = updateCipherSuites(&cfg.ClientTLSInfo, cfg.CipherSuites); err != nil {
return nil, err
Expand Down Expand Up @@ -632,6 +635,7 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro
continue
}

// 启动 listener
if sctx.l, err = transport.NewListenerWithOpts(addr, u.Scheme,
transport.WithSocketOpts(&cfg.SocketOpts),
transport.WithSkipTLSInfoCheck(true),
Expand Down Expand Up @@ -675,13 +679,17 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro
}
sctx.serviceRegister = cfg.ServiceRegister
if cfg.EnablePprof || cfg.LogLevel == "debug" {
// enable pprof or debug mode
sctx.registerPprof()
}
if cfg.LogLevel == "debug" {
sctx.registerTrace()
}
sctxs[addr] = sctx
}
for _, sctx := range sctxs {
cfg.logger.Info("****LCUrls", zap.String("lcurls sctxs", fmt.Sprintf("%#v", sctx)))
}
return sctxs, nil
}

Expand Down Expand Up @@ -709,6 +717,7 @@ func (e *Etcd) serveClients() (err error) {
h = v2http.NewClientHandler(e.GetLogger(), e.Server, e.Server.Cfg.ReqTimeout())
}
} else {
// v3 请求
mux := http.NewServeMux()
etcdhttp.HandleBasic(e.cfg.logger, mux, e.Server)
etcdhttp.HandleMetricsHealthForV3(e.cfg.logger, mux, e.Server)
Expand Down
10 changes: 6 additions & 4 deletions server/embed/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ func (sctx *serveCtx) serve(
sctx.lg.Info("ready to serve client requests")

m := cmux.New(sctx.l)
v3c := v3client.New(s)
servElection := v3election.NewElectionServer(v3c)
servLock := v3lock.NewLockServer(v3c)
v3c := v3client.New(s) // 创建 client
servElection := v3election.NewElectionServer(v3c) // election
servLock := v3lock.NewLockServer(v3c) // lock

var gs *grpc.Server
defer func() {
Expand All @@ -112,7 +112,7 @@ func (sctx *serveCtx) serve(
}()

if sctx.insecure {
gs = v3rpc.Server(s, nil, nil, gopts...)
gs = v3rpc.Server(s, nil, nil, gopts...) // 创建 grpc server
v3electionpb.RegisterElectionServer(gs, servElection)
v3lockpb.RegisterLockServer(gs, servLock)
if sctx.serviceRegister != nil {
Expand All @@ -138,6 +138,7 @@ func (sctx *serveCtx) serve(
httpl := m.Match(cmux.HTTP1())
go func() { errHandler(srvhttp.Serve(httpl)) }()

// 同时支持 grpc server and http server
sctx.serversC <- &servers{grpc: gs, http: srvhttp}
sctx.lg.Info(
"serving client traffic insecurely; this is strongly discouraged!",
Expand Down Expand Up @@ -216,6 +217,7 @@ func grpcHandlerFunc(grpcServer *grpc.Server, otherHandler http.Handler) http.Ha

type registerHandlerFunc func(context.Context, *gw.ServeMux, *grpc.ClientConn) error

// 注册 grpc gateway
func (sctx *serveCtx) registerGateway(opts []grpc.DialOption) (*gw.ServeMux, error) {
ctx := sctx.ctx

Expand Down
1 change: 1 addition & 0 deletions server/etcdserver/api/v3rpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (
maxSendBytes = math.MaxInt32
)

// grpc server
func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnaryServerInterceptor, gopts ...grpc.ServerOption) *grpc.Server {
var opts []grpc.ServerOption
opts = append(opts, grpc.CustomCodec(&codec{}))
Expand Down