Skip to content

Commit d107976

Browse files
Merge branch 'feat/txpool-latency-qps' of https://github.com/noku-team/assertoor into feat/txpool-latency-qps
2 parents 242bdee + afbac70 commit d107976

File tree

2 files changed

+88
-85
lines changed
  • pkg/coordinator/tasks

2 files changed

+88
-85
lines changed

pkg/coordinator/tasks/tx_pool_latency_analysis/task.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -149,9 +149,6 @@ func (t *Task) Execute(ctx context.Context) error {
149149

150150
// generate and send tx
151151
go func(i int) {
152-
if ctx.Err() != nil && !isFailed {
153-
return
154-
}
155152

156153
tx, err := t.generateTransaction(ctx, i)
157154
if err != nil {
@@ -222,6 +219,7 @@ func (t *Task) Execute(ctx context.Context) error {
222219
if err != nil {
223220
t.logger.Errorf("Failed reading p2p events: %v", err)
224221
t.ctx.SetResult(types.TaskResultFailure)
222+
isFailed = true
225223
return
226224
}
227225

@@ -233,11 +231,13 @@ func (t *Task) Execute(ctx context.Context) error {
233231
if err != nil {
234232
t.logger.Errorf("Failed to parse transaction data: %v", err)
235233
t.ctx.SetResult(types.TaskResultFailure)
234+
isFailed = true
236235
return
237236
}
238237
if tx_index < 0 || tx_index >= totNumberOfTxes {
239238
t.logger.Errorf("Transaction index out of range: %d", tx_index)
240239
t.ctx.SetResult(types.TaskResultFailure)
240+
isFailed = true
241241
return
242242
}
243243
latenciesMus[tx_index] = time.Since(txStartTime[tx_index]).Microseconds()
@@ -307,6 +307,7 @@ func (t *Task) Execute(ctx context.Context) error {
307307
minLatency = lat
308308
}
309309
}
310+
t.logger.Infof("Max latency: %d mus, Min latency: %d mus", maxLatency, minLatency)
310311

311312
// Generate HDR plot
312313
plot, err := hdr.HdrPlot(latenciesMus)
@@ -317,12 +318,14 @@ func (t *Task) Execute(ctx context.Context) error {
317318
}
318319

319320
t.ctx.Outputs.SetVar("tx_count", totNumberOfTxes)
321+
t.ctx.Outputs.SetVar("min_latency_mus", minLatency)
320322
t.ctx.Outputs.SetVar("max_latency_mus", maxLatency)
321323

322324
t.ctx.SetResult(types.TaskResultSuccess)
323325

324326
outputs := map[string]interface{}{
325327
"tx_count": totNumberOfTxes,
328+
"min_latency_mus": minLatency,
326329
"max_latency_mus": maxLatency,
327330
"tx_pool_latency_hdr_plot": plot,
328331
}

pkg/coordinator/tasks/tx_pool_throughput_analysis/task.go

Lines changed: 82 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313
"github.com/ethereum/go-ethereum/core/forkid"
1414
ethtypes "github.com/ethereum/go-ethereum/core/types"
1515
"github.com/ethereum/go-ethereum/crypto"
16-
"github.com/ethereum/go-ethereum/eth/protocols/eth"
1716
"github.com/ethereum/go-ethereum/params"
1817
"github.com/noku-team/assertoor/pkg/coordinator/clients/execution"
1918
"github.com/noku-team/assertoor/pkg/coordinator/helper"
@@ -125,6 +124,7 @@ func (t *Task) Execute(ctx context.Context) error {
125124
// Prepare to send transactions
126125
var totNumberOfTxes int = t.config.TPS * t.config.Duration_s
127126
var txs []*ethtypes.Transaction = make([]*ethtypes.Transaction, totNumberOfTxes)
127+
var testDeadline time.Time = time.Now().Add(time.Duration(t.config.Duration_s+60*30) * time.Second)
128128

129129
startTime := time.Now()
130130
isFailed := false
@@ -145,9 +145,6 @@ func (t *Task) Execute(ctx context.Context) error {
145145

146146
// generate and send tx
147147
go func() {
148-
if ctx.Err() != nil && !isFailed {
149-
return
150-
}
151148

152149
tx, err := t.generateTransaction(ctx)
153150
if err != nil {
@@ -174,22 +171,8 @@ func (t *Task) Execute(ctx context.Context) error {
174171
t.logger.Infof("Sent %d transactions in %.2fs", sentTxCount, elapsed.Seconds())
175172
}
176173

177-
select {
178-
case <-ctx.Done():
179-
t.logger.Warnf("Task cancelled, stopping transaction generation.")
180-
return
181-
default:
182-
if time.Since(startTime) >= time.Duration(t.config.Duration_s)*time.Second {
183-
t.logger.Infof("Reached duration limit, stopping transaction generation.")
184-
return
185-
}
186-
}
187174
}()
188175

189-
if isFailed {
190-
return
191-
}
192-
193176
// Sleep to control the TPS
194177
if i < totNumberOfTxes-1 {
195178
if sleepTime > 0 {
@@ -200,79 +183,83 @@ func (t *Task) Execute(ctx context.Context) error {
200183
}
201184
}
202185

203-
execTime := time.Since(startExecTime)
204-
t.logger.Infof("Time to generate %d transactions: %v", t.config.TPS, execTime)
205-
}()
206-
207-
lastMeasureTime := time.Now()
208-
gotTx := 0
209-
210-
if isFailed {
211-
return nil
212-
}
213-
214-
for gotTx < t.config.TPS {
215-
if isFailed {
216-
return nil
186+
select {
187+
case <-ctx.Done():
188+
{
189+
t.logger.Warnf("Task cancelled, stopping transaction generation.")
190+
return
191+
}
192+
default:
193+
{
194+
// if testDeadline reached, stop sending txes
195+
if isFailed {
196+
return
197+
}
198+
if time.Now().After(testDeadline) {
199+
t.logger.Infof("Reached duration limit, stopping transaction generation.")
200+
return
201+
}
202+
}
217203
}
218204

219-
// Add a timeout of 180 seconds for reading transaction messages
220-
readChan := make(chan struct {
221-
p2pTxs *eth.TransactionsPacket
222-
err error
223-
})
224-
225-
go func() {
226-
txs, err := conn.ReadTransactionMessages()
227-
readChan <- struct {
228-
p2pTxs *eth.TransactionsPacket
229-
err error
230-
}{txs, err}
231-
}()
205+
}()
232206

233-
select {
234-
case result := <-readChan:
235-
if result.err != nil {
236-
t.logger.Errorf("Failed to read transaction messages: %v", result.err)
207+
// Wait P2P event messages
208+
func() {
209+
var receivedEvents int = 0
210+
for {
211+
txes, err := conn.ReadTransactionMessages()
212+
if err != nil {
213+
t.logger.Errorf("Failed reading p2p events: %v", err)
237214
t.ctx.SetResult(types.TaskResultFailure)
238-
return nil
239-
}
240-
gotTx += len(*result.p2pTxs)
241-
case <-time.After(180 * time.Second):
242-
t.logger.Warnf("Timeout after 180 seconds while reading transaction messages. Re-sending transactions...")
243-
244-
// Calculate how many transactions we're still missing
245-
missingTxCount := t.config.TPS - gotTx
246-
if missingTxCount <= 0 {
247-
break
215+
return
248216
}
249217

250-
// Re-send transactions to the original client
251-
for i := 0; i < missingTxCount && i < len(txs); i++ {
252-
err = client.GetRPCClient().SendTransaction(ctx, txs[i])
218+
for _, tx := range *txes {
219+
tx_data := tx.Data()
220+
// read tx_data that is in the format "tx_index:<index>"
221+
var tx_index int
222+
_, err := fmt.Sscanf(string(tx_data), "tx_index:%d", &tx_index)
253223
if err != nil {
254-
t.logger.WithError(err).Errorf("Failed to re-send transaction message, error: %v", err)
224+
t.logger.Errorf("Failed to parse transaction data: %v", err)
225+
t.ctx.SetResult(types.TaskResultFailure)
226+
isFailed = true
227+
return
228+
}
229+
if tx_index < 0 || tx_index >= totNumberOfTxes {
230+
t.logger.Errorf("Transaction index out of range: %d", tx_index)
255231
t.ctx.SetResult(types.TaskResultFailure)
256-
return nil
232+
isFailed = true
233+
return
234+
}
235+
//latenciesMus[tx_index] = time.Since(txStartTime[tx_index]).Microseconds()
236+
receivedEvents++
237+
238+
if receivedEvents%t.config.MeasureInterval == 0 {
239+
t.logger.Infof("Received %d p2p events", sentTxCount)
257240
}
258241
}
259242

260-
t.logger.Infof("Re-sent %d transactions", missingTxCount)
261-
continue
262-
}
243+
if receivedEvents == totNumberOfTxes {
244+
t.logger.Infof("Reading of p2p events finished")
245+
return
246+
}
263247

264-
if gotTx%t.config.MeasureInterval != 0 {
265-
continue
248+
select {
249+
case <-ctx.Done():
250+
t.logger.Warnf("Task cancelled, stopping reading p2p events.")
251+
return
252+
default:
253+
// check test deadline
254+
if time.Now().After(testDeadline) {
255+
t.logger.Warnf("Reached duration limit, stopping reading p2p events.")
256+
return
257+
}
258+
}
266259
}
260+
}()
267261

268-
t.logger.Infof("Got %d transactions", gotTx)
269-
t.logger.Infof("Tx/s: (%d txs processed): %.2f / s \n", gotTx, float64(t.config.MeasureInterval)*float64(time.Second)/float64(time.Since(lastMeasureTime)))
270-
271-
lastMeasureTime = time.Now()
272-
}
273-
274-
totalTime := time.Since(startTime)
275-
t.logger.Infof("Total time for %d transactions: %.2fs", sentTxCount, totalTime.Seconds())
262+
lastMeasureTime := time.Since(startTime)
276263

277264
// send to other clients, for speeding up tx mining
278265
for _, tx := range txs {
@@ -285,16 +272,29 @@ func (t *Task) Execute(ctx context.Context) error {
285272
}
286273
}
287274

275+
// Check if the context was cancelled or other errors occurred
276+
if ctx.Err() != nil && !isFailed {
277+
return nil
278+
}
279+
280+
// Calculate statistics
281+
processed_tx_per_second := float64(sentTxCount) / lastMeasureTime.Seconds()
282+
283+
t.ctx.Outputs.SetVar("mean_throughput", processed_tx_per_second)
284+
t.logger.Infof("Processed %d transactions in %.2fs, mean throughput: %.2f tx/s", sentTxCount, lastMeasureTime.Seconds(), processed_tx_per_second)
285+
t.ctx.Outputs.SetVar("tx_count", totNumberOfTxes)
286+
t.logger.Infof("Sent %d transactions in %.2fs", sentTxCount, lastMeasureTime.Seconds())
287+
288+
t.ctx.SetResult(types.TaskResultSuccess)
289+
288290
outputs := map[string]interface{}{
289-
"total_time_mus": totalTime.Microseconds(),
290-
"tps": t.config.TPS,
291+
"tx_count": totNumberOfTxes,
292+
"mean_throughput": processed_tx_per_second,
291293
}
294+
292295
outputsJSON, _ := json.Marshal(outputs)
293296
t.logger.Infof("outputs_json: %s", string(outputsJSON))
294297

295-
t.ctx.Outputs.SetVar("total_time_mus", totalTime.Milliseconds())
296-
t.ctx.SetResult(types.TaskResultSuccess)
297-
298298
return nil
299299
}
300300

0 commit comments

Comments
 (0)