|
7 | 7 | "fmt" |
8 | 8 | "math/big" |
9 | 9 | "math/rand" |
| 10 | + "sort" |
10 | 11 | "time" |
11 | 12 |
|
12 | 13 | "github.com/ethereum/go-ethereum/accounts/abi/bind" |
@@ -152,7 +153,7 @@ func (t *Task) Execute(ctx context.Context) error { |
152 | 153 | return nil |
153 | 154 | } |
154 | 155 | case <-readCtx.Done(): |
155 | | - t.logger.Warnf("Timeout waiting for transaction message, retrying transaction") |
| 156 | + t.logger.Warnf("Timeout waiting for transaction message at index %d, retrying transaction", i) |
156 | 157 | i-- // Retry this transaction |
157 | 158 | continue |
158 | 159 | } |
@@ -181,35 +182,97 @@ func (t *Task) Execute(ctx context.Context) error { |
181 | 182 | } |
182 | 183 | } |
183 | 184 |
|
184 | | - if t.config.FailOnHighLatency && avgLatency.Microseconds() > t.config.HighLatency { |
185 | | - t.logger.Errorf("Transaction latency too high: %dmus (expected <= %dmus)", avgLatency.Microseconds(), t.config.HighLatency) |
186 | | - t.ctx.SetResult(types.TaskResultFailure) |
187 | | - } else { |
188 | | - t.ctx.Outputs.SetVar("tx_count", t.config.TxCount) |
189 | | - t.ctx.Outputs.SetVar("avg_latency_mus", avgLatency.Microseconds()) |
190 | | - t.ctx.Outputs.SetVar("detailed_latencies", latencies) |
| 185 | + // Convert latencies to microseconds for processing |
| 186 | + latenciesMus := make([]int64, len(latencies)) |
| 187 | + for i, latency := range latencies { |
| 188 | + latenciesMus[i] = latency.Microseconds() |
| 189 | + } |
191 | 190 |
|
192 | | - t.ctx.SetResult(types.TaskResultSuccess) |
| 191 | + // Calculate statistics |
| 192 | + var totalLatencyMus int64 |
| 193 | + var maxLatency int64 = 0 |
| 194 | + var minLatency int64 = 0 |
| 195 | + if len(latenciesMus) > 0 { |
| 196 | + minLatency = latenciesMus[0] |
193 | 197 | } |
194 | 198 |
|
195 | | - latenciesMus := make([]int64, len(latencies)) |
| 199 | + for _, lat := range latenciesMus { |
| 200 | + totalLatencyMus += lat |
| 201 | + if lat > maxLatency { |
| 202 | + maxLatency = lat |
| 203 | + } |
| 204 | + if lat < minLatency { |
| 205 | + minLatency = lat |
| 206 | + } |
| 207 | + } |
196 | 208 |
|
197 | | - for i, latency := range latencies { |
198 | | - latenciesMus[i] = latency.Microseconds() |
| 209 | + // Calculate mean |
| 210 | + var meanLatency float64 = 0 |
| 211 | + if len(latenciesMus) > 0 { |
| 212 | + meanLatency = float64(totalLatencyMus) / float64(len(latenciesMus)) |
| 213 | + } |
| 214 | + |
| 215 | + // Sort for percentiles |
| 216 | + sortedLatencies := make([]int64, len(latenciesMus)) |
| 217 | + copy(sortedLatencies, latenciesMus) |
| 218 | + sort.Slice(sortedLatencies, func(i, j int) bool { |
| 219 | + return sortedLatencies[i] < sortedLatencies[j] |
| 220 | + }) |
| 221 | + |
| 222 | + // Calculate percentiles |
| 223 | + percentile50th := float64(0) |
| 224 | + percentile90th := float64(0) |
| 225 | + percentile95th := float64(0) |
| 226 | + percentile99th := float64(0) |
| 227 | + |
| 228 | + if len(sortedLatencies) > 0 { |
| 229 | + getPercentile := func(pct float64) float64 { |
| 230 | + idx := int(float64(len(sortedLatencies)-1) * pct / 100) |
| 231 | + return float64(sortedLatencies[idx]) |
| 232 | + } |
| 233 | + |
| 234 | + percentile50th = getPercentile(50) |
| 235 | + percentile90th = getPercentile(90) |
| 236 | + percentile95th = getPercentile(95) |
| 237 | + percentile99th = getPercentile(99) |
| 238 | + } |
| 239 | + |
| 240 | + // Create statistics map for output |
| 241 | + latenciesStats := map[string]float64{ |
| 242 | + "total": float64(totalLatencyMus), |
| 243 | + "mean": meanLatency, |
| 244 | + "50th": percentile50th, |
| 245 | + "90th": percentile90th, |
| 246 | + "95th": percentile95th, |
| 247 | + "99th": percentile99th, |
| 248 | + "max": float64(maxLatency), |
| 249 | + "min": float64(minLatency), |
199 | 250 | } |
200 | 251 |
|
| 252 | + // Generate HDR plot |
201 | 253 | plot, err := hdr.HdrPlot(latenciesMus) |
202 | 254 | if err != nil { |
203 | 255 | t.logger.Errorf("Failed to generate HDR plot: %v", err) |
204 | 256 | t.ctx.SetResult(types.TaskResultFailure) |
205 | 257 | return nil |
206 | 258 | } |
207 | 259 |
|
| 260 | + if t.config.FailOnHighLatency && avgLatency.Microseconds() > t.config.HighLatency { |
| 261 | + t.logger.Errorf("Transaction latency too high: %dmus (expected <= %dmus)", avgLatency.Microseconds(), t.config.HighLatency) |
| 262 | + t.ctx.SetResult(types.TaskResultFailure) |
| 263 | + } else { |
| 264 | + t.ctx.Outputs.SetVar("tx_count", t.config.TxCount) |
| 265 | + t.ctx.Outputs.SetVar("avg_latency_mus", avgLatency.Microseconds()) |
| 266 | + t.ctx.Outputs.SetVar("latencies", latenciesStats) |
| 267 | + |
| 268 | + t.ctx.SetResult(types.TaskResultSuccess) |
| 269 | + } |
| 270 | + |
208 | 271 | outputs := map[string]interface{}{ |
209 | 272 | "tx_count": t.config.TxCount, |
210 | 273 | "avg_latency_mus": avgLatency.Microseconds(), |
211 | 274 | "tx_pool_latency_hdr_plot": plot, |
212 | | - "latencies": latenciesMus, |
| 275 | + "latencies": latenciesStats, |
213 | 276 | } |
214 | 277 |
|
215 | 278 | outputsJSON, _ := json.Marshal(outputs) |
|
0 commit comments