Skip to content

Commit a4db677

Browse files
feat(throughput): add vegeta support
1 parent c4e59f8 commit a4db677

File tree

1 file changed

+142
-51
lines changed
  • pkg/coordinator/tasks/tx_pool_throughput_analysis

1 file changed

+142
-51
lines changed

pkg/coordinator/tasks/tx_pool_throughput_analysis/task.go

Lines changed: 142 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/noku-team/assertoor/pkg/coordinator/utils/sentry"
2222
"github.com/noku-team/assertoor/pkg/coordinator/wallet"
2323
"github.com/sirupsen/logrus"
24+
vegeta "github.com/tsenart/vegeta/v12/lib"
2425
)
2526

2627
var (
@@ -111,72 +112,63 @@ func (t *Task) Execute(ctx context.Context) error {
111112
defer conn.Close()
112113

113114
var txs []*ethtypes.Transaction
115+
var sentTxCount int
116+
var isFailed bool
114117

115118
startTime := time.Now()
116-
isFailed := false
117-
sentTxCount := 0
118119

119-
go func() {
120-
startExecTime := time.Now()
121-
endTime := startExecTime.Add(time.Second)
122-
123-
for i := range t.config.QPS {
124-
// Calculate how much time we have left
125-
remainingTime := time.Until(endTime)
126-
127-
// Calculate sleep time to distribute remaining transactions evenly
128-
sleepTime := remainingTime / time.Duration(t.config.QPS-i)
129-
130-
// generate and sign tx
131-
go func() {
132-
if ctx.Err() != nil && !isFailed {
133-
return
134-
}
135-
136-
tx, err := t.generateTransaction(ctx)
137-
if err != nil {
138-
t.logger.Errorf("Failed to create transaction: %v", err)
139-
t.ctx.SetResult(types.TaskResultFailure)
140-
isFailed = true
141-
return
142-
}
143-
144-
sentTxCount++
120+
// Create vegeta attacker
121+
attacker := NewTxAttacker(t, client, ctx)
145122

146-
if sentTxCount%t.config.MeasureInterval == 0 {
147-
elapsed := time.Since(startTime)
148-
t.logger.Infof("Sent %d transactions in %.2fs", sentTxCount, elapsed.Seconds())
149-
}
123+
// Create a dummy targeter (required by vegeta interface but not used for transactions)
124+
targeter := func(tgt *vegeta.Target) error {
125+
*tgt = vegeta.Target{
126+
Method: "POST",
127+
URL: "dummy://transaction",
128+
}
129+
return nil
130+
}
150131

151-
err = client.GetRPCClient().SendTransaction(ctx, tx)
152-
if err != nil {
153-
t.logger.WithField("client", client.GetName()).Errorf("Failed to send transaction: %v", err)
154-
t.ctx.SetResult(types.TaskResultFailure)
155-
isFailed = true
156-
return
157-
}
132+
// Create pacer for desired QPS over 1 second
133+
rate := vegeta.Rate{Freq: t.config.QPS, Per: time.Second}
134+
pacer := vegeta.ConstantPacer{Freq: rate.Freq, Per: rate.Per}
158135

159-
txs = append(txs, tx)
160-
}()
136+
// Attack for 1 second duration
137+
duration := time.Second
138+
results := attacker.Attack(targeter, pacer, duration, "tx_attack")
161139

162-
if isFailed {
140+
// Process results
141+
done := make(chan bool)
142+
go func() {
143+
defer close(done)
144+
for result := range results {
145+
if result.Error != "" {
146+
t.logger.Errorf("Transaction error: %s", result.Error)
147+
t.ctx.SetResult(types.TaskResultFailure)
148+
isFailed = true
163149
return
164150
}
165151

166-
time.Sleep(sleepTime)
152+
sentTxCount++
153+
154+
// Note: We can't easily get the actual tx object from vegeta results
155+
// so we'll generate them again for the broadcast to other clients
167156
}
168157

169-
execTime := time.Since(startExecTime)
170-
t.logger.Infof("Time to generate %d transactions: %v", t.config.QPS, execTime)
158+
execTime := time.Since(startTime)
159+
t.logger.Infof("Time to send %d transactions: %v", sentTxCount, execTime)
171160
}()
172161

173-
lastMeasureTime := time.Now()
174-
gotTx := 0
162+
// Wait for transaction sending to complete
163+
<-done
175164

176165
if isFailed {
177166
return nil
178167
}
179168

169+
lastMeasureTime := time.Now()
170+
gotTx := 0
171+
180172
for gotTx < t.config.QPS {
181173
if isFailed {
182174
return nil
@@ -240,14 +232,24 @@ func (t *Task) Execute(ctx context.Context) error {
240232
totalTime := time.Since(startTime)
241233
t.logger.Infof("Total time for %d transactions: %.2fs", sentTxCount, totalTime.Seconds())
242234

235+
// Generate transactions for broadcasting to other clients
236+
for range sentTxCount {
237+
tx, err := t.generateTransaction(ctx)
238+
if err != nil {
239+
t.logger.Errorf("Failed to generate transaction for broadcasting: %v", err)
240+
continue
241+
}
242+
txs = append(txs, tx)
243+
}
244+
243245
// send to other clients, for speeding up tx mining
244246
for _, tx := range txs {
245247
for _, otherClient := range executionClients {
246248
if otherClient.GetName() == client.GetName() {
247249
continue
248250
}
249251

250-
client.GetRPCClient().SendTransaction(ctx, tx)
252+
otherClient.GetRPCClient().SendTransaction(ctx, tx)
251253
}
252254
}
253255

@@ -317,9 +319,7 @@ func (t *Task) generateTransaction(ctx context.Context) (*ethtypes.Transaction,
317319
feeCap := &helper.BigInt{Value: *big.NewInt(100000000000)} // 100 Gwei
318320
tipCap := &helper.BigInt{Value: *big.NewInt(1000000000)} // 1 Gwei
319321

320-
var txObj ethtypes.TxData
321-
322-
txObj = &ethtypes.DynamicFeeTx{
322+
var txObj = &ethtypes.DynamicFeeTx{
323323
ChainID: t.ctx.Scheduler.GetServices().ClientPool().GetExecutionPool().GetBlockCache().GetChainID(),
324324
Nonce: nonce,
325325
GasTipCap: &tipCap.Value,
@@ -339,3 +339,94 @@ func (t *Task) generateTransaction(ctx context.Context) (*ethtypes.Transaction,
339339

340340
return tx, nil
341341
}
342+
343+
// TxAttacker implements a custom vegeta attacker for transaction sending
344+
type TxAttacker struct {
345+
task *Task
346+
client *execution.Client
347+
ctx context.Context
348+
}
349+
350+
// NewTxAttacker creates a new transaction attacker
351+
func NewTxAttacker(task *Task, client *execution.Client, ctx context.Context) *TxAttacker {
352+
return &TxAttacker{
353+
task: task,
354+
client: client,
355+
ctx: ctx,
356+
}
357+
}
358+
359+
// Attack implements the vegeta attacker interface for transaction sending
360+
func (a *TxAttacker) Attack(targeter vegeta.Targeter, pacer vegeta.Pacer, duration time.Duration, name string) <-chan *vegeta.Result {
361+
results := make(chan *vegeta.Result)
362+
363+
go func() {
364+
defer close(results)
365+
366+
var (
367+
began = time.Now()
368+
target vegeta.Target
369+
err error
370+
count int
371+
)
372+
373+
for {
374+
elapsed := time.Since(began)
375+
if elapsed >= duration {
376+
break
377+
}
378+
379+
// Get next target (not used for transactions but required by interface)
380+
if err = targeter(&target); err != nil {
381+
results <- &vegeta.Result{
382+
Error: err.Error(),
383+
}
384+
continue
385+
}
386+
387+
// Wait for the pacer
388+
if hit, ok := pacer.Pace(elapsed, uint64(count)); !ok {
389+
break
390+
} else if hit > 0 {
391+
time.Sleep(hit)
392+
}
393+
394+
// Generate and send transaction
395+
before := time.Now()
396+
tx, err := a.task.generateTransaction(a.ctx)
397+
if err != nil {
398+
results <- &vegeta.Result{
399+
Error: fmt.Sprintf("Failed to generate transaction: %v", err),
400+
Latency: time.Since(before),
401+
}
402+
count++
403+
continue
404+
}
405+
406+
err = a.client.GetRPCClient().SendTransaction(a.ctx, tx)
407+
latency := time.Since(before)
408+
409+
result := &vegeta.Result{
410+
Latency: latency,
411+
}
412+
413+
if err != nil {
414+
result.Error = fmt.Sprintf("Failed to send transaction: %v", err)
415+
result.Code = 500 // Use HTTP-like error codes
416+
} else {
417+
result.Code = 200 // Success
418+
result.BytesOut = uint64(tx.Size())
419+
}
420+
421+
results <- result
422+
count++
423+
424+
// Log progress
425+
if count%a.task.config.MeasureInterval == 0 {
426+
a.task.logger.Infof("Sent %d transactions in %.2fs", count, elapsed.Seconds())
427+
}
428+
}
429+
}()
430+
431+
return results
432+
}

0 commit comments

Comments
 (0)