Skip to content

Commit 7e3a25d

Browse files
committed
initial refactoring (to complete)
1 parent 105b3bd commit 7e3a25d

File tree

2 files changed

+36
-17
lines changed

2 files changed

+36
-17
lines changed

pkg/coordinator/tasks/tx_pool_throughput_analysis/README.md

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,11 @@ The `tx_pool_throughput_analysis` task evaluates the throughput of transaction p
2020

2121
### Outputs
2222

23-
- **`total_time_mus`**:
24-
The total time taken to send the transactions in microseconds.
23+
- **`tx_count`**:
24+
The total number of transactions sent.
25+
26+
- **`max_latency_mus`**:
27+
The max latency of the transactions in microseconds.
2528

2629
### Defaults
2730

pkg/coordinator/tasks/tx_pool_throughput_analysis/task.go

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,21 @@ func (t *Task) Execute(ctx context.Context) error {
110110

111111
defer conn.Close()
112112

113-
var txs []*ethtypes.Transaction
113+
// Wait for the specified seconds before starting the task
114+
if t.config.SecondsBeforeRunning > 0 {
115+
t.logger.Infof("Waiting for %d seconds before starting the task...", t.config.SecondsBeforeRunning)
116+
select {
117+
case <-time.After(time.Duration(t.config.SecondsBeforeRunning) * time.Second):
118+
t.logger.Infof("Starting task after waiting.")
119+
case <-ctx.Done():
120+
t.logger.Warnf("Task cancelled before starting.")
121+
return ctx.Err()
122+
}
123+
}
124+
125+
// Prepare to send transactions
126+
var totNumberOfTxes int = t.config.QPS * t.config.Duration_s
127+
var tx_events []*ethtypes.Transaction = make([]*ethtypes.Transaction, totNumberOfTxes)
114128

115129
startTime := time.Now()
116130
isFailed := false
@@ -120,14 +134,15 @@ func (t *Task) Execute(ctx context.Context) error {
120134
startExecTime := time.Now()
121135
endTime := startExecTime.Add(time.Second)
122136

123-
for i := range t.config.QPS {
137+
// Generate and send transactions
138+
for i := 0; i < totNumberOfTxes; i++ {
124139
// Calculate how much time we have left
125140
remainingTime := time.Until(endTime)
126141

127142
// Calculate sleep time to distribute remaining transactions evenly
128143
sleepTime := remainingTime / time.Duration(t.config.QPS-i)
129144

130-
// generate and sign tx
145+
// generate and send tx
131146
go func() {
132147
if ctx.Err() != nil && !isFailed {
133148
return
@@ -141,13 +156,6 @@ func (t *Task) Execute(ctx context.Context) error {
141156
return
142157
}
143158

144-
sentTxCount++
145-
146-
if sentTxCount%t.config.MeasureInterval == 0 {
147-
elapsed := time.Since(startTime)
148-
t.logger.Infof("Sent %d transactions in %.2fs", sentTxCount, elapsed.Seconds())
149-
}
150-
151159
err = client.GetRPCClient().SendTransaction(ctx, tx)
152160
if err != nil {
153161
t.logger.WithField("client", client.GetName()).Errorf("Failed to send transaction: %v", err)
@@ -156,7 +164,15 @@ func (t *Task) Execute(ctx context.Context) error {
156164
return
157165
}
158166

159-
txs = append(txs, tx)
167+
sentTxCount++
168+
169+
// log transaction sending
170+
if sentTxCount%t.config.MeasureInterval == 0 {
171+
elapsed := time.Since(startTime)
172+
t.logger.Infof("Sent %d transactions in %.2fs", sentTxCount, elapsed.Seconds())
173+
}
174+
175+
tx_events = append(tx_events, tx)
160176
}()
161177

162178
if isFailed {
@@ -214,8 +230,8 @@ func (t *Task) Execute(ctx context.Context) error {
214230
}
215231

216232
// Re-send transactions to the original client
217-
for i := 0; i < missingTxCount && i < len(txs); i++ {
218-
err = client.GetRPCClient().SendTransaction(ctx, txs[i])
233+
for i := 0; i < missingTxCount && i < len(tx_events); i++ {
234+
err = client.GetRPCClient().SendTransaction(ctx, tx_events[i])
219235
if err != nil {
220236
t.logger.WithError(err).Errorf("Failed to re-send transaction message, error: %v", err)
221237
t.ctx.SetResult(types.TaskResultFailure)
@@ -232,7 +248,7 @@ func (t *Task) Execute(ctx context.Context) error {
232248
}
233249

234250
t.logger.Infof("Got %d transactions", gotTx)
235-
t.logger.Infof("Tx/s: (%d txs processed): %.2f / s \n", gotTx, float64(t.config.MeasureInterval)*float64(time.Second)/float64(time.Since(lastMeasureTime)))
251+
t.logger.Infof("Tx/s: (%d tx_events processed): %.2f / s \n", gotTx, float64(t.config.MeasureInterval)*float64(time.Second)/float64(time.Since(lastMeasureTime)))
236252

237253
lastMeasureTime = time.Now()
238254
}
@@ -241,7 +257,7 @@ func (t *Task) Execute(ctx context.Context) error {
241257
t.logger.Infof("Total time for %d transactions: %.2fs", sentTxCount, totalTime.Seconds())
242258

243259
// send to other clients, for speeding up tx mining
244-
for _, tx := range txs {
260+
for _, tx := range tx_events {
245261
for _, otherClient := range executionClients {
246262
if otherClient.GetName() == client.GetName() {
247263
continue

0 commit comments

Comments
 (0)