Skip to content

Commit 821938b

Browse files
committed
add logs; more uniform code
1 parent fa51f57 commit 821938b

File tree

2 files changed

+69
-37
lines changed
  • pkg/coordinator/tasks

2 files changed

+69
-37
lines changed

pkg/coordinator/tasks/tx_pool_latency_analysis/task.go

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ var (
2727
TaskName = "tx_pool_latency_analysis"
2828
TaskDescriptor = &types.TaskDescriptor{
2929
Name: TaskName,
30-
Description: "Checks the latency of transactions in the Ethereum TxPool",
30+
Description: "Checks the TxPool transaction propagation latency",
3131
Config: DefaultConfig(),
3232
NewTask: NewTask,
3333
}
@@ -101,6 +101,10 @@ func (t *Task) Execute(ctx context.Context) error {
101101

102102
client := executionClients[rand.Intn(len(executionClients))]
103103

104+
t.logger.Infof("Measuring TxPool transaction propagation *latency*")
105+
t.logger.Infof("Targeting client: %s, TPS: %d, Duration: %d seconds",
106+
client.GetName(), t.config.TPS, t.config.Duration_s)
107+
104108
conn, err := t.getTcpConn(ctx, client)
105109
if err != nil {
106110
t.logger.Errorf("Failed to get wire eth TCP connection: %v", err)
@@ -169,14 +173,14 @@ func (t *Task) Execute(ctx context.Context) error {
169173
}
170174

171175
txs[i] = tx
172-
//hashToIndex[tx.Hash().String()] = i
173176
sentTxCount++
174177

175178
// log transaction sending
176179
if sentTxCount%t.config.MeasureInterval == 0 {
177180
elapsed := time.Since(startTime)
178181
t.logger.Infof("Sent %d transactions in %.2fs", sentTxCount, elapsed.Seconds())
179182
}
183+
180184
}(i)
181185

182186
// Sleep to control the TPS
@@ -190,20 +194,16 @@ func (t *Task) Execute(ctx context.Context) error {
190194

191195
select {
192196
case <-ctx.Done():
193-
{
194-
t.logger.Warnf("Task cancelled, stopping transaction generation.")
197+
t.logger.Warnf("Task cancelled, stopping transaction generation.")
198+
return
199+
default:
200+
// if testDeadline reached, stop sending txes
201+
if isFailed {
195202
return
196203
}
197-
default:
198-
{
199-
// if testDeadline reached, stop sending txes
200-
if isFailed {
201-
return
202-
}
203-
if time.Now().After(testDeadline) {
204-
t.logger.Infof("Reached duration limit, stopping transaction generation.")
205-
return
206-
}
204+
if time.Now().After(testDeadline) {
205+
t.logger.Infof("Reached duration limit, stopping transaction generation.")
206+
return
207207
}
208208
}
209209
}
@@ -272,12 +272,15 @@ func (t *Task) Execute(ctx context.Context) error {
272272
}
273273
}()
274274

275+
lastMeasureDelay := time.Since(startTime)
276+
t.logger.Infof("Last measure delay since start time: %s", lastMeasureDelay)
277+
275278
if coordinatedOmissionEventCount > 0 {
276-
t.logger.Warnf("Coordinated omission events count: %d", coordinatedOmissionEventCount)
279+
t.logger.Warnf("Coordinated omission events: %d", coordinatedOmissionEventCount)
277280
}
278281

279282
if duplicatedP2PEventCount > 0 {
280-
t.logger.Warnf("Duplicated p2p event count: %d", duplicatedP2PEventCount)
283+
t.logger.Warnf("Duplicated p2p events: %d", duplicatedP2PEventCount)
281284
}
282285

283286
// Send txes to other clients, for speeding up tx mining
@@ -306,7 +309,7 @@ func (t *Task) Execute(ctx context.Context) error {
306309
}
307310
}
308311
if notReceivedP2PEventCount > 0 {
309-
t.logger.Warnf("Missed %d p2p events, assigned test dureation as latency", notReceivedP2PEventCount)
312+
t.logger.Warnf("Missed p2p events: %d (assigned latency=duration)", notReceivedP2PEventCount)
310313
}
311314

312315
// Calculate statistics

pkg/coordinator/tasks/tx_pool_throughput_analysis/task.go

Lines changed: 49 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ var (
2626
TaskName = "tx_pool_throughput_analysis"
2727
TaskDescriptor = &types.TaskDescriptor{
2828
Name: TaskName,
29-
Description: "Checks the throughput of transactions in the Ethereum TxPool",
29+
Description: "Checks the TxPool transaction propagation throughput",
3030
Config: DefaultConfig(),
3131
NewTask: NewTask,
3232
}
@@ -100,6 +100,10 @@ func (t *Task) Execute(ctx context.Context) error {
100100

101101
client := executionClients[rand.Intn(len(executionClients))]
102102

103+
t.logger.Infof("Measuring TxPool transaction propagation *throughput*")
104+
t.logger.Infof("Targeting client: %s, TPS: %d, Duration: %d seconds",
105+
client.GetName(), t.config.TPS, t.config.Duration_s)
106+
103107
conn, err := t.getTcpConn(ctx, client)
104108
if err != nil {
105109
t.logger.Errorf("Failed to get wire eth TCP connection: %v", err)
@@ -121,14 +125,17 @@ func (t *Task) Execute(ctx context.Context) error {
121125
}
122126
}
123127

124-
// Prepare to send transactions
128+
// Prepare to collect transaction latencies
125129
var totNumberOfTxes int = t.config.TPS * t.config.Duration_s
126130
var txs []*ethtypes.Transaction = make([]*ethtypes.Transaction, totNumberOfTxes)
131+
var txStartTime []time.Time = make([]time.Time, totNumberOfTxes)
127132
var testDeadline time.Time = time.Now().Add(time.Duration(t.config.Duration_s+60*30) * time.Second)
133+
var latenciesMus = make([]int64, totNumberOfTxes)
128134

129135
startTime := time.Now()
130136
isFailed := false
131137
sentTxCount := 0
138+
duplicatedP2PEventCount := 0
132139
coordinatedOmissionEventCount := 0
133140

134141
// Start generating and sending transactions
@@ -155,6 +162,7 @@ func (t *Task) Execute(ctx context.Context) error {
155162
return
156163
}
157164

165+
txStartTime[i] = time.Now()
158166
err = client.GetRPCClient().SendTransaction(ctx, tx)
159167
if err != nil {
160168
t.logger.WithField("client", client.GetName()).Errorf("Failed to send transaction: %v", err)
@@ -185,20 +193,16 @@ func (t *Task) Execute(ctx context.Context) error {
185193

186194
select {
187195
case <-ctx.Done():
188-
{
189-
t.logger.Warnf("Task cancelled, stopping transaction generation.")
196+
t.logger.Warnf("Task cancelled, stopping transaction generation.")
197+
return
198+
default:
199+
// if testDeadline reached, stop sending txes
200+
if isFailed {
190201
return
191202
}
192-
default:
193-
{
194-
// if testDeadline reached, stop sending txes
195-
if isFailed {
196-
return
197-
}
198-
if time.Now().After(testDeadline) {
199-
t.logger.Infof("Reached duration limit, stopping transaction generation.")
200-
return
201-
}
203+
if time.Now().After(testDeadline) {
204+
t.logger.Infof("Reached duration limit, stopping transaction generation.")
205+
return
202206
}
203207
}
204208
}
@@ -233,7 +237,14 @@ func (t *Task) Execute(ctx context.Context) error {
233237
isFailed = true
234238
return
235239
}
236-
//latenciesMus[tx_index] = time.Since(txStartTime[tx_index]).Microseconds()
240+
241+
// log the duplicated p2p events, and count duplicated p2p events
242+
// todo: add a timeout of N seconds that activates if duplicatedP2PEventCount + receivedEvents >= totNumberOfTxes, if exceeded, exit the function
243+
if latenciesMus[tx_index] != 0 {
244+
duplicatedP2PEventCount++
245+
}
246+
247+
latenciesMus[tx_index] = time.Since(txStartTime[tx_index]).Microseconds()
237248
receivedEvents++
238249

239250
if receivedEvents%t.config.MeasureInterval == 0 {
@@ -260,10 +271,15 @@ func (t *Task) Execute(ctx context.Context) error {
260271
}
261272
}()
262273

263-
lastMeasureTime := time.Since(startTime)
274+
lastMeasureDelay := time.Since(startTime)
275+
t.logger.Infof("Last measure delay since start time: %s", lastMeasureDelay)
264276

265277
if coordinatedOmissionEventCount > 0 {
266-
t.logger.Warnf("Coordinated omission events count: %d", coordinatedOmissionEventCount)
278+
t.logger.Warnf("Coordinated omission events: %d", coordinatedOmissionEventCount)
279+
}
280+
281+
if duplicatedP2PEventCount > 0 {
282+
t.logger.Warnf("Duplicated p2p events: %d", duplicatedP2PEventCount)
267283
}
268284

269285
// Send txes to other clients, for speeding up tx mining
@@ -282,13 +298,26 @@ func (t *Task) Execute(ctx context.Context) error {
282298
return nil
283299
}
284300

301+
// Check if we received all transactions p2p events
302+
notReceivedP2PEventCount := 0
303+
for i := 0; i < totNumberOfTxes; i++ {
304+
if latenciesMus[i] == 0 {
305+
notReceivedP2PEventCount++
306+
// Assign a default value for missing P2P events
307+
latenciesMus[i] = (time.Duration(t.config.Duration_s) * time.Second).Microseconds()
308+
}
309+
}
310+
if notReceivedP2PEventCount > 0 {
311+
t.logger.Warnf("Missed p2p events: %d (assigned latency=duration)", notReceivedP2PEventCount)
312+
}
313+
285314
// Calculate statistics
286-
processed_tx_per_second := float64(sentTxCount) / lastMeasureTime.Seconds()
315+
processed_tx_per_second := float64(sentTxCount) / lastMeasureDelay.Seconds()
287316

288317
t.ctx.Outputs.SetVar("mean_tps_throughput", processed_tx_per_second)
289-
t.logger.Infof("Processed %d transactions in %.2fs, mean throughput: %.2f tx/s", sentTxCount, lastMeasureTime.Seconds(), processed_tx_per_second)
318+
t.logger.Infof("Processed %d transactions in %.2fs, mean throughput: %.2f tx/s", sentTxCount, lastMeasureDelay.Seconds(), processed_tx_per_second)
290319
t.ctx.Outputs.SetVar("tx_count", totNumberOfTxes)
291-
t.logger.Infof("Sent %d transactions in %.2fs", sentTxCount, lastMeasureTime.Seconds())
320+
t.logger.Infof("Sent %d transactions in %.2fs", sentTxCount, lastMeasureDelay.Seconds())
292321

293322
t.ctx.SetResult(types.TaskResultSuccess)
294323

0 commit comments

Comments
 (0)