Skip to content

Commit 1ea89ba

Browse files
committed
save
1 parent 1e9ca4f commit 1ea89ba

File tree

1 file changed

+125
-47
lines changed

1 file changed

+125
-47
lines changed

cmd/monitor/monitor.go

+125-47
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@ import (
55
"errors"
66
"fmt"
77
"math/big"
8-
"strings"
8+
"os"
9+
"os/signal"
910
"sync"
11+
"syscall"
1012
"time"
1113

1214
lru "github.com/hashicorp/golang-lru"
@@ -49,7 +51,7 @@ var (
4951
maxDataPoints = 1000
5052

5153
// maxConcurrency defines the maximum number of goroutines that can fetch block data concurrently.
52-
maxConcurrency = 10
54+
maxConcurrency = 1
5355

5456
// semaphore is a channel used to control the concurrency of block data fetch operations.
5557
semaphore = make(chan struct{}, maxConcurrency)
@@ -93,7 +95,9 @@ const (
9395
)
9496

9597
func monitor(ctx context.Context) error {
96-
// Dial rpc.
98+
ctx, cancel := setupCancelContext(ctx)
99+
defer cancel()
100+
97101
rpc, err := ethrpc.DialContext(ctx, rpcUrl)
98102
if err != nil {
99103
log.Error().Err(err).Msg("Unable to dial rpc")
@@ -132,8 +136,9 @@ func monitor(ctx context.Context) error {
132136
}
133137
}()
134138
select {
135-
case <-ctx.Done(): // listens for a cancellation signal
136-
return // exit the goroutine when the context is done
139+
case <-ctx.Done():
140+
log.Error().Err(ctx.Err()).Msg("We are here")
141+
return
137142
default:
138143
for {
139144
err = fetchCurrentBlockData(ctx, ec, ms, isUiRendered)
@@ -159,6 +164,24 @@ func monitor(ctx context.Context) error {
159164
return err
160165
}
161166

167+
func setupCancelContext(ctx context.Context) (context.Context, context.CancelFunc) {
168+
ctx, cancel := context.WithCancel(ctx)
169+
170+
sigChan := make(chan os.Signal, 1)
171+
signal.Notify(sigChan, os.Interrupt, syscall.SIGHUP,
172+
syscall.SIGINT,
173+
syscall.SIGTERM,
174+
syscall.SIGQUIT)
175+
176+
go func() {
177+
<-sigChan
178+
log.Error().Msg("WE ARE HERE")
179+
cancel()
180+
}()
181+
182+
return ctx, cancel
183+
}
184+
162185
func getChainState(ctx context.Context, ec *ethclient.Client) (*chainState, error) {
163186
var err error
164187
cs := new(chainState)
@@ -305,6 +328,61 @@ func (ms *monitorStatus) getBlockRange(ctx context.Context, to *big.Int, rpc *et
305328
return nil
306329
}
307330

331+
func (ms *monitorStatus) processBatchesConcurrently(ctx context.Context, rpc *ethrpc.Client, blms []ethrpc.BatchElem) error {
332+
var wg sync.WaitGroup
333+
var errs []error = make([]error, 0)
334+
var errorsMutex sync.Mutex
335+
336+
for i := 0; i < len(blms); i += subBatchSize {
337+
semaphore <- struct{}{}
338+
wg.Add(1)
339+
go func(i int) {
340+
defer func() {
341+
<-semaphore
342+
wg.Done()
343+
}()
344+
end := i + subBatchSize
345+
if end > len(blms) {
346+
end = len(blms)
347+
}
348+
subBatch := blms[i:end]
349+
350+
b := backoff.NewExponentialBackOff()
351+
b.MaxElapsedTime = 3 * time.Minute
352+
retryable := func() error {
353+
if err := rpc.BatchCallContext(ctx, subBatch); err != nil {
354+
log.Error().Err(err).Msg("Error when performing batch calls")
355+
return backoff.Permanent(err)
356+
}
357+
return nil
358+
}
359+
if err := backoff.Retry(retryable, b); err != nil {
360+
log.Error().Err(err).Msg("unable to retry")
361+
errorsMutex.Lock()
362+
errs = append(errs, err)
363+
errorsMutex.Unlock()
364+
return
365+
}
366+
367+
for _, elem := range subBatch {
368+
if elem.Error != nil {
369+
log.Error().Str("Method", elem.Method).Interface("Args", elem.Args).Err(elem.Error).Msg("Failed batch element")
370+
} else {
371+
pb := rpctypes.NewPolyBlock(elem.Result.(*rpctypes.RawBlockResponse))
372+
ms.BlocksLock.Lock()
373+
ms.BlockCache.Add(pb.Number().String(), pb)
374+
ms.BlocksLock.Unlock()
375+
}
376+
}
377+
378+
}(i)
379+
}
380+
381+
wg.Wait()
382+
383+
return errors.Join(errs...)
384+
}
385+
308386
func (ms *monitorStatus) processBatchesConcurrently(ctx context.Context, rpc *ethrpc.Client, blms []ethrpc.BatchElem) error {
309387
var wg sync.WaitGroup
310388
var errs []error = make([]error, 0)
@@ -332,14 +410,15 @@ func (ms *monitorStatus) processBatchesConcurrently(ctx context.Context, rpc *et
332410
retryable := func() error {
333411
select {
334412
case <-ctx.Done():
413+
log.Error().Err(ctx.Err()).Msg("WE ARE HERE")
335414
return ctx.Err()
336415
default:
337416
err := rpc.BatchCallContext(ctx, subBatch)
338417
if err != nil {
339418
log.Error().Err(err).Msg("BatchCallContext error - retry loop")
340-
if strings.Contains(err.Error(), "limit") {
341-
return backoff.Permanent(err)
342-
}
419+
// if strings.Contains(err.Error(), "limit") {
420+
// return backoff.Permanent(err)
421+
// }
343422
}
344423
return err
345424
}
@@ -403,44 +482,6 @@ func renderMonitorUI(ctx context.Context, ec *ethclient.Client, ms *monitorStatu
403482
var renderedBlocks rpctypes.SortableBlocks
404483

405484
redraw := func(ms *monitorStatus, force ...bool) {
406-
407-
if currentMode == monitorModeHelp {
408-
// TODO add some help context?
409-
} else if currentMode == monitorModeBlock {
410-
// render a block
411-
skeleton.BlockInfo.Rows = ui.GetSimpleBlockFields(ms.SelectedBlock)
412-
rows, title := ui.GetTransactionsList(ms.SelectedBlock, ms.ChainID)
413-
transactionList.Rows = rows
414-
transactionList.Title = title
415-
416-
baseFee := ms.SelectedBlock.BaseFee()
417-
if transactionList.SelectedRow != 0 {
418-
ms.SelectedTransaction = ms.SelectedBlock.Transactions()[transactionList.SelectedRow-1]
419-
transactionInformationList.Rows = ui.GetSimpleTxFields(ms.SelectedTransaction, ms.ChainID, baseFee)
420-
}
421-
termui.Clear()
422-
termui.Render(blockGrid)
423-
424-
log.Debug().
425-
Int("skeleton.TransactionList.SelectedRow", transactionList.SelectedRow).
426-
Msg("Redrawing block mode")
427-
428-
return
429-
} else if currentMode == monitorModeTransaction {
430-
baseFee := ms.SelectedBlock.BaseFee()
431-
skeleton.TxInfo.Rows = ui.GetSimpleTxFields(ms.SelectedBlock.Transactions()[transactionList.SelectedRow-1], ms.ChainID, baseFee)
432-
skeleton.Receipts.Rows = ui.GetSimpleReceipt(ctx, rpc, ms.SelectedTransaction)
433-
434-
termui.Clear()
435-
termui.Render(transactionGrid)
436-
437-
log.Debug().
438-
Int("skeleton.TransactionList.SelectedRow", transactionList.SelectedRow).
439-
Msg("Redrawing transaction mode")
440-
441-
return
442-
}
443-
444485
log.Debug().
445486
Str("TopDisplayedBlock", ms.TopDisplayedBlock.String()).
446487
Int("BatchSize", batchSize.Get()).
@@ -482,7 +523,44 @@ func renderMonitorUI(ctx context.Context, ec *ethclient.Client, ms *monitorStatu
482523
ms.BlocksLock.RUnlock()
483524
renderedBlocks = renderedBlocksTemp
484525

485-
log.Warn().Int("skeleton.Current.Inner.Dy()", skeleton.Current.Inner.Dy()).Int("skeleton.Current.Inner.Dx()", skeleton.Current.Inner.Dx()).Msg("the dimension of the current box")
526+
if currentMode == monitorModeHelp {
527+
// TODO add some help context?
528+
} else if currentMode == monitorModeBlock {
529+
// render a block
530+
skeleton.BlockInfo.Rows = ui.GetSimpleBlockFields(ms.SelectedBlock)
531+
rows, title := ui.GetTransactionsList(ms.SelectedBlock, ms.ChainID)
532+
transactionList.Rows = rows
533+
transactionList.Title = title
534+
535+
baseFee := ms.SelectedBlock.BaseFee()
536+
if transactionList.SelectedRow != 0 {
537+
ms.SelectedTransaction = ms.SelectedBlock.Transactions()[transactionList.SelectedRow-1]
538+
transactionInformationList.Rows = ui.GetSimpleTxFields(ms.SelectedTransaction, ms.ChainID, baseFee)
539+
}
540+
termui.Clear()
541+
termui.Render(blockGrid)
542+
543+
log.Debug().
544+
Int("skeleton.TransactionList.SelectedRow", transactionList.SelectedRow).
545+
Msg("Redrawing block mode")
546+
547+
return
548+
} else if currentMode == monitorModeTransaction {
549+
baseFee := ms.SelectedBlock.BaseFee()
550+
skeleton.TxInfo.Rows = ui.GetSimpleTxFields(ms.SelectedBlock.Transactions()[transactionList.SelectedRow-1], ms.ChainID, baseFee)
551+
skeleton.Receipts.Rows = ui.GetSimpleReceipt(ctx, rpc, ms.SelectedTransaction)
552+
553+
termui.Clear()
554+
termui.Render(transactionGrid)
555+
556+
log.Debug().
557+
Int("skeleton.TransactionList.SelectedRow", transactionList.SelectedRow).
558+
Msg("Redrawing transaction mode")
559+
560+
return
561+
}
562+
563+
log.Debug().Int("skeleton.Current.Inner.Dy()", skeleton.Current.Inner.Dy()).Int("skeleton.Current.Inner.Dx()", skeleton.Current.Inner.Dx()).Msg("the dimension of the current box")
486564
skeleton.Current.Text = ui.GetCurrentBlockInfo(ms.HeadBlock, ms.GasPrice, ms.PeerCount, ms.PendingCount, ms.ChainID, renderedBlocks, skeleton.Current.Inner.Dx(), skeleton.Current.Inner.Dy())
487565
skeleton.TxPerBlockChart.Data = metrics.GetTxsPerBlock(renderedBlocks)
488566
skeleton.GasPriceChart.Data = metrics.GetMeanGasPricePerBlock(renderedBlocks)

0 commit comments

Comments
 (0)