Skip to content

Commit ec6739a

Browse files
fix(tx_pool_latency_analysis): implement timeout for reading transaction messages to handle potential delays and improve task reliability
1 parent 616c3b1 commit ec6739a

File tree

1 file changed

+25
-9
lines changed
  • pkg/coordinator/tasks/tx_pool_latency_analysis

1 file changed

+25
-9
lines changed

pkg/coordinator/tasks/tx_pool_latency_analysis/task.go

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -146,11 +146,27 @@ func (t *Task) Execute(ctx context.Context) error {
146146
txs = append(txs, tx)
147147
retryCount = 0
148148

149-
_, err = conn.ReadTransactionMessages()
150-
if err != nil {
151-
t.logger.Errorf("Failed to read transaction messages: %v", err)
152-
t.ctx.SetResult(types.TaskResultFailure)
153-
return nil
149+
// Create a context with timeout for reading transaction messages
150+
readCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
151+
defer cancel()
152+
153+
done := make(chan error, 1)
154+
go func() {
155+
_, readErr := conn.ReadTransactionMessages()
156+
done <- readErr
157+
}()
158+
159+
select {
160+
case err = <-done:
161+
if err != nil {
162+
t.logger.Errorf("Failed to read transaction messages: %v", err)
163+
t.ctx.SetResult(types.TaskResultFailure)
164+
return nil
165+
}
166+
case <-readCtx.Done():
167+
t.logger.Warnf("Timeout waiting for transaction message, retrying transaction")
168+
i-- // Retry this transaction
169+
continue
154170
}
155171

156172
latency := time.Since(startTx)
@@ -187,7 +203,7 @@ func (t *Task) Execute(ctx context.Context) error {
187203

188204
t.ctx.SetResult(types.TaskResultSuccess)
189205
}
190-
206+
191207
latenciesMus := make([]int64, len(latencies))
192208

193209
for i, latency := range latencies {
@@ -202,10 +218,10 @@ func (t *Task) Execute(ctx context.Context) error {
202218
}
203219

204220
outputs := map[string]interface{}{
205-
"tx_count": t.config.TxCount,
206-
"avg_latency_mus": avgLatency.Microseconds(),
221+
"tx_count": t.config.TxCount,
222+
"avg_latency_mus": avgLatency.Microseconds(),
207223
"tx_pool_latency_hdr_plot": plot,
208-
"latencies": latenciesMus,
224+
"latencies": latenciesMus,
209225
}
210226

211227
outputsJSON, _ := json.Marshal(outputs)

0 commit comments

Comments
 (0)