forked from hyperledger/fabric-x-committer
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver_util.go
More file actions
188 lines (166 loc) · 5.64 KB
/
Copy pathserver_util.go
File metadata and controls
188 lines (166 loc) · 5.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package connection
import (
"context"
"net"
"time"
"github.com/cockroachdb/errors"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
healthgrpc "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/keepalive"
)
const grpcProtocol = "tcp"
type (
// Service describes the method that are required for a service to run.
Service interface {
// Run executes the service until the context is done.
Run(ctx context.Context) error
// WaitForReady waits for the service resources to initialize.
// If the context ended before the service is ready, returns false.
WaitForReady(ctx context.Context) bool
// RegisterService registers the supported APIs for this service.
RegisterService(server *grpc.Server)
}
)
var listenRetry = RetryProfile{
InitialInterval: 50 * time.Millisecond,
MaxInterval: 500 * time.Millisecond,
MaxElapsedTime: 2 * time.Minute,
}
// NewLocalHostServerWithTLS returns a default server config with endpoint "localhost:0" given server credentials.
func NewLocalHostServerWithTLS(creds TLSConfig) *ServerConfig {
return &ServerConfig{
Endpoint: *NewLocalHost(),
TLS: creds,
}
}
// GrpcServer instantiate a [grpc.Server].
func (c *ServerConfig) GrpcServer() (*grpc.Server, error) {
opts := []grpc.ServerOption{grpc.MaxRecvMsgSize(maxMsgSize), grpc.MaxSendMsgSize(maxMsgSize)}
serverGrpcTransportCreds, err := c.TLS.ServerCredentials()
if err != nil {
return nil, errors.Wrapf(err, "failed loading the server's grpc credentials")
}
opts = append(opts, grpc.Creds(serverGrpcTransportCreds))
if c.KeepAlive != nil && c.KeepAlive.Params != nil {
opts = append(opts, grpc.KeepaliveParams(keepalive.ServerParameters{
MaxConnectionIdle: c.KeepAlive.Params.MaxConnectionIdle,
MaxConnectionAge: c.KeepAlive.Params.MaxConnectionAge,
MaxConnectionAgeGrace: c.KeepAlive.Params.MaxConnectionAgeGrace,
Time: c.KeepAlive.Params.Time,
Timeout: c.KeepAlive.Params.Timeout,
}))
}
if c.KeepAlive != nil && c.KeepAlive.EnforcementPolicy != nil {
opts = append(opts, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: c.KeepAlive.EnforcementPolicy.MinTime,
PermitWithoutStream: c.KeepAlive.EnforcementPolicy.PermitWithoutStream,
}))
}
return grpc.NewServer(opts...), nil
}
// Listener instantiate a [net.Listener] and updates the config port with the effective port.
// If the port is predefined, it will retry to bind to the port until successful or until the context ends.
func (c *ServerConfig) Listener(ctx context.Context) (net.Listener, error) {
if c.preAllocatedListener != nil {
return c.preAllocatedListener, nil
}
var err error
var listener net.Listener
if c.Endpoint.Port == 0 {
listener, err = net.Listen(grpcProtocol, c.Endpoint.Address())
} else {
err = listenRetry.Execute(ctx, func() error {
var listenErr error
listener, listenErr = net.Listen(grpcProtocol, c.Endpoint.Address())
return listenErr
})
}
if err != nil {
return nil, errors.Wrap(err, "failed to listen")
}
addr := listener.Addr()
tcpAddress, ok := addr.(*net.TCPAddr)
if !ok {
return nil, errors.Join(errors.New("failed to cast to TCP address"), listener.Close())
}
c.Endpoint.Port = tcpAddress.Port
logger.Infof("Listening on: %s://%s", grpcProtocol, c.Endpoint.String())
return listener, nil
}
// PreAllocateListener is used to allocate a port and bind to ahead of the server initialization.
// It stores the listener object internally to be reused on subsequent calls to Listener().
func (c *ServerConfig) PreAllocateListener() (net.Listener, error) {
listener, err := c.Listener(context.Background())
if err != nil {
return nil, err
}
c.preAllocatedListener = listener
return listener, nil
}
// RunGrpcServer runs a server and returns error if failed.
func RunGrpcServer(
ctx context.Context,
serverConfig *ServerConfig,
register func(server *grpc.Server),
) error {
listener, err := serverConfig.Listener(ctx)
if err != nil {
return err
}
server, err := serverConfig.GrpcServer()
if err != nil {
return errors.Wrapf(err, "failed creating grpc server")
}
register(server)
g, gCtx := errgroup.WithContext(ctx)
logger.Infof("Serving...")
g.Go(func() error {
return server.Serve(listener)
})
<-gCtx.Done()
server.Stop()
return g.Wait()
}
// StartService runs a service, waits until it is ready, and register the gRPC server(s).
// It will stop if either the service ended or its respective gRPC server.
func StartService(
ctx context.Context,
service Service,
serverConfigs ...*ServerConfig,
) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
g, gCtx := errgroup.WithContext(ctx)
g.Go(func() error {
// If the service stops, there is no reason to continue the GRPC server.
defer cancel()
return service.Run(gCtx)
})
ctxTimeout, cancelTimeout := context.WithTimeout(gCtx, 5*time.Minute) // TODO: make this configurable.
defer cancelTimeout()
if !service.WaitForReady(ctxTimeout) {
cancel()
return errors.Wrapf(g.Wait(), "service is not ready")
}
for _, server := range serverConfigs {
server := server
g.Go(func() error {
// If the GRPC servers stop, there is no reason to continue the service.
defer cancel()
return RunGrpcServer(gCtx, server, service.RegisterService)
})
}
return g.Wait()
}
// DefaultHealthCheckService returns a health-check service that returns SERVING for all services.
func DefaultHealthCheckService() *health.Server {
healthcheck := health.NewServer()
healthcheck.SetServingStatus("", healthgrpc.HealthCheckResponse_SERVING)
return healthcheck
}