forked from hyperledger/fabric-x-committer
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathloadgen.go
More file actions
110 lines (97 loc) · 3 KB
/
Copy pathloadgen.go
File metadata and controls
110 lines (97 loc) · 3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package adapters
import (
"context"
"github.com/cockroachdb/errors"
"golang.org/x/sync/errgroup"
"github.com/hyperledger/fabric-x-committer/api/protoblocktx"
"github.com/hyperledger/fabric-x-committer/api/protoloadgen"
"github.com/hyperledger/fabric-x-committer/loadgen/metrics"
"github.com/hyperledger/fabric-x-committer/loadgen/workload"
"github.com/hyperledger/fabric-x-committer/utils/channel"
"github.com/hyperledger/fabric-x-committer/utils/connection"
)
// LoadGenAdapter applies load on another load generator.
type LoadGenAdapter struct {
	// commonAdapter is embedded; NewLoadGenAdapter populates its res field
	// with the client resources handed to the constructor.
	commonAdapter
	// config is the client configuration RunWorkload uses to open the
	// connection to the target load generator.
	config *connection.ClientConfig
}

// receivedBatch pairs a batch that was submitted via AppendBatch with the
// status that is reported for every transaction in that batch.
type receivedBatch struct {
	// batch is the batch that was submitted.
	batch *protoloadgen.Batch
	// status is the outcome attributed to all the transactions of the batch.
	status protoblocktx.Status
}
// NewLoadGenAdapter instantiates a LoadGenAdapter.
// The given config is kept for opening the connection in RunWorkload, and res
// is stored as the resources of the embedded commonAdapter.
func NewLoadGenAdapter(config *connection.ClientConfig, res *ClientResources) *LoadGenAdapter {
	// Build the adapter in place so the embedded commonAdapter is never copied.
	adapter := new(LoadGenAdapter)
	adapter.commonAdapter.res = res
	adapter.config = config
	return adapter
}
// RunWorkload applies load on the target load generator.
//
// It opens a single connection using the adapter's client config and then runs
// two goroutines in an error group:
//   - the sender hands txStream to sendBlocks together with a callback that
//     submits each batch via AppendBatch and queues the outcome;
//   - the receiver consumes the queued outcomes and reports them to the metrics.
//
// A batch is queued with status COMMITTED when AppendBatch succeeds, and with
// status NOT_VALIDATED when it fails.
//
// It returns a wrapped connection error if the connection cannot be created.
// Otherwise, it returns the error group's result wrapped with "workload done".
func (c *LoadGenAdapter) RunWorkload(ctx context.Context, txStream *workload.StreamWithSetup) error {
	conn, err := connection.NewSingleConnection(c.config)
	if err != nil {
		return errors.Wrapf(err, "failed to connect to %s", c.config.Endpoint)
	}
	defer connection.CloseConnectionsLog(conn)

	loadGenClient := protoloadgen.NewLoadGenServiceClient(conn)
	outcomes := make(chan receivedBatch, c.res.Stream.BuffersSize)

	// runCtx is the parent of the group's context. Cancelling it stops both the
	// sending side and, through groupCtx, the receiving side.
	runCtx, stopRun := context.WithCancel(ctx)
	defer stopRun()
	group, groupCtx := errgroup.WithContext(runCtx)

	// Sender: submits the batches and queues the outcome of each submission.
	group.Go(func() error {
		outcomeWriter := channel.NewWriter(groupCtx, outcomes)
		appendAndReport := func(batch *protoloadgen.Batch) error {
			_, appendErr := loadGenClient.AppendBatch(runCtx, batch)
			// Assume failure, and upgrade the status only on a successful append.
			outcome := receivedBatch{batch: batch, status: protoblocktx.Status_NOT_VALIDATED}
			if appendErr == nil {
				outcome.status = protoblocktx.Status_COMMITTED
			}
			// The outcome is queued even on failure so it is still tracked.
			outcomeWriter.Write(outcome)
			return appendErr
		}
		return sendBlocks(runCtx, &c.commonAdapter, txStream, workload.MapToLoadGenBatch, appendAndReport)
	})

	// Receiver: tracks the queued outcomes.
	group.Go(func() error {
		defer stopRun() // We stop sending if we can't track the received items.
		c.receiveStatus(groupCtx, outcomes)
		return nil
	})

	return errors.Wrap(group.Wait(), "workload done")
}
// Supports specifies which phases an adapter supports.
// This adapter supports only the load phase.
// The config TX and the namespaces are generated by the main generator.
func (*LoadGenAdapter) Supports() Phases {
	// Config and Namespaces keep their zero value (false); only Load is enabled.
	var supported Phases
	supported.Load = true
	return supported
}
// receiveStatus consumes the submitted batches from queue and reports them to
// the metrics. For each batch, it reports one metrics.TxStatus per transaction,
// all carrying the batch's status.
// It returns when ctx is done, when reading from the queue fails, or once
// c.res.isReceiveLimit() reports true after a batch was processed.
func (c *LoadGenAdapter) receiveStatus(ctx context.Context, queue <-chan receivedBatch) {
	reader := channel.NewReader(ctx, queue)
	for {
		if ctx.Err() != nil {
			return
		}
		received, ok := reader.Read()
		if !ok {
			return
		}

		// Every transaction in the batch shares the batch's status.
		txs := received.batch.Tx
		statuses := make([]metrics.TxStatus, 0, len(txs))
		for _, tx := range txs {
			statuses = append(statuses, metrics.TxStatus{TxID: tx.Id, Status: received.status})
		}
		c.res.Metrics.OnReceiveBatch(statuses)

		if c.res.isReceiveLimit() {
			return
		}
	}
}