Skip to content

Commit 0143d44

Browse files
authored
Initial commit for producer/consumer loadtest client (#1190)
* Initial commit for producer/consumer loadtest client * update sei-cosmos * debug * debug * debug * debug * debug * debug * debug * debug * debug * debug * debug * debug * debug * debug * debug * debug * debug * debug * debug * debug * debug * debug * debug * debug * debug * debug * debug * debug * debug * rm rounds * debug * debug * debug * debug * debug * debug * debug * debug * debug * debug * debug * finalize * debug * debug * debug * debug * debug * debug * debug * debug * bump cosmos * gofmt * debug * debug
1 parent e696486 commit 0143d44

File tree

8 files changed

+221
-323
lines changed

8 files changed

+221
-323
lines changed

go.mod

+2-1
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ require (
205205
github.com/nishanths/predeclared v0.2.2 // indirect
206206
github.com/oasisprotocol/curve25519-voi v0.0.0-20210609091139-0a56a4bca00b // indirect
207207
github.com/olekukonko/tablewriter v0.0.5 // indirect
208+
github.com/onsi/ginkgo/v2 v2.1.4 // indirect
208209
github.com/opencontainers/runc v1.1.5 // indirect
209210
github.com/otiai10/copy v1.6.0 // indirect
210211
github.com/pelletier/go-toml v1.9.5 // indirect
@@ -306,7 +307,7 @@ require (
306307
replace (
307308
github.com/CosmWasm/wasmd => github.com/sei-protocol/sei-wasmd v0.0.4
308309
github.com/confio/ics23/go => github.com/cosmos/cosmos-sdk/ics23/go v0.8.0
309-
github.com/cosmos/cosmos-sdk => github.com/sei-protocol/sei-cosmos v0.2.69
310+
github.com/cosmos/cosmos-sdk => github.com/sei-protocol/sei-cosmos v0.2.71
310311
github.com/cosmos/iavl => github.com/sei-protocol/sei-iavl v0.1.8
311312
github.com/cosmos/ibc-go/v3 => github.com/sei-protocol/sei-ibc-go/v3 v3.3.0
312313
github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1

go.sum

+4-3
Original file line numberDiff line numberDiff line change
@@ -986,8 +986,9 @@ github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9k
986986
github.com/onsi/ginkgo v1.16.4 h1:29JGrr5oVBm5ulCWet69zQkzWipVXIol6ygQUe/EzNc=
987987
github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0=
988988
github.com/onsi/ginkgo/v2 v2.0.0/go.mod h1:vw5CSIxN1JObi/U8gcbwft7ZxR2dgaR70JSE3/PpL4c=
989-
github.com/onsi/ginkgo/v2 v2.1.3 h1:e/3Cwtogj0HA+25nMP1jCMDIf8RtRYbGwGGuBIFztkc=
990989
github.com/onsi/ginkgo/v2 v2.1.3/go.mod h1:vw5CSIxN1JObi/U8gcbwft7ZxR2dgaR70JSE3/PpL4c=
990+
github.com/onsi/ginkgo/v2 v2.1.4 h1:GNapqRSid3zijZ9H77KrgVG4/8KqiyRsxcSxe+7ApXY=
991+
github.com/onsi/ginkgo/v2 v2.1.4/go.mod h1:um6tUpWM/cxCK3/FK8BXqEiUMUwRgSM4JXG47RKZmLU=
991992
github.com/onsi/gomega v1.4.1/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
992993
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
993994
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
@@ -1169,8 +1170,8 @@ github.com/securego/gosec/v2 v2.11.0/go.mod h1:SX8bptShuG8reGC0XS09+a4H2BoWSJi+f
11691170
github.com/segmentio/fasthash v1.0.3/go.mod h1:waKX8l2N8yckOgmSsXJi7x1ZfdKZ4x7KRMzBtS3oedY=
11701171
github.com/sei-protocol/goutils v0.0.2 h1:Bfa7Sv+4CVLNM20QcpvGb81B8C5HkQC/kW1CQpIbXDA=
11711172
github.com/sei-protocol/goutils v0.0.2/go.mod h1:iYE2DuJfEnM+APPehr2gOUXfuLuPsVxorcDO+Tzq9q8=
1172-
github.com/sei-protocol/sei-cosmos v0.2.69 h1:tN18VEles46CzClEQmLiBO3tYbhYIFWJvFuSJtWRdV4=
1173-
github.com/sei-protocol/sei-cosmos v0.2.69/go.mod h1:nd65tkKknX2nrtkSldt17rGU5uaNZ/ly/fB3FqNoYck=
1173+
github.com/sei-protocol/sei-cosmos v0.2.71 h1:L1aNyXd/xQA52AuDilf1RDc5RrkShu4cYqvrA2N393E=
1174+
github.com/sei-protocol/sei-cosmos v0.2.71/go.mod h1:CykNPmj90YkwBorkvnc05u9k9MBNDHC3h4CIdmq3R98=
11741175
github.com/sei-protocol/sei-db v0.0.24 h1:rSidZZ4GNEJRY+0gm5+RioNqmYiOiPaZuDQ+vIAiqi0=
11751176
github.com/sei-protocol/sei-db v0.0.24/go.mod h1:F/ZKZA8HJPcUzSZPA8yt6pfwlGriJ4RDR4eHKSGLStI=
11761177
github.com/sei-protocol/sei-iavl v0.1.8 h1:HcK7Nv64PtJXUSdPV2+8AwRNRrcFQwsAmSZX6Em625E=

loadtest/config.json

+4-7
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
{
22
"grpc_endpoints": "127.0.0.1:9090",
3-
"blockchain_endpoint": "http://localhost:26657/blockchain",
3+
"blockchain_endpoint": "http://localhost:26657",
44
"node_uri": "tcp://localhost:26657",
55
"tls": false,
6-
"msgs_per_tx": 10,
6+
"msgs_per_tx": 1,
77
"chain_id": "sei-loadtest-testnet",
8-
"txs_per_block": 400,
9-
"rounds": 5,
8+
"target_tps": 500,
109
"price_distribution": {
1110
"min": "45",
1211
"max": "55",
@@ -20,7 +19,7 @@
2019
"message_configs": {
2120
"default": {
2221
"gas": 3000000,
23-
"fee": 50000
22+
"fee": 200000
2423
},
2524
"collect_rewards": {
2625
"gas": 10000000,
@@ -42,8 +41,6 @@
4241
"begin_redelegate_percentage": "0.25"
4342
}
4443
},
45-
"constant": true,
46-
"loadtest_interval": 600,
4744
"message_type": "bank,dex,staking,failure_dex_malformed,failure_dex_invalid,collect_rewards,distribute_rewards,wasm_instantiate",
4845
"run_oracle": false,
4946
"metrics_port": 9695,

loadtest/loadtest_client.go

+40-196
Original file line numberDiff line numberDiff line change
@@ -3,22 +3,21 @@ package main
33
import (
44
"context"
55
"fmt"
6+
"math"
67
"math/rand"
7-
"os"
8-
"path/filepath"
98
"strings"
109
"sync"
10+
"sync/atomic"
1111
"time"
1212

13-
cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types"
13+
"golang.org/x/time/rate"
14+
1415
"github.com/cosmos/cosmos-sdk/types"
15-
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
1616
typestx "github.com/cosmos/cosmos-sdk/types/tx"
1717
stakingtypes "github.com/cosmos/cosmos-sdk/x/staking/types"
1818

1919
"crypto/tls"
2020

21-
"github.com/k0kubun/pp/v3"
2221
"google.golang.org/grpc"
2322
"google.golang.org/grpc/credentials"
2423
)
@@ -27,12 +26,8 @@ type LoadTestClient struct {
2726
LoadTestConfig Config
2827
TestConfig EncodingConfig
2928
TxClients []typestx.ServiceClient
30-
TxHashFile *os.File
3129
SignerClient *SignerClient
3230
ChainID string
33-
TxHashList []string
34-
TxResponseChan chan *string
35-
TxHashListMutex *sync.Mutex
3631
GrpcConns []*grpc.ClientConn
3732
StakingQueryClient stakingtypes.QueryClient
3833
// Staking specific variables
@@ -67,22 +62,12 @@ func NewLoadTestClient(config Config) *LoadTestClient {
6762
GrpcConns[i] = grpcConn
6863
}
6964

70-
// setup output files
71-
userHomeDir, _ := os.UserHomeDir()
72-
_ = os.Mkdir(filepath.Join(userHomeDir, "outputs"), os.ModePerm)
73-
filename := filepath.Join(userHomeDir, "outputs", "test_tx_hash")
74-
_ = os.Remove(filename)
75-
outputFile, _ := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
7665
return &LoadTestClient{
7766
LoadTestConfig: config,
7867
TestConfig: TestConfig,
7968
TxClients: TxClients,
80-
TxHashFile: outputFile,
8169
SignerClient: NewSignerClient(config.NodeURI),
8270
ChainID: config.ChainID,
83-
TxHashList: []string{},
84-
TxResponseChan: make(chan *string),
85-
TxHashListMutex: &sync.Mutex{},
8671
GrpcConns: GrpcConns,
8772
StakingQueryClient: stakingtypes.NewQueryClient(GrpcConns[0]),
8873
DelegationMap: map[string]map[string]int{},
@@ -108,194 +93,53 @@ func (c *LoadTestClient) Close() {
10893
}
10994
}
11095

111-
func (c *LoadTestClient) AppendTxHash(txHash string) {
112-
c.TxResponseChan <- &txHash
113-
}
114-
115-
func (c *LoadTestClient) WriteTxHashToFile() {
116-
fmt.Printf("Writing Tx Hashes to: %s \n", c.TxHashFile.Name())
117-
file := c.TxHashFile
118-
for _, txHash := range c.TxHashList {
119-
txHashLine := fmt.Sprintf("%s\n", txHash)
120-
if _, err := file.WriteString(txHashLine); err != nil {
121-
panic(err)
122-
}
123-
}
124-
}
125-
126-
func (c *LoadTestClient) BuildTxs() (workgroups []*sync.WaitGroup, sendersList [][]func()) {
96+
func (c *LoadTestClient) BuildTxs(txQueue chan<- []byte, producerId int, wg *sync.WaitGroup, done <-chan struct{}, producedCount *int64) {
97+
defer wg.Done()
12798
config := c.LoadTestConfig
128-
numberOfAccounts := config.TxsPerBlock / config.MsgsPerTx * 2 // * 2 because we need two sets of accounts
129-
activeAccounts := []int{}
130-
inactiveAccounts := []int{}
131-
132-
for i := 0; i < int(numberOfAccounts); i++ {
133-
if i%2 == 0 {
134-
activeAccounts = append(activeAccounts, i)
135-
} else {
136-
inactiveAccounts = append(inactiveAccounts, i)
137-
}
138-
}
139-
140-
valKeys := c.SignerClient.GetValKeys()
141-
142-
for i := 0; i < int(config.Rounds); i++ {
143-
fmt.Printf("Preparing %d-th round\n", i)
144-
145-
wg := &sync.WaitGroup{}
146-
var senders []func()
147-
workgroups = append(workgroups, wg)
148-
c.generatedAdminMessageForBlock = false
149-
for j, account := range activeAccounts {
150-
accountIdentifier := fmt.Sprint(account)
151-
accountKeyPath := c.SignerClient.GetTestAccountKeyPath(uint64(account))
152-
key := c.SignerClient.GetKey(accountIdentifier, "test", accountKeyPath)
153-
154-
msgs, failureExpected, signer, gas, fee := c.generateMessage(config, key, config.MsgsPerTx)
99+
accountIdentifier := fmt.Sprint(producerId)
100+
accountKeyPath := c.SignerClient.GetTestAccountKeyPath(uint64(producerId))
101+
key := c.SignerClient.GetKey(accountIdentifier, "test", accountKeyPath)
102+
103+
for {
104+
select {
105+
case <-done:
106+
fmt.Printf("Stopping producer %d\n", producerId)
107+
return
108+
default:
109+
msgs, _, _, gas, fee := c.generateMessage(config, key, config.MsgsPerTx)
155110
txBuilder := TestConfig.TxConfig.NewTxBuilder()
156111
_ = txBuilder.SetMsgs(msgs...)
157-
seqDelta := uint64(i / 2)
158-
mode := typestx.BroadcastMode_BROADCAST_MODE_SYNC
159-
if j == len(activeAccounts)-1 {
160-
mode = typestx.BroadcastMode_BROADCAST_MODE_BLOCK
161-
}
162-
// Note: There is a potential race condition here with seqnos
163-
// in which a later seqno is delievered before an earlier seqno
164-
// In practice, we haven't run into this issue so we'll leave this
165-
// as is.
166-
sender := SendTx(signer, &txBuilder, mode, seqDelta, failureExpected, *c, gas, fee)
167-
wg.Add(1)
168-
senders = append(senders, func() {
169-
defer wg.Done()
170-
sender()
171-
})
172-
}
173-
174-
senders = append(senders, c.GenerateOracleSenders(i, config, valKeys, wg)...)
175-
176-
sendersList = append(sendersList, senders)
177-
inactiveAccounts, activeAccounts = activeAccounts, inactiveAccounts
178-
}
179-
180-
return workgroups, sendersList
181-
}
182-
183-
func (c *LoadTestClient) GenerateOracleSenders(i int, config Config, valKeys []cryptotypes.PrivKey, waitGroup *sync.WaitGroup) []func() {
184-
senders := []func(){}
185-
if config.RunOracle && i%2 == 0 {
186-
for _, valKey := range valKeys {
187-
// generate oracle tx
188-
msg := generateOracleMessage(valKey)
189-
txBuilder := TestConfig.TxConfig.NewTxBuilder()
190-
_ = txBuilder.SetMsgs(msg)
191-
seqDelta := uint64(i / 2)
192-
mode := typestx.BroadcastMode_BROADCAST_MODE_SYNC
193-
sender := SendTx(valKey, &txBuilder, mode, seqDelta, false, *c, 30000, 100000)
194-
waitGroup.Add(1)
195-
senders = append(senders, func() {
196-
defer waitGroup.Done()
197-
sender()
112+
txBuilder.SetGasLimit(gas)
113+
txBuilder.SetFeeAmount([]types.Coin{
114+
types.NewCoin("usei", types.NewInt(fee)),
198115
})
199-
}
200-
}
201-
return senders
202-
}
203-
204-
func (c *LoadTestClient) SendTxs(workgroups []*sync.WaitGroup, sendersList [][]func()) {
205-
defer close(c.TxResponseChan)
116+
// Use random seqno to get around txs that might already be seen in mempool
206117

207-
lastHeight := getLastHeight(c.LoadTestConfig.BlockchainEndpoint)
208-
for i := 0; i < int(c.LoadTestConfig.Rounds); i++ {
209-
newHeight := getLastHeight(c.LoadTestConfig.BlockchainEndpoint)
210-
for newHeight == lastHeight {
211-
time.Sleep(10 * time.Millisecond)
212-
newHeight = getLastHeight(c.LoadTestConfig.BlockchainEndpoint)
213-
}
214-
fmt.Printf("Sending %d-th block\n", i)
215-
senders := sendersList[i]
216-
wg := workgroups[i]
217-
for _, sender := range senders {
218-
go sender()
118+
c.SignerClient.SignTx(c.ChainID, &txBuilder, key, uint64(rand.Intn(math.MaxInt)))
119+
txBytes, _ := TestConfig.TxConfig.TxEncoder()(txBuilder.GetTx())
120+
txQueue <- txBytes
121+
atomic.AddInt64(producedCount, 1)
219122
}
220-
wg.Wait()
221-
lastHeight = newHeight
222123
}
223124
}
224125

225-
func (c *LoadTestClient) GatherTxHashes() {
226-
for txHash := range c.TxResponseChan {
227-
c.TxHashList = append(c.TxHashList, *txHash)
228-
}
229-
fmt.Printf("Transactions Sent=%d\n", len(c.TxHashList))
230-
}
231-
232-
func (c *LoadTestClient) ValidateTxs() {
233-
defer c.Close()
234-
numTxs := len(c.TxHashList)
235-
resultChan := make(chan *types.TxResponse, numTxs)
236-
var waitGroup sync.WaitGroup
237-
238-
if numTxs == 0 {
239-
return
240-
}
241-
242-
for _, txHash := range c.TxHashList {
243-
waitGroup.Add(1)
244-
go func(txHash string) {
245-
defer waitGroup.Done()
246-
resultChan <- c.GetTxResponse(txHash)
247-
}(txHash)
248-
}
249-
250-
go func() {
251-
waitGroup.Wait()
252-
close(resultChan)
253-
}()
254-
255-
fmt.Printf("Validating %d Transactions... \n", len(c.TxHashList))
256-
waitGroup.Wait()
257-
258-
notCommittedTxs := 0
259-
responseCodeMap := map[int]int{}
260-
responseStringMap := map[string]int{}
261-
for result := range resultChan {
262-
// If the result is nil then that means the transaction was not committed
263-
if result == nil {
264-
notCommittedTxs++
265-
continue
266-
}
267-
code := result.Code
268-
codeString := "ok"
269-
if code != 0 {
270-
codespace := result.Codespace
271-
err := sdkerrors.ABCIError(codespace, code, fmt.Sprintf("Error code=%d ", code))
272-
codeString = err.Error()
126+
func (c *LoadTestClient) SendTxs(txQueue <-chan []byte, done <-chan struct{}, sentCount *int64, rateLimit int) {
127+
rateLimiter := rate.NewLimiter(rate.Limit(rateLimit), rateLimit)
128+
for {
129+
130+
select {
131+
case <-done:
132+
fmt.Printf("Stopping consumers\n")
133+
return
134+
case tx, ok := <-txQueue:
135+
if !ok {
136+
fmt.Printf("Stopping consumers\n")
137+
}
138+
if rateLimiter.Allow() {
139+
go SendTx(tx, typestx.BroadcastMode_BROADCAST_MODE_BLOCK, false, *c, sentCount)
140+
}
273141
}
274-
responseStringMap[codeString]++
275-
responseCodeMap[int(code)]++
276-
}
277-
278-
fmt.Printf("Transactions not committed: %d\n", notCommittedTxs)
279-
pp.Printf("Response Code Mapping: \n %s \n", responseStringMap)
280-
IncrTxNotCommitted(notCommittedTxs)
281-
for reason, count := range responseStringMap {
282-
IncrTxProcessCode(reason, count)
283-
}
284-
}
285-
286-
func (c *LoadTestClient) GetTxResponse(hash string) *types.TxResponse {
287-
grpcRes, err := c.GetTxClient().GetTx(
288-
context.Background(),
289-
&typestx.GetTxRequest{
290-
Hash: hash,
291-
},
292-
)
293-
fmt.Printf("Validated: %s\n", hash)
294-
if err != nil {
295-
fmt.Println(err)
296-
return nil
297142
}
298-
return grpcRes.TxResponse
299143
}
300144

301145
func (c *LoadTestClient) GetTxClient() typestx.ServiceClient {

0 commit comments

Comments
 (0)