Skip to content

Commit c7fd1cf

Browse files
Revert "Enhance transaction latency analysis by implementing synchronized arrays for precise latency measurement and improving transaction sending logic. Added QPS-based transaction distribution and refined logging for better monitoring of transaction flow."
This reverts commit 31355d6.
1 parent 578e7c7 commit c7fd1cf

File tree

1 file changed

+42
-110
lines changed
  • pkg/coordinator/tasks/tx_pool_latency_analysis

1 file changed

+42
-110
lines changed

pkg/coordinator/tasks/tx_pool_latency_analysis/task.go

Lines changed: 42 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -111,130 +111,64 @@ func (t *Task) Execute(ctx context.Context) error {
111111

112112
defer conn.Close()
113113

114-
// Create three synchronized arrays for precise latency calculation
115-
sendTimestamps := make([]time.Time, t.config.QPS)
116-
receiveTimestamps := make([]time.Time, t.config.QPS)
114+
var totalLatency time.Duration
115+
var latencies []time.Duration
117116

118117
var txs []*ethtypes.Transaction
119-
hashToIndex := make(map[string]int) // Map hash to array index
120-
121-
startTime := time.Now()
122-
isFailed := false
123-
sentTxCount := 0
124-
125-
// Send transactions at QPS rate
126-
go func() {
127-
startExecTime := time.Now()
128-
endTime := startExecTime.Add(time.Second)
129118

130-
for i := range t.config.QPS {
131-
if ctx.Err() != nil || isFailed {
132-
return
133-
}
134-
135-
// Calculate how much time we have left
136-
remainingTime := time.Until(endTime)
137-
138-
// Calculate sleep time to distribute remaining transactions evenly
139-
sleepTime := remainingTime / time.Duration(t.config.QPS-i)
140-
141-
// Generate and send transaction
142-
go func(index int) {
143-
if ctx.Err() != nil || isFailed {
144-
return
145-
}
146-
147-
tx, err := t.generateTransaction(ctx)
148-
if err != nil {
149-
t.logger.Errorf("Failed to create transaction: %v", err)
150-
t.ctx.SetResult(types.TaskResultFailure)
151-
isFailed = true
152-
return
153-
}
154-
155-
startTx := time.Now()
156-
157-
err = client.GetRPCClient().SendTransaction(ctx, tx)
158-
if err != nil {
159-
t.logger.Errorf("Failed to send transaction: %v. Nonce: %d. ", err, tx.Nonce())
160-
t.ctx.SetResult(types.TaskResultFailure)
161-
isFailed = true
162-
return
163-
}
164-
165-
// Store transaction data in synchronized arrays
166-
txHash := tx.Hash().String()
167-
sendTimestamps[index] = startTx
168-
hashToIndex[txHash] = index
169-
170-
txs = append(txs, tx)
171-
sentTxCount++
172-
173-
if sentTxCount%t.config.MeasureInterval == 0 {
174-
elapsed := time.Since(startTime)
175-
t.logger.Infof("Sent %d transactions in %.2fs", sentTxCount, elapsed.Seconds())
176-
}
177-
}(i)
178-
179-
if isFailed {
180-
return
181-
}
182-
183-
time.Sleep(sleepTime)
119+
for i := 0; i < t.config.TxCount; i++ {
120+
tx, err := t.generateTransaction(ctx)
121+
if err != nil {
122+
t.logger.Errorf("Failed to create transaction: %v", err)
123+
t.ctx.SetResult(types.TaskResultFailure)
124+
return nil
184125
}
185126

186-
execTime := time.Since(startExecTime)
187-
t.logger.Infof("Time to generate and send %d transactions: %v", t.config.QPS, execTime)
188-
}()
189-
190-
// Read transactions back from peer to measure network latency
191-
gotTx := 0
192-
for gotTx < t.config.QPS && !isFailed {
193-
if ctx.Err() != nil {
194-
break
195-
}
127+
startTx := time.Now()
196128

197-
receivedTxs, err := conn.ReadTransactionMessages()
129+
err = client.GetRPCClient().SendTransaction(ctx, tx)
198130
if err != nil {
199-
t.logger.Errorf("Failed to read transaction messages: %v", err)
131+
t.logger.Errorf("Failed to send transaction: %v. Nonce: %d. ", err, tx.Nonce())
200132
t.ctx.SetResult(types.TaskResultFailure)
201133
return nil
202134
}
203135

204-
// Process received transactions and store receive timestamps
205-
for _, receivedTx := range *receivedTxs {
206-
receivedHash := receivedTx.Hash().String()
207-
if index, exists := hashToIndex[receivedHash]; exists {
208-
receiveTimestamps[index] = time.Now()
209-
gotTx++
210-
}
211-
}
136+
txs = append(txs, tx)
212137

213-
if gotTx%t.config.MeasureInterval == 0 {
214-
t.logger.Infof("Received %d transactions", gotTx)
215-
}
216-
}
138+
// Create a context with timeout for reading transaction messages
139+
readCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
140+
defer cancel()
217141

218-
if isFailed {
219-
return nil
220-
}
142+
done := make(chan error, 1)
143+
go func() {
144+
_, readErr := conn.ReadTransactionMessages()
145+
done <- readErr
146+
}()
221147

222-
// Calculate latencies from synchronized arrays
223-
var latencies []time.Duration
224-
var totalLatency time.Duration
148+
select {
149+
case err = <-done:
150+
if err != nil {
151+
t.logger.Errorf("Failed to read transaction messages: %v", err)
152+
t.ctx.SetResult(types.TaskResultFailure)
153+
return nil
154+
}
155+
case <-readCtx.Done():
156+
t.logger.Warnf("Timeout waiting for transaction message at index %d, retrying transaction", i)
157+
i-- // Retry this transaction
158+
continue
159+
}
160+
161+
latency := time.Since(startTx)
162+
latencies = append(latencies, latency)
163+
totalLatency += latency
225164

226-
for i := range t.config.QPS {
227-
if !receiveTimestamps[i].IsZero() && !sendTimestamps[i].IsZero() {
228-
latency := receiveTimestamps[i].Sub(sendTimestamps[i])
229-
latencies = append(latencies, latency)
230-
totalLatency += latency
165+
if (i+1)%t.config.MeasureInterval == 0 {
166+
avgSoFar := totalLatency.Microseconds() / int64(i+1)
167+
t.logger.Infof("Processed %d transactions, current avg latency: %dmus.", i+1, avgSoFar)
231168
}
232169
}
233170

234-
avgLatency := time.Duration(0)
235-
if len(latencies) > 0 {
236-
avgLatency = totalLatency / time.Duration(len(latencies))
237-
}
171+
avgLatency := totalLatency / time.Duration(t.config.TxCount)
238172
t.logger.Infof("Average transaction latency: %dmus", avgLatency.Microseconds())
239173

240174
// send to other clients, for speeding up tx mining
@@ -327,17 +261,15 @@ func (t *Task) Execute(ctx context.Context) error {
327261
t.logger.Errorf("Transaction latency too high: %dmus (expected <= %dmus)", avgLatency.Microseconds(), t.config.HighLatency)
328262
t.ctx.SetResult(types.TaskResultFailure)
329263
} else {
330-
t.ctx.Outputs.SetVar("tx_count", len(latencies))
331-
t.ctx.Outputs.SetVar("qps", t.config.QPS)
264+
t.ctx.Outputs.SetVar("tx_count", t.config.TxCount)
332265
t.ctx.Outputs.SetVar("avg_latency_mus", avgLatency.Microseconds())
333266
t.ctx.Outputs.SetVar("latencies", latenciesStats)
334267

335268
t.ctx.SetResult(types.TaskResultSuccess)
336269
}
337270

338271
outputs := map[string]interface{}{
339-
"tx_count": len(latencies),
340-
"qps": t.config.QPS,
272+
"tx_count": t.config.TxCount,
341273
"avg_latency_mus": avgLatency.Microseconds(),
342274
"tx_pool_latency_hdr_plot": plot,
343275
"latencies": latenciesStats,

0 commit comments

Comments (0)