Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/config/create_config_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ var (
//go:embed templates/signatureverifier.yaml
TemplateVerifier string

TemplateLoadGenOnlyOrderer = templateLoadGenOnlyOrdererClient + templateLoadGenCommon
TemplateLoadGenOrderer = templateLoadGenOrdererClient + templateLoadGenCommon
TemplateLoadGenCommitter = templateLoadGenCommitterClient + templateLoadGenCommon
TemplateLoadGenCoordinator = templateLoadGenCoordinatorClient + templateLoadGenCommon
Expand All @@ -103,6 +104,8 @@ var (

//go:embed templates/loadgen_common.yaml
templateLoadGenCommon string
//go:embed templates/loadgen_client_only_orderer.yaml
templateLoadGenOnlyOrdererClient string
//go:embed templates/loadgen_client_orderer.yaml
templateLoadGenOrdererClient string
//go:embed templates/loadgen_client_sidecar.yaml
Expand Down
17 changes: 17 additions & 0 deletions cmd/config/templates/loadgen_client_only_orderer.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Copyright IBM Corp. All Rights Reserved.
#
# SPDX-License-Identifier: Apache-2.0
#
# This is a partial template. It contains only the orderer client configurations.
# It should be complimented by the common load generator configuration.

orderer-client:
orderer:
connection:
endpoints:
{{- range .Endpoints.Orderer }}
- {{ .Server }}
{{- end }}
consensus-type: BFT
channel-id: {{ .ChannelID }}
broadcast-parallelism: 5
2 changes: 1 addition & 1 deletion cmd/config/templates/loadgen_client_orderer.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ orderer-client:
{{- end }}
consensus-type: BFT
channel-id: {{ .ChannelID }}
broadcast-parallelism: 50
broadcast-parallelism: 5
10 changes: 7 additions & 3 deletions integration/runner/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ const (
Verifier
VC
QueryService

LoadGenForOnlyOrderer
LoadGenForOrderer
LoadGenForCommitter
LoadGenForCoordinator
Expand All @@ -120,7 +122,7 @@ const (
CommitterTxPathWithLoadGen = CommitterTxPath | LoadGenForCommitter

// loadGenMatcher is used to extract only the load generator flags from the full service flags value.
loadGenMatcher = LoadGenForOrderer | LoadGenForCommitter | LoadGenForCoordinator |
loadGenMatcher = LoadGenForOnlyOrderer | LoadGenForOrderer | LoadGenForCommitter | LoadGenForCoordinator |
LoadGenForVCService | LoadGenForVerifier
)

Expand Down Expand Up @@ -272,10 +274,12 @@ func (c *CommitterRuntime) startLoadGen(t *testing.T, serviceFlags int) {
require.Falsef(t, isMoreThanOneBitSet(loadGenFlag), "only one load generator may be set")
loadGenParams := cmdLoadGen
switch loadGenFlag {
case LoadGenForCommitter:
loadGenParams.Template = config.TemplateLoadGenCommitter
case LoadGenForOnlyOrderer:
loadGenParams.Template = config.TemplateLoadGenOnlyOrderer
case LoadGenForOrderer:
loadGenParams.Template = config.TemplateLoadGenOrderer
case LoadGenForCommitter:
loadGenParams.Template = config.TemplateLoadGenCommitter
case LoadGenForCoordinator:
loadGenParams.Template = config.TemplateLoadGenCoordinator
case LoadGenForVCService:
Expand Down
6 changes: 5 additions & 1 deletion integration/test/loadgen_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,13 @@ func TestLoadGen(t *testing.T) {
serviceFlags int
}{
{
name: "orderer",
name: "orderer with committer",
serviceFlags: runner.FullTxPathWithLoadGen,
},
{
name: "only orderer",
serviceFlags: runner.LoadGenForOnlyOrderer | runner.Orderer,
},
{
name: "committer",
serviceFlags: runner.CommitterTxPathWithLoadGen,
Expand Down
4 changes: 3 additions & 1 deletion loadgen/adapters/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ type (
// OrdererClientConfig is a struct that contains the configuration for the orderer client.
OrdererClientConfig struct {
Orderer broadcastdeliver.Config `mapstructure:"orderer"`
SidecarEndpoint *connection.Endpoint `mapstructure:"sidecar-endpoint"`
BroadcastParallelism int `mapstructure:"broadcast-parallelism"`
// SidecarEndpoint is used to deliver status from the sidecar.
// If omitted, we will fetch directly from the orderer.
SidecarEndpoint *connection.Endpoint `mapstructure:"sidecar-endpoint"`
}

// SidecarClientConfig is a struct that contains the configuration for the sidecar client.
Expand Down
84 changes: 48 additions & 36 deletions loadgen/adapters/orderer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import (
"context"

"github.com/cockroachdb/errors"
"github.com/hyperledger/fabric-protos-go-apiv2/common"
"golang.org/x/sync/errgroup"

"github.com/hyperledger/fabric-x-committer/api/protoblocktx"
"github.com/hyperledger/fabric-x-committer/loadgen/workload"
"github.com/hyperledger/fabric-x-committer/utils/broadcastdeliver"
)
Expand All @@ -34,39 +36,46 @@ func NewOrdererAdapter(config *OrdererClientConfig, res *ClientResources) *Order

// RunWorkload applies load on the sidecar.
func (c *OrdererAdapter) RunWorkload(ctx context.Context, txStream *workload.StreamWithSetup) error {
broadcastSubmitter, err := broadcastdeliver.New(&c.config.Orderer)
client, err := broadcastdeliver.New(&c.config.Orderer)
if err != nil {
return errors.Wrap(err, "failed to create orderer clients")
}
defer broadcastSubmitter.Close()
defer client.Close()

dCtx, dCancel := context.WithCancel(ctx)
defer dCancel()
g, gCtx := errgroup.WithContext(dCtx)
g.Go(func() error {
defer dCancel() // We stop sending if we can't track the received items.
return runReceiver(gCtx, &receiverConfig{
ChannelID: c.config.Orderer.ChannelID,
Endpoint: c.config.SidecarEndpoint,
Res: c.res,
if c.config.SidecarEndpoint == nil || c.config.SidecarEndpoint.Empty() {
g.Go(func() error {
defer dCancel() // We stop sending if we can't track the received items.
return runOrdererReceiver(gCtx, c.res, client)
})
} else {
g.Go(func() error {
defer dCancel() // We stop sending if we can't track the received items.
return runSidecarReceiver(gCtx, &sidecarReceiverConfig{
ChannelID: c.config.Orderer.ChannelID,
Endpoint: c.config.SidecarEndpoint,
Res: c.res,
})
})
})
}

for range c.config.BroadcastParallelism {
g.Go(func() error {
stream, err := broadcastSubmitter.Broadcast(gCtx)
stream, err := client.Broadcast(gCtx)
if err != nil {
return errors.Wrap(err, "failed to create a broadcast stream")
}
g.Go(func() error {
err := c.sendTransactions(gCtx, txStream, stream)
// Insufficient quorum may happen when the context ends due to unavailable servers.
if ctx.Err() != nil {
return ctx.Err()
}
return err
})
return nil
return sendBlocks(
gCtx, &c.commonAdapter, txStream,
func(txs []*protoblocktx.Tx) ([]*common.Envelope, []string, error) {
return mapToBatch(txs, stream)
},
func(envs []*common.Envelope) error {
return sendBatch(envs, stream)
},
)
})
}
return errors.Wrap(g.Wait(), "workload done")
Expand All @@ -83,25 +92,28 @@ func (*OrdererAdapter) Supports() Phases {
}
}

// sendTransactions submits Fabric TXs. It uses the envelope's TX ID to track the TXs latency.
func (c *OrdererAdapter) sendTransactions(
ctx context.Context, txStream *workload.StreamWithSetup, stream *broadcastdeliver.EnvelopedStream,
) error {
txGen := txStream.MakeTxGenerator()
for ctx.Err() == nil {
tx := txGen.Next(ctx, 1)
if len(tx) == 0 {
// The context ended.
return nil
}
txID, err := stream.SendWithEnv(tx[0])
// mapToBatch creates a batch of orderer envelopes. It uses the envelope header ID to track the TXs latency.
func mapToBatch(
txs []*protoblocktx.Tx, stream *broadcastdeliver.EnvelopedStream,
) ([]*common.Envelope, []string, error) {
envs := make([]*common.Envelope, len(txs))
txIDs := make([]string, len(txs))
for i, tx := range txs {
var err error
envs[i], txIDs[i], err = stream.CreateEnvelope(tx)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have an issue to parallelize this. This change may make it hard to achieve that.

if err != nil {
return errors.Wrap(err, "failed to submit transaction")
return nil, nil, err
}
logger.Debugf("Sent TX %s", txID)
c.res.Metrics.OnSendTransaction(txID)
if c.res.isTXSendLimit() {
return nil
}
return envs, txIDs, nil
}

// sendBatch sends a batch one by one.
func sendBatch(envelopes []*common.Envelope, stream *broadcastdeliver.EnvelopedStream) error {
for _, env := range envelopes {
err := stream.Send(env)
if err != nil {
return err
}
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion loadgen/adapters/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (c *SidecarAdapter) RunWorkload(ctx context.Context, txStream *workload.Str

g.Go(func() error {
defer dCancel() // We stop sending if we can't track the received items.
return runReceiver(gCtx, &receiverConfig{
return runSidecarReceiver(gCtx, &sidecarReceiverConfig{
ChannelID: c.config.ChannelID,
Endpoint: c.config.SidecarEndpoint,
Res: c.res,
Expand Down
99 changes: 67 additions & 32 deletions loadgen/adapters/sidecar_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,37 +24,56 @@ import (
"github.com/hyperledger/fabric-x-committer/utils/serialization"
)

type receiverConfig struct {
type sidecarReceiverConfig struct {
Endpoint *connection.Endpoint
ChannelID string
Res *ClientResources
}

const committedBlocksQueueSize = 1024
const statusIdx = int(common.BlockMetadataIndex_TRANSACTIONS_FILTER)

// runReceiver start receiving blocks from the sidecar.
func runReceiver(ctx context.Context, config *receiverConfig) error {
// runSidecarReceiver start receiving blocks from the sidecar.
func runSidecarReceiver(ctx context.Context, config *sidecarReceiverConfig) error {
ledgerReceiver, err := sidecarclient.New(&sidecarclient.Config{
ChannelID: config.ChannelID,
Endpoint: config.Endpoint,
})
if err != nil {
return errors.Wrap(err, "failed to create ledger receiver")
}

g, gCtx := errgroup.WithContext(ctx)
committedBlock := make(chan *common.Block, committedBlocksQueueSize)
g.Go(func() error {
return runDeliveryReceiver(ctx, config.Res, func(gCtx context.Context, committedBlock chan *common.Block) error {
return ledgerReceiver.Deliver(gCtx, &sidecarclient.DeliverConfig{
EndBlkNum: broadcastdeliver.MaxBlockNum,
OutputBlock: committedBlock,
})
})
}

// runOrdererReceiver start receiving blocks from the orderer.
func runOrdererReceiver(ctx context.Context, res *ClientResources, client *broadcastdeliver.Client) error {
return runDeliveryReceiver(ctx, res, func(gCtx context.Context, committedBlock chan *common.Block) error {
return client.Deliver(gCtx, &broadcastdeliver.DeliverConfig{
EndBlkNum: broadcastdeliver.MaxBlockNum,
OutputBlock: committedBlock,
})
})
}

// runDeliveryReceiver start receiving blocks from a delivery service.
func runDeliveryReceiver(
ctx context.Context, res *ClientResources, deliver func(context.Context, chan *common.Block) error,
) error {
g, gCtx := errgroup.WithContext(ctx)
committedBlock := make(chan *common.Block, committedBlocksQueueSize)
g.Go(func() error {
receiveCommittedBlock(gCtx, committedBlock, config.Res)
return deliver(gCtx, committedBlock)
})
g.Go(func() error {
receiveCommittedBlock(gCtx, committedBlock, res)
return context.Canceled
})
return errors.Wrap(g.Wait(), "sidecar receiver done")
return errors.Wrap(g.Wait(), "receiver done")
Comment thread
liran-funaro marked this conversation as resolved.
}

func receiveCommittedBlock(
Expand All @@ -74,29 +93,7 @@ func receiveCommittedBlock(
if !ok {
return
}

statusCodes := block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER]
logger.Infof("Received block #%d with %d TXs and %d statuses [%s]",
block.Header.Number, len(block.Data.Data), len(statusCodes), recapStatusCodes(statusCodes),
)

statusBatch := make([]metrics.TxStatus, 0, len(block.Data.Data))
for i, data := range block.Data.Data {
_, channelHeader, err := serialization.UnwrapEnvelope(data)
if err != nil {
logger.Warnf("Failed to unmarshal envelope: %v", err)
continue
}
if common.HeaderType(channelHeader.Type) == common.HeaderType_CONFIG {
// We can ignore config transactions as we only count data transactions.
continue
}
statusBatch = append(statusBatch, metrics.TxStatus{
TxID: channelHeader.TxId,
Status: protoblocktx.Status(statusCodes[i]),
})
}
processedBlocks.Write(statusBatch)
processedBlocks.Write(mapToStatusBatch(block))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use multiple goroutines to process the committed status as we are unmarshaling the envelope. Otherwise, this could increase the latency unnecessarily.

}
}()

Expand All @@ -112,6 +109,44 @@ func receiveCommittedBlock(
}
}

// mapToStatusBatch creates a status batch from a given block.
func mapToStatusBatch(block *common.Block) []metrics.TxStatus {
if block.Data == nil || len(block.Data.Data) == 0 {
return nil
}
blockSize := len(block.Data.Data)

var statusCodes []byte
if block.Metadata != nil && len(block.Metadata.Metadata) > statusIdx {
statusCodes = block.Metadata.Metadata[statusIdx]
}
logger.Infof("Received block #%d with %d TXs and %d statuses [%s]",
block.Header.Number, len(block.Data.Data), len(statusCodes), recapStatusCodes(statusCodes),
)

statusBatch := make([]metrics.TxStatus, 0, blockSize)
for i, data := range block.Data.Data {
_, channelHeader, err := serialization.UnwrapEnvelope(data)
if err != nil {
logger.Warnf("Failed to unmarshal envelope: %v", err)
continue
}
if common.HeaderType(channelHeader.Type) == common.HeaderType_CONFIG {
// We can ignore config transactions as we only count data transactions.
continue
}
status := protoblocktx.Status_COMMITTED
if len(statusCodes) > i {
status = protoblocktx.Status(statusCodes[i])
}
statusBatch = append(statusBatch, metrics.TxStatus{
TxID: channelHeader.TxId,
Status: status,
})
}
return statusBatch
}

// recapStatusCodes recaps of the status codes of a block.
func recapStatusCodes(statusCodes []byte) string {
codes := make(map[byte]uint64)
Expand Down
Loading