Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 3 additions & 12 deletions integration/runner/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/hyperledger/fabric-x-common/internaltools/configtxgen"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/durationpb"

"github.com/hyperledger/fabric-x-committer/api/protoblocktx"
Expand Down Expand Up @@ -223,15 +222,15 @@ func NewRuntime(t *testing.T, conf *Config) *CommitterRuntime {

t.Log("Create clients")
c.CoordinatorClient = protocoordinatorservice.NewCoordinatorClient(
clientConnWithTLS(t, s.Endpoints.Coordinator.Server, c.SystemConfig.ClientTLS),
test.NewSecuredConnection(t, s.Endpoints.Coordinator.Server, c.SystemConfig.ClientTLS),
)

c.QueryServiceClient = protoqueryservice.NewQueryServiceClient(
clientConnWithTLS(t, s.Endpoints.Query.Server, c.SystemConfig.ClientTLS),
test.NewSecuredConnection(t, s.Endpoints.Query.Server, c.SystemConfig.ClientTLS),
)

c.notifyClient = protonotify.NewNotifierClient(
clientConnWithTLS(t, s.Endpoints.Sidecar.Server, c.SystemConfig.ClientTLS),
test.NewSecuredConnection(t, s.Endpoints.Sidecar.Server, c.SystemConfig.ClientTLS),
)

c.ordererStream, err = test.NewBroadcastStream(t.Context(), &ordererconn.Config{
Expand Down Expand Up @@ -357,14 +356,6 @@ func (c *CommitterRuntime) startBlockDelivery(t *testing.T) {
})
}

// clientConnWithTLS creates a service connection using its given server endpoint and TLS configuration.
func clientConnWithTLS(t *testing.T, e *connection.Endpoint, tlsConfig connection.TLSConfig) *grpc.ClientConn {
t.Helper()
serviceConnection, err := connection.Connect(test.NewSecuredDialConfig(t, e, tlsConfig))
require.NoError(t, err)
return serviceConnection
}

// AddOrUpdateNamespaces adds policies for namespaces. If already exists, the policy will be updated.
func (c *CommitterRuntime) AddOrUpdateNamespaces(t *testing.T, namespaces ...string) {
t.Helper()
Expand Down
8 changes: 2 additions & 6 deletions loadgen/adapters/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,10 @@ func NewCoordinatorAdapter(config *connection.ClientConfig, res *ClientResources

// RunWorkload applies load on the coordinator.
func (c *CoordinatorAdapter) RunWorkload(ctx context.Context, txStream *workload.StreamWithSetup) error {
coordinatorDialConfig, err := connection.NewSingleDialConfig(c.config)
if err != nil {
return errors.Wrapf(err, "failed creating coordinator dial config")
}
// connecting to the coordinator.
conn, connErr := connection.Connect(coordinatorDialConfig)
conn, connErr := connection.NewSingleConnection(c.config)
if connErr != nil {
return errors.Wrapf(err, "failed to connect to coordinator at %s", c.config.Endpoint.Address())
return errors.Wrapf(connErr, "failed to connect to coordinator at %s", c.config.Endpoint.Address())
}
defer connection.CloseConnectionsLog(conn)
client := protocoordinatorservice.NewCoordinatorClient(conn)
Expand Down
6 changes: 1 addition & 5 deletions loadgen/adapters/loadgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,7 @@ func NewLoadGenAdapter(config *connection.ClientConfig, res *ClientResources) *L

// RunWorkload applies load on the SV.
func (c *LoadGenAdapter) RunWorkload(ctx context.Context, txStream *workload.StreamWithSetup) error {
loadgenDialConfig, err := connection.NewSingleDialConfig(c.config)
if err != nil {
return errors.Wrapf(err, "failed creating loadgen dial config")
}
conn, err := connection.Connect(loadgenDialConfig)
conn, err := connection.NewSingleConnection(c.config)
if err != nil {
return errors.Wrapf(err, "failed to connect to %s", c.config.Endpoint)
}
Expand Down
2 changes: 1 addition & 1 deletion loadgen/adapters/sigverifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (c *SvAdapter) RunWorkload(ctx context.Context, txStream *workload.StreamWi
if err != nil {
return err
}
connections, err := connection.OpenConnections(*c.config)
connections, err := connection.NewConnectionPerEndpoint(c.config)
if err != nil {
return err
}
Expand Down
9 changes: 3 additions & 6 deletions loadgen/adapters/vcservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,11 @@ func NewVCAdapter(config *connection.MultiClientConfig, res *ClientResources) *V

// RunWorkload applies load on the VC.
func (c *VcAdapter) RunWorkload(ctx context.Context, txStream *workload.StreamWithSetup) error {
commonDial, dialErr := connection.NewLoadBalancedDialConfig(*c.config)
if dialErr != nil {
return errors.Wrapf(dialErr, "could not create dial config for vcs")
}
commonConn, connErr := connection.Connect(commonDial)
commonConn, connErr := connection.NewLoadBalancedConnection(c.config)
if connErr != nil {
return errors.Wrapf(connErr, "failed to create connection to validator persisters")
}
defer connection.CloseConnectionsLog(commonConn)
commonClient := protovcservice.NewValidationAndCommitServiceClient(commonConn)
_, setupError := commonClient.SetupSystemTablesAndNamespaces(ctx, nil)
if setupError != nil {
Expand All @@ -57,7 +54,7 @@ func (c *VcAdapter) RunWorkload(ctx context.Context, txStream *workload.StreamWi
} else {
c.nextBlockNum.Store(0)
}
connections, connErr := connection.OpenConnections(*c.config)
connections, connErr := connection.NewConnectionPerEndpoint(c.config)
if connErr != nil {
return connErr
}
Expand Down
18 changes: 6 additions & 12 deletions service/coordinator/signature_verifier_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,26 +97,20 @@ func (svm *signatureVerifierManager) run(ctx context.Context) error {
return nil
})

dialConfigs, dialErr := connection.NewDialConfigPerEndpoint(c.clientConfig)
if dialErr != nil {
return dialErr
connections, connErr := connection.NewConnectionPerEndpoint(c.clientConfig)
if connErr != nil {
return fmt.Errorf("failed to create connection to signature verifier: %w", connErr)
}
for i, d := range dialConfigs {
conn, err := connection.Connect(d)
if err != nil {
return fmt.Errorf("failed to create connection to signature verifier [%d] at %s: %w",
i, d.Address, err)
}
logger.Infof("connected to signature verifier [%d] at %s", i, d.Address)
defer connection.CloseConnectionsLog(connections...)
for i, conn := range connections {
label := conn.CanonicalTarget()
c.metrics.verifiersConnection.Disconnected(label)

sv := newSignatureVerifier(c, conn)
svm.signVerifier[i] = sv
logger.Debugf("Client [%d] successfully created and connected to sv", i)
logger.Infof("Client [%d] successfully created and connected to sv at %s", i, label)

g.Go(func() error {
defer connection.CloseConnectionsLog(conn)
// error should never occur unless there is a bug or malicious activity. Hence, it is fine to crash for now.
return connection.Sustain(eCtx, func() error {
defer sv.recoverPendingTransactions(txBatchQueue)
Expand Down
34 changes: 12 additions & 22 deletions service/coordinator/validator_committer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,38 +93,31 @@ func (vcm *validatorCommitterManager) run(ctx context.Context) error {
return nil
})

commonDial, dialErr := connection.NewLoadBalancedDialConfig(*c.clientConfig)
if dialErr != nil {
return fmt.Errorf("failed to create connection to validator persisters: %w", dialErr)
}
commonConn, err := connection.Connect(commonDial)
commonConn, err := connection.NewLoadBalancedConnection(c.clientConfig)
if err != nil {
return fmt.Errorf("failed to create connection to validator persisters: %w", err)
}
defer connection.CloseConnectionsLog(commonConn)
vcm.commonClient = protovcservice.NewValidationAndCommitServiceClient(commonConn)
_, setupErr := vcm.commonClient.SetupSystemTablesAndNamespaces(ctx, nil)
if setupErr != nil {
return errors.Wrap(setupErr, "failed to setup system tables and namespaces")
}

dialConfigs, dialErr := connection.NewDialConfigPerEndpoint(c.clientConfig)
if dialErr != nil {
return dialErr
connections, connErr := connection.NewConnectionPerEndpoint(c.clientConfig)
if connErr != nil {
return fmt.Errorf("failed to create connection to validator persister: %w", connErr)
}
for i, d := range dialConfigs {
logger.Debugf("vc manager creates client to vc [%d] listening on %s", i, d.Address)
conn, connErr := connection.Connect(d)
if connErr != nil {
return fmt.Errorf("failed to create connection to validator persister running at %s", d.Address)
}
logger.Infof("validator persister manager connected to validator persister at %s", d.Address)
vc := newValidatorCommitter(conn, c.metrics, c.policyMgr)
defer connection.CloseConnectionsLog(connections...)
for i, conn := range connections {
label := conn.CanonicalTarget()
c.metrics.vcservicesConnection.Disconnected(label)

logger.Debugf("Client [%d] successfully created and connected to vc", i)
vc := newValidatorCommitter(conn, c.metrics, c.policyMgr)
vcm.validatorCommitter[i] = vc
logger.Infof("Client [%d] successfully created and connected to vc at %s", i, label)

g.Go(func() error {
defer connection.CloseConnectionsLog(vc.conn)
return connection.Sustain(eCtx, func() (err error) {
defer vc.recoverPendingTransactions(txBatchQueue)
return vc.sendTransactionsAndForwardStatus(
Expand Down Expand Up @@ -198,12 +191,9 @@ func (vcm *validatorCommitterManager) recoverPolicyManagerFromStateDB(ctx contex
}

func newValidatorCommitter(conn *grpc.ClientConn, metrics *perfMetrics, policyMgr *policyManager) *validatorCommitter {
label := conn.CanonicalTarget()
metrics.vcservicesConnection.Disconnected(label)
client := protovcservice.NewValidationAndCommitServiceClient(conn)
return &validatorCommitter{
conn: conn,
client: client,
client: protovcservice.NewValidationAndCommitServiceClient(conn),
metrics: metrics,
policyMgr: policyMgr,
}
Expand Down
3 changes: 1 addition & 2 deletions service/sidecar/notify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,7 @@ func TestNotifierStream(t *testing.T) {
protonotify.RegisterNotifierServer(server, env.n)
})
endpoint := &config.Endpoint
conn, err := connection.Connect(test.NewInsecureDialConfig(endpoint))
require.NoError(t, err)
conn := test.NewInsecureConnection(t, endpoint)
client := protonotify.NewNotifierClient(conn)

stream, err := client.OpenNotificationStream(t.Context())
Expand Down
5 changes: 1 addition & 4 deletions service/sidecar/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"time"

"github.com/hyperledger/fabric-protos-go-apiv2/common"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/hyperledger/fabric-x-committer/api/protoblocktx"
Expand Down Expand Up @@ -52,9 +51,7 @@ func newRelayTestEnv(t *testing.T) *relayTestEnv {
metrics,
)

conn, err := connection.Connect(test.NewInsecureDialConfig(&coordinatorEndpoint))
require.NoError(t, err)
t.Cleanup(func() { assert.NoError(t, conn.Close()) })
conn := test.NewInsecureConnection(t, &coordinatorEndpoint)

logger.Infof("sidecar connected to coordinator at %s", &coordinatorEndpoint)

Expand Down
6 changes: 1 addition & 5 deletions service/sidecar/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,7 @@ func (s *Service) Run(ctx context.Context) error {
}()

logger.Infof("Create coordinator client and connect to %s", s.config.Committer.Endpoint)
committerDialConfig, err := connection.NewSingleDialConfig(s.config.Committer)
if err != nil {
return errors.Wrapf(err, "could not load coordinator dial config")
}
conn, connErr := connection.Connect(committerDialConfig)
conn, connErr := connection.NewSingleConnection(s.config.Committer)
if connErr != nil {
return errors.Wrapf(connErr, "failed to connect to coordinator")
}
Expand Down
8 changes: 3 additions & 5 deletions service/sidecar/sidecar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,8 @@ func (env *sidecarTestEnv) startNotificationStream(
sidecarClientCreds connection.TLSConfig,
) {
t.Helper()
conn, err := connection.Connect(test.NewSecuredDialConfig(t, &env.config.Server.Endpoint, sidecarClientCreds))
require.NoError(t, err)
conn := test.NewSecuredConnection(t, &env.config.Server.Endpoint, sidecarClientCreds)
var err error
env.notifyStream, err = protonotify.NewNotifierClient(conn).OpenNotificationStream(ctx)
require.NoError(t, err)
}
Expand Down Expand Up @@ -537,9 +537,7 @@ func TestSidecarVerifyBadTxForm(t *testing.T) {

func (env *sidecarTestEnv) getCoordinatorLabel(t *testing.T) string {
t.Helper()
dialConfig, err := connection.NewSingleDialConfig(env.config.Committer)
require.NoError(t, err)
conn, err := connection.Connect(dialConfig)
conn, err := connection.NewSingleConnection(env.config.Committer)
require.NoError(t, err)
require.NoError(t, conn.Close())
return conn.CanonicalTarget()
Expand Down
3 changes: 1 addition & 2 deletions service/vc/validator_committer_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ func newValidatorAndCommitServiceTestEnvWithClient(
for i, c := range vcs.Configs {
allEndpoints[i] = &c.Server.Endpoint
}
commonConn, connErr := connection.Connect(test.NewInsecureLoadBalancedDialConfig(t, allEndpoints))
require.NoError(t, connErr)
commonConn := test.NewInsecureLoadBalancedConnection(t, allEndpoints)

vcsTestEnv := &validatorAndCommitterServiceTestEnvWithClient{
vcs: vcs.VCServices,
Expand Down
Loading