7 | 7 | 	"math/big"
8 | 8 | 	"os"
9 | 9 | 	"os/signal"
  | 10 | +	"strings"
10 | 11 | 	"sync"
11 | 12 | 	"syscall"
12 | 13 | 	"time"
@@ -350,9 +351,12 @@ func (ms *monitorStatus) processBatchesConcurrently(ctx context.Context, rpc *ethrpc.Client, blms []ethrpc.BatchElem) error {
350 | 351 | 	b := backoff.NewExponentialBackOff()
351 | 352 | 	b.MaxElapsedTime = 3 * time.Minute
352 | 353 | 	retryable := func() error {
353 |     | -		if err := rpc.BatchCallContext(ctx, subBatch); err != nil {
354 |     | -			log.Error().Err(err).Msg("Error when performing batch calls")
355 |     | -			return backoff.Permanent(err)
    | 354 | +		err := rpc.BatchCallContext(ctx, subBatch)
    | 355 | +		if err != nil {
    | 356 | +			log.Error().Err(err).Msg("BatchCallContext error - retry loop")
    | 357 | +			if strings.Contains(err.Error(), "limit") {
    | 358 | +				return backoff.Permanent(err)
    | 359 | +			}
356 | 360 | 		}
357 | 361 | 		return nil
358 | 362 | 	}
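For context, the hunk above follows the usual cenkalti/backoff pattern: the operation handed to backoff.Retry is retried on a plain error return and aborted immediately when the error is wrapped with backoff.Permanent. Below is a minimal, self-contained sketch of that pattern; the doRequest helper and its "rate limit" message are illustrative assumptions, not code from this repository.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
	"time"

	"github.com/cenkalti/backoff/v4"
)

// doRequest stands in for a fallible RPC call; it is a hypothetical helper
// used only to illustrate the retry pattern.
func doRequest() error {
	return errors.New("429 rate limit exceeded")
}

func main() {
	b := backoff.NewExponentialBackOff()
	b.MaxElapsedTime = 3 * time.Minute

	retryable := func() error {
		err := doRequest()
		if err != nil {
			// Errors whose text mentions a limit are treated as non-retryable:
			// backoff.Permanent stops the retry loop immediately.
			if strings.Contains(err.Error(), "limit") {
				return backoff.Permanent(err)
			}
			// Any other error is returned as-is so backoff.Retry keeps
			// retrying until MaxElapsedTime is exhausted.
			return err
		}
		return nil
	}

	if err := backoff.Retry(retryable, b); err != nil {
		fmt.Println("giving up:", err)
	}
}
```

Only errors containing "limit" are marked permanent here, mirroring the strings.Contains check introduced in the hunk above.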
@@ -383,82 +387,6 @@ func (ms *monitorStatus) processBatchesConcurrently(ctx context.Context, rpc *ethrpc.Client, blms []ethrpc.BatchElem) error {
383 | 387 | 	return errors.Join(errs...)
384 | 388 | }
385 | 389 | 
386 |     | -func (ms *monitorStatus) processBatchesConcurrently(ctx context.Context, rpc *ethrpc.Client, blms []ethrpc.BatchElem) error {
387 |     | -	var wg sync.WaitGroup
388 |     | -	var errs []error = make([]error, 0)
389 |     | -	var errorsMutex sync.Mutex
390 |     | -
391 |     | -	for i := 0; i < len(blms); i += subBatchSize {
392 |     | -		semaphore <- struct{}{}
393 |     | -		wg.Add(1)
394 |     | -		go func(i int) {
395 |     | -			defer func() {
396 |     | -				<-semaphore
397 |     | -				wg.Done()
398 |     | -			}()
399 |     | -			end := i + subBatchSize
400 |     | -			if end > len(blms) {
401 |     | -				end = len(blms)
402 |     | -			}
403 |     | -			subBatch := blms[i:end]
404 |     | -
405 |     | -			doneCh := make(chan error, 1)
406 |     | -
407 |     | -			go func() {
408 |     | -				b := backoff.NewExponentialBackOff()
409 |     | -				b.MaxElapsedTime = 3 * time.Minute
410 |     | -				retryable := func() error {
411 |     | -					select {
412 |     | -					case <-ctx.Done():
413 |     | -						log.Error().Err(ctx.Err()).Msg("WE ARE HERE")
414 |     | -						return ctx.Err()
415 |     | -					default:
416 |     | -						err := rpc.BatchCallContext(ctx, subBatch)
417 |     | -						if err != nil {
418 |     | -							log.Error().Err(err).Msg("BatchCallContext error - retry loop")
419 |     | -							// if strings.Contains(err.Error(), "limit") {
420 |     | -							// 	return backoff.Permanent(err)
421 |     | -							// }
422 |     | -						}
423 |     | -						return err
424 |     | -					}
425 |     | -				}
426 |     | -				err := backoff.Retry(retryable, b)
427 |     | -				doneCh <- err
428 |     | -			}()
429 |     | -
430 |     | -			select {
431 |     | -			case <-ctx.Done():
432 |     | -				return
433 |     | -			case err := <-doneCh:
434 |     | -				if err != nil {
435 |     | -					log.Error().Err(err).Msg("unable to retry")
436 |     | -					errorsMutex.Lock()
437 |     | -					errs = append(errs, err)
438 |     | -					errorsMutex.Unlock()
439 |     | -					return
440 |     | -				}
441 |     | -
442 |     | -				for _, elem := range subBatch {
443 |     | -					if elem.Error != nil {
444 |     | -						log.Error().Str("Method", elem.Method).Interface("Args", elem.Args).Err(elem.Error).Msg("Failed batch element")
445 |     | -					} else {
446 |     | -						pb := rpctypes.NewPolyBlock(elem.Result.(*rpctypes.RawBlockResponse))
447 |     | -						ms.BlocksLock.Lock()
448 |     | -						ms.BlockCache.Add(pb.Number().String(), pb)
449 |     | -						ms.BlocksLock.Unlock()
450 |     | -					}
451 |     | -				}
452 |     | -			}
453 |     | -
454 |     | -		}(i)
455 |     | -	}
456 |     | -
457 |     | -	wg.Wait()
458 |     | -
459 |     | -	return errors.Join(errs...)
460 |     | -}
461 |     | -
462 | 390 | func renderMonitorUI(ctx context.Context, ec *ethclient.Client, ms *monitorStatus, rpc *ethrpc.Client) error {
463 | 391 | 	if err := termui.Init(); err != nil {
464 | 392 | 		log.Error().Err(err).Msg("Failed to initialize UI")
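The deleted function above duplicated the bounded-concurrency scaffolding that the surviving processBatchesConcurrently still relies on: a buffered channel used as a counting semaphore, a sync.WaitGroup to join the workers, a mutex-guarded error slice, and errors.Join to combine the results. A stripped-down sketch of that pattern follows; the process helper, the integer work items, and the chunk sizes are illustrative assumptions standing in for the RPC sub-batches.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// process stands in for handling one chunk of work (e.g. one RPC sub-batch);
// it is a hypothetical helper used only to illustrate the pattern.
func process(chunk []int) error {
	fmt.Println("processing", chunk)
	return nil
}

func processChunksConcurrently(items []int, chunkSize, maxInFlight int) error {
	var (
		wg        sync.WaitGroup
		errsMutex sync.Mutex
		errs      []error
	)
	// The buffered channel acts as a counting semaphore: sends block once
	// maxInFlight goroutines are already running.
	semaphore := make(chan struct{}, maxInFlight)

	for i := 0; i < len(items); i += chunkSize {
		semaphore <- struct{}{}
		wg.Add(1)
		go func(i int) {
			defer func() {
				<-semaphore
				wg.Done()
			}()
			end := i + chunkSize
			if end > len(items) {
				end = len(items)
			}
			if err := process(items[i:end]); err != nil {
				errsMutex.Lock()
				errs = append(errs, err)
				errsMutex.Unlock()
			}
		}(i)
	}

	wg.Wait()
	// errors.Join returns nil when errs is empty, so the happy path needs
	// no special casing.
	return errors.Join(errs...)
}

func main() {
	items := []int{1, 2, 3, 4, 5, 6, 7}
	if err := processChunksConcurrently(items, 3, 2); err != nil {
		fmt.Println("error:", err)
	}
}
```

Capping the semaphore at maxInFlight keeps the number of simultaneous sub-batch calls bounded while the loop still enqueues every chunk.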