Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/config/cobra_test_exports.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func StartDefaultSystem(t *testing.T) SystemConfig {
_, coordinator := mock.StartMockCoordinatorService(t)
conn := dbtest.PrepareTestEnv(t)
server := connection.NewLocalHostServerWithTLS(test.InsecureTLSConfig)
listen, err := server.Listener()
listen, err := server.Listener(t.Context())
require.NoError(t, err)
connection.CloseConnectionsLog(listen)
return SystemConfig{
Expand Down
2 changes: 1 addition & 1 deletion integration/runner/ports.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (p *portAllocator) allocatePorts(t *testing.T, count int) []config.ServiceE
func (p *portAllocator) allocate(t *testing.T) *connection.Endpoint {
t.Helper()
s := connection.NewLocalHostServerWithTLS(test.InsecureTLSConfig)
listener, err := s.Listener()
listener, err := s.Listener(t.Context())
require.NoError(t, err)
p.listeners = append(p.listeners, listener)
return &s.Endpoint
Expand Down
2 changes: 1 addition & 1 deletion utils/connection/client_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func TestGRPCRetryMultiEndpoints(t *testing.T) {

t.Log("Creating fake service address")
fakeServerConfig := connection.NewLocalHostServerWithTLS(test.InsecureTLSConfig)
l, err := fakeServerConfig.Listener()
l, err := fakeServerConfig.Listener(t.Context())
require.NoError(t, err)
t.Cleanup(func() {
connection.CloseConnectionsLog(l)
Expand Down
26 changes: 22 additions & 4 deletions utils/connection/server_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ type (
}
)

var listenRetry = RetryProfile{
InitialInterval: 50 * time.Millisecond,
MaxInterval: 500 * time.Millisecond,
MaxElapsedTime: 2 * time.Minute,
}

// NewLocalHostServerWithTLS returns a default server config with endpoint "localhost:0" given server credentials.
func NewLocalHostServerWithTLS(creds TLSConfig) *ServerConfig {
return &ServerConfig{
Expand Down Expand Up @@ -70,11 +76,23 @@ func (c *ServerConfig) GrpcServer() (*grpc.Server, error) {
}

// Listener instantiate a [net.Listener] and updates the config port with the effective port.
func (c *ServerConfig) Listener() (net.Listener, error) {
// If the port is predefined, it will retry to bind to the port until successful or until the context ends.
func (c *ServerConfig) Listener(ctx context.Context) (net.Listener, error) {
if c.preAllocatedListener != nil {
return c.preAllocatedListener, nil
}
listener, err := net.Listen(grpcProtocol, c.Endpoint.Address())

var err error
var listener net.Listener
if c.Endpoint.Port == 0 {
listener, err = net.Listen(grpcProtocol, c.Endpoint.Address())
} else {
err = listenRetry.Execute(ctx, func() error {
var listenErr error
listener, listenErr = net.Listen(grpcProtocol, c.Endpoint.Address())
return listenErr
})
}
if err != nil {
return nil, errors.Wrap(err, "failed to listen")
}
Expand All @@ -93,7 +111,7 @@ func (c *ServerConfig) Listener() (net.Listener, error) {
// PreAllocateListener is used to allocate a port and bind to ahead of the server initialization.
// It stores the listener object internally to be reused on subsequent calls to Listener().
func (c *ServerConfig) PreAllocateListener() (net.Listener, error) {
listener, err := c.Listener()
listener, err := c.Listener(context.Background())

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we plan to pass the ctx as an argument to this PreallocateListener() in the future?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. PreallocateListener should only be used with port=0. If such attempt fails, there is no reason to retry.

if err != nil {
return nil, err
}
Expand All @@ -107,7 +125,7 @@ func RunGrpcServer(
serverConfig *ServerConfig,
register func(server *grpc.Server),
) error {
listener, err := serverConfig.Listener()
listener, err := serverConfig.Listener(ctx)
if err != nil {
return err
}
Expand Down
28 changes: 18 additions & 10 deletions utils/deliver/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package deliver

import (
"context"
"net"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -50,7 +51,7 @@ func TestBroadcastDeliver(t *testing.T) {
require.NoError(t, err)
t.Cleanup(client.CloseConnections)

ctx, cancel := context.WithTimeout(t.Context(), time.Minute)
ctx, cancel := context.WithTimeout(t.Context(), 5*time.Minute)
t.Cleanup(cancel)

outputBlocksChan := make(chan *common.Block, 100)
Expand All @@ -77,20 +78,21 @@ func TestBroadcastDeliver(t *testing.T) {

t.Log("One server down")
servers[2].Servers[0].Stop()
waitUntilGrpcServerIsDown(ctx, t, &servers[2].Configs[0].Endpoint)
listener1 := holdPort(ctx, t, servers[2].Configs[0])

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Earlier, we only checked whether the server was down or not. Now, we create another fake listener and explicitly close it. Can you please explain the reason as I might be missing the core issue?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the following comment to holdPort to clarify its use case

// holdPort attempts to bind to the specified server port and holds it until the listener is closed.
// It serves two purposes:
//  1. A successful bind indicates the port is free, meaning the server previously using it is down.
//  2. It prevents other tests from binding to the same port, ensuring this test correctly detects the server as
//     unavailable.

submit(t, &conf, outputBlocks, expectedSubmit{
success: 3,
})

t.Log("Two servers down")
servers[2].Servers[1].Stop()
waitUntilGrpcServerIsDown(ctx, t, &servers[2].Configs[1].Endpoint)
listener2 := holdPort(ctx, t, servers[2].Configs[1])
submit(t, &conf, outputBlocks, expectedSubmit{
success: 2,
unavailable: 1,
})

t.Log("One incorrect server")
_ = listener1.Close()
fakeServer := test.RunGrpcServerForTest(ctx, t, servers[2].Configs[0], nil)
waitUntilGrpcServerIsReady(ctx, t, &servers[2].Configs[0].Endpoint)
submit(t, &conf, outputBlocks, expectedSubmit{
Expand All @@ -100,22 +102,22 @@ func TestBroadcastDeliver(t *testing.T) {

t.Log("All good again")
fakeServer.Stop()
waitUntilGrpcServerIsDown(ctx, t, &servers[2].Configs[0].Endpoint)
servers[2].Servers[0] = test.RunGrpcServerForTest(ctx, t, servers[2].Configs[0], ordererService.RegisterService)
waitUntilGrpcServerIsReady(ctx, t, &servers[2].Configs[0].Endpoint)
submit(t, &conf, outputBlocks, expectedSubmit{
success: 3,
})

t.Log("Insufficient quorum")
_ = listener2.Close()
for _, gs := range servers[:2] {
for _, s := range gs.Servers[:2] {
s.Stop()
}
}
for _, gs := range servers[:2] {
for _, c := range gs.Configs[:2] {
waitUntilGrpcServerIsDown(ctx, t, &c.Endpoint)
holdPort(ctx, t, c)
}
}
submit(t, &conf, outputBlocks, expectedSubmit{
Expand Down Expand Up @@ -231,11 +233,17 @@ func waitUntilGrpcServerIsReady(ctx context.Context, t *testing.T, endpoint *con
t.Logf("%v is ready", endpoint)
}

func waitUntilGrpcServerIsDown(ctx context.Context, t *testing.T, endpoint *connection.Endpoint) {
// holdPort attempts to bind to the specified server port and holds it until the listener is closed.
// It serves two purposes:
// 1. A successful bind indicates the port is free, meaning the server previously using it is down.
// 2. It prevents other tests from binding to the same port, ensuring this test correctly detects the server as
// unavailable.
func holdPort(ctx context.Context, t *testing.T, c *connection.ServerConfig) net.Listener {
t.Helper()
newConn, err := connection.Connect(test.NewInsecureDialConfig(endpoint))
listener, err := c.Listener(ctx)
require.NoError(t, err)
defer connection.CloseConnectionsLog(newConn)
test.WaitUntilGrpcServerIsDown(ctx, t, newConn)
t.Logf("%v is down", endpoint)
t.Cleanup(func() {
_ = listener.Close()
})
return listener
}
2 changes: 1 addition & 1 deletion utils/monitoring/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (p *Provider) StartPrometheusServer(
Handler: mux,
}

l, err := serverConfig.Listener()
l, err := serverConfig.Listener(ctx)
if err != nil {
return err
}
Expand Down
21 changes: 1 addition & 20 deletions utils/test/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func RunGrpcServerForTest(
ctx context.Context, tb testing.TB, serverConfig *connection.ServerConfig, register func(server *grpc.Server),
) *grpc.Server {
tb.Helper()
listener, err := serverConfig.Listener()
listener, err := serverConfig.Listener(ctx)
require.NoError(tb, err)
server, err := serverConfig.GrpcServer()
require.NoError(tb, err)
Expand Down Expand Up @@ -221,25 +221,6 @@ func WaitUntilGrpcServerIsReady(
require.Equal(t, healthgrpc.HealthCheckResponse_SERVING, res.Status)
}

// WaitUntilGrpcServerIsDown uses the health check API to check a service is down.
func WaitUntilGrpcServerIsDown(
ctx context.Context,
t *testing.T,
conn grpc.ClientConnInterface,
) {
t.Helper()
if conn == nil {
return
}
healthClient := healthgrpc.NewHealthClient(conn)
require.EventuallyWithT(t, func(ct *assert.CollectT) {
checkCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
_, err := healthClient.Check(checkCtx, nil)
require.Error(ct, err)
}, time.Minute, 50*time.Millisecond)
}

// StatusRetriever provides implementation retrieve status of given transaction identifiers.
type StatusRetriever interface {
GetTransactionsStatus(context.Context, *protoblocktx.QueryStatus, ...grpc.CallOption) (
Expand Down