Skip to content

!feat: Allow Multiple Core gRPC Connections with Load Balancing #4137

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions core/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,7 @@ func createCoreFetcher(t *testing.T, cfg *testnode.Config) (*BlockFetcher, testn
require.NoError(t, err)
host, port, err := net.SplitHostPort(cctx.GRPCClient.Target())
require.NoError(t, err)
client := newTestClient(t, host, port)
fetcher, err := NewBlockFetcher(client)
fetcher, err := newTestBlockFetcher(t, host, port)
require.NoError(t, err)
return fetcher, cctx
}
Expand Down
64 changes: 53 additions & 11 deletions core/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"sync/atomic"
"time"

"github.com/gogo/protobuf/proto"
Expand All @@ -30,17 +31,50 @@ var (
newDataSignedBlockQuery = types.QueryForEvent(types.EventSignedBlock).String()
)

// BlockFetcher fetches blocks from core using one or more gRPC clients.
// Requests are spread over the clients by getNextClient, which hands them out
// in rotation.
type BlockFetcher struct {
client coregrpc.BlockAPIClient
// clients holds one BlockAPIClient per underlying gRPC connection.
// NewMultiBlockFetcher rejects an empty connection list, so a fetcher built
// through it always has at least one entry here.
clients []coregrpc.BlockAPIClient
// currentClient is the index into clients of the client that the next call
// to getNextClient will return. Its zero value selects the first client.
currentClient atomic.Uint32
}

// NewBlockFetcher returns a new `BlockFetcher`.
func NewBlockFetcher(conn *grpc.ClientConn) (*BlockFetcher, error) {
// NewMultiBlockFetcher returns a new `BlockFetcher` using multiple connections.
// It implements client-side load balancing using a simple round-robin mechanism.
// If only one connection is provided, it will still use the same round-robin logic
// but will only have one client to select from.
func NewMultiBlockFetcher(conns []*grpc.ClientConn) (*BlockFetcher, error) {
if len(conns) == 0 {
return nil, fmt.Errorf("at least one connection is required")
}

clients := make([]coregrpc.BlockAPIClient, len(conns))
for i, conn := range conns {
clients[i] = coregrpc.NewBlockAPIClient(conn)
}

return &BlockFetcher{
client: coregrpc.NewBlockAPIClient(conn),
clients: clients,
currentClient: atomic.Uint32{},
}, nil
}

// getNextClient returns the next client in a round-robin fashion and advances
// the internal counter. This provides simple load balancing across
// multiple Core gRPC endpoints.
//
// It is safe for concurrent use. The index is claimed and advanced in one
// compare-and-swap step, so two concurrent callers never claim the same slot
// and no advance is lost. A plain Load followed by Store does not give that
// guarantee, because another goroutine can run between the two operations.
//
// f.clients must be non-empty; NewMultiBlockFetcher enforces this.
func (f *BlockFetcher) getNextClient() coregrpc.BlockAPIClient {
	n := uint32(len(f.clients))
	for {
		// Snapshot the slot we intend to hand out.
		current := f.currentClient.Load()

		// Wrap at the end of the slice so the stored value is always a
		// valid index into f.clients.
		next := (current + 1) % n

		// Publish the advance only if nobody else moved the index since the
		// snapshot; otherwise retry with the fresh value.
		if f.currentClient.CompareAndSwap(current, next) {
			return f.clients[current]
		}
	}
}

// GetBlockInfo queries Core for additional block information, like Commit and ValidatorSet.
func (f *BlockFetcher) GetBlockInfo(ctx context.Context, height int64) (*types.Commit, *types.ValidatorSet, error) {
commit, err := f.Commit(ctx, height)
Expand All @@ -64,7 +98,8 @@ func (f *BlockFetcher) GetBlockInfo(ctx context.Context, height int64) (*types.C
// GetBlock queries Core for a `Block` at the given height.
// if the height is nil, use the latest height
func (f *BlockFetcher) GetBlock(ctx context.Context, height int64) (*SignedBlock, error) {
stream, err := f.client.BlockByHeight(ctx, &coregrpc.BlockByHeightRequest{Height: height})
client := f.getNextClient()
stream, err := client.BlockByHeight(ctx, &coregrpc.BlockByHeightRequest{Height: height})
if err != nil {
return nil, err
}
Expand All @@ -79,7 +114,8 @@ func (f *BlockFetcher) GetBlockByHash(ctx context.Context, hash libhead.Hash) (*
if hash == nil {
return nil, fmt.Errorf("cannot get block with nil hash")
}
stream, err := f.client.BlockByHash(ctx, &coregrpc.BlockByHashRequest{Hash: hash})
client := f.getNextClient()
stream, err := client.BlockByHash(ctx, &coregrpc.BlockByHashRequest{Hash: hash})
if err != nil {
return nil, err
}
Expand All @@ -94,7 +130,8 @@ func (f *BlockFetcher) GetBlockByHash(ctx context.Context, hash libhead.Hash) (*
// GetSignedBlock queries Core for a `Block` at the given height.
// if the height is nil, use the latest height.
func (f *BlockFetcher) GetSignedBlock(ctx context.Context, height int64) (*SignedBlock, error) {
stream, err := f.client.BlockByHeight(ctx, &coregrpc.BlockByHeightRequest{Height: height})
client := f.getNextClient()
stream, err := client.BlockByHeight(ctx, &coregrpc.BlockByHeightRequest{Height: height})
if err != nil {
return nil, err
}
Expand All @@ -105,7 +142,8 @@ func (f *BlockFetcher) GetSignedBlock(ctx context.Context, height int64) (*Signe
// the given height.
// If the height is nil, use the latest height.
func (f *BlockFetcher) Commit(ctx context.Context, height int64) (*types.Commit, error) {
res, err := f.client.Commit(ctx, &coregrpc.CommitRequest{Height: height})
client := f.getNextClient()
res, err := client.Commit(ctx, &coregrpc.CommitRequest{Height: height})
if err != nil {
return nil, err
}
Expand All @@ -126,7 +164,8 @@ func (f *BlockFetcher) Commit(ctx context.Context, height int64) (*types.Commit,
// block at the given height.
// If the height is nil, use the latest height.
func (f *BlockFetcher) ValidatorSet(ctx context.Context, height int64) (*types.ValidatorSet, error) {
res, err := f.client.ValidatorSet(ctx, &coregrpc.ValidatorSetRequest{Height: height})
client := f.getNextClient()
res, err := client.ValidatorSet(ctx, &coregrpc.ValidatorSetRequest{Height: height})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -157,7 +196,9 @@ func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (chan types.E
default:
}

subscription, err := f.client.SubscribeNewHeights(ctx, &coregrpc.SubscribeNewHeightsRequest{})
// Get the current client for this subscription attempt
client := f.getNextClient()
subscription, err := client.SubscribeNewHeights(ctx, &coregrpc.SubscribeNewHeightsRequest{})
if err != nil {
// try re-subscribe in case of any errors that can come during subscription. gRPC
// retry mechanism has a back off on retries, so we don't need timers anymore.
Expand Down Expand Up @@ -214,7 +255,8 @@ func (f *BlockFetcher) receive(
// syncing, and false for already caught up. It can also return an error
// in the case of a failed status request.
func (f *BlockFetcher) IsSyncing(ctx context.Context) (bool, error) {
resp, err := f.client.Status(ctx, &coregrpc.StatusRequest{})
client := f.getNextClient()
resp, err := client.Status(ctx, &coregrpc.StatusRequest{})
if err != nil {
return false, err
}
Expand Down
3 changes: 1 addition & 2 deletions core/fetcher_no_race_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ func TestBlockFetcherHeaderValues(t *testing.T) {
node := StartTestNode(t)
host, port, err := net.SplitHostPort(node.GRPCClient.Target())
require.NoError(t, err)
client := newTestClient(t, host, port)
fetcher, err := NewBlockFetcher(client)
fetcher, err := newTestBlockFetcher(t, host, port)
require.NoError(t, err)
newBlockChan, err := fetcher.SubscribeNewBlockEvent(ctx)
require.NoError(t, err)
Expand Down
54 changes: 50 additions & 4 deletions core/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,68 @@ package core
import (
"context"
"net"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
coregrpc "github.com/tendermint/tendermint/rpc/grpc"
)

// TestRoundRobinClientSelection ensures that the round-robin client selection
// mechanism works as expected, rotating through available clients.
func TestRoundRobinClientSelection(t *testing.T) {
// Create a BlockFetcher with mock clients to test the round-robin behavior
clients := []coregrpc.BlockAPIClient{
&mockBlockAPIClient{id: 1},
&mockBlockAPIClient{id: 2},
&mockBlockAPIClient{id: 3},
}

fetcher := &BlockFetcher{
clients: clients,
currentClient: atomic.Uint32{},
}

// The first call should return client 0 (id: 1)
client1 := fetcher.getNextClient()
mockClient1, ok := client1.(*mockBlockAPIClient)
require.True(t, ok, "Expected mockBlockAPIClient")
assert.Equal(t, 1, mockClient1.id)

// The second call should return client 1 (id: 2)
client2 := fetcher.getNextClient()
mockClient2, ok := client2.(*mockBlockAPIClient)
require.True(t, ok, "Expected mockBlockAPIClient")
assert.Equal(t, 2, mockClient2.id)

// The third call should return client 2 (id: 3)
client3 := fetcher.getNextClient()
mockClient3, ok := client3.(*mockBlockAPIClient)
require.True(t, ok, "Expected mockBlockAPIClient")
assert.Equal(t, 3, mockClient3.id)

// The fourth call should wrap around and return client 0 (id: 1) again
client4 := fetcher.getNextClient()
mockClient4, ok := client4.(*mockBlockAPIClient)
require.True(t, ok, "Expected mockBlockAPIClient")
assert.Equal(t, 1, mockClient4.id)
}

// mockBlockAPIClient is a test double for coregrpc.BlockAPIClient.
// It implements none of the interface's methods itself; it only needs to be
// assignable to the interface and tell instances apart.
type mockBlockAPIClient struct {
	// Embedding the interface makes the mock satisfy it. The embedded value
	// is left nil, so calling any BlockAPIClient method on the mock panics.
	coregrpc.BlockAPIClient
	// id identifies which mock a round-robin selection returned.
	id int
}

func TestBlockFetcher_GetBlock_and_SubscribeNewBlockEvent(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
t.Cleanup(cancel)

host, port, err := net.SplitHostPort(StartTestNode(t).GRPCClient.Target())
require.NoError(t, err)
client := newTestClient(t, host, port)
fetcher, err := NewBlockFetcher(client)
fetcher, err := newTestBlockFetcher(t, host, port)
require.NoError(t, err)
// generate some blocks
newBlockChan, err := fetcher.SubscribeNewBlockEvent(ctx)
Expand Down Expand Up @@ -51,8 +98,7 @@ func TestFetcher_Resubscription(t *testing.T) {
require.NoError(t, tn.Start())
host, port, err := net.SplitHostPort(tn.GRPCClient.Target())
require.NoError(t, err)
client := newTestClient(t, host, port)
fetcher, err := NewBlockFetcher(client)
fetcher, err := newTestBlockFetcher(t, host, port)
require.NoError(t, err)

// subscribe to the channel to get new blocks
Expand Down
3 changes: 1 addition & 2 deletions core/header_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ func TestMakeExtendedHeaderForEmptyBlock(t *testing.T) {

host, port, err := net.SplitHostPort(StartTestNode(t).GRPCClient.Target())
require.NoError(t, err)
client := newTestClient(t, host, port)
fetcher, err := NewBlockFetcher(client)
fetcher, err := newTestBlockFetcher(t, host, port)
require.NoError(t, err)
sub, err := fetcher.SubscribeNewBlockEvent(ctx)
require.NoError(t, err)
Expand Down
19 changes: 19 additions & 0 deletions core/testutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package core

import (
"fmt"
"testing"

"google.golang.org/grpc"
)

// newTestBlockFetcher creates a BlockFetcher for testing using a single connection.
// It dials host:port through newTestClient and wraps the resulting connection
// in a one-element slice for NewMultiBlockFetcher.
//
// It returns an error if newTestClient yields a nil connection, or whatever
// error NewMultiBlockFetcher reports.
//
// NOTE(review): newTestClient is not defined in the visible source. If it
// lives in a _test.go file, this helper must live in one as well, otherwise
// non-test builds of the package will not compile — confirm.
func newTestBlockFetcher(t *testing.T, host, port string) (*BlockFetcher, error) {
	// Attribute failures reported through t to the calling test, not to
	// this helper.
	t.Helper()

	conn := newTestClient(t, host, port)
	if conn == nil {
		return nil, fmt.Errorf("failed to create test client for %s:%s", host, port)
	}
	return NewMultiBlockFetcher([]*grpc.ClientConn{conn})
}
63 changes: 63 additions & 0 deletions core/testutil_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package core

import (
"sync/atomic"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
coregrpc "github.com/tendermint/tendermint/rpc/grpc"
"google.golang.org/grpc"
)

// TestNewMultiBlockFetcher tests the NewMultiBlockFetcher function
// directly to ensure it creates one client per supplied connection and
// rejects an empty or nil connection list.
func TestNewMultiBlockFetcher(t *testing.T) {
	// Zero-value connections suffice: the constructor only wraps them and
	// nothing here issues an RPC.
	conns := []*grpc.ClientConn{
		{},
		{},
		{},
	}

	fetcher, err := NewMultiBlockFetcher(conns)
	require.NoError(t, err)
	require.NotNil(t, fetcher)

	// Verify one client was created per connection.
	assert.Len(t, fetcher.clients, len(conns))

	// Both an empty and a nil slice must be rejected.
	for _, noConns := range [][]*grpc.ClientConn{{}, nil} {
		rejected, err := NewMultiBlockFetcher(noConns)
		// require, not assert: the next lines dereference err, which would
		// panic on nil instead of producing a readable test failure.
		require.Error(t, err)
		assert.Nil(t, rejected)
		assert.Contains(t, err.Error(), "at least one connection is required")
	}
}

// TestBlockFetcherRoundRobin tests the round-robin client selection mechanism
// of the BlockFetcher by creating a fetcher with custom mock clients.
func TestBlockFetcherRoundRobin(t *testing.T) {
// Create mock clients
clients := []coregrpc.BlockAPIClient{
&mockBlockAPIClient{id: 1},
&mockBlockAPIClient{id: 2},
&mockBlockAPIClient{id: 3},
}

// Create fetcher with mock clients
fetcher := &BlockFetcher{
clients: clients,
currentClient: atomic.Uint32{},
}

// Test round-robin rotation
for i := 0; i < 6; i++ {
client := fetcher.getNextClient()
mockClient, ok := client.(*mockBlockAPIClient)
require.True(t, ok, "Expected mockBlockAPIClient")

// Expected client ID should rotate through 1, 2, 3, 1, 2, 3
expectedID := (i % 3) + 1
assert.Equal(t, expectedID, mockClient.id, "Incorrect client selected at iteration %d", i)
}
}
Loading
Loading