@@ -167,11 +167,11 @@ func (ci *Engine) IndexHistory(ctx context.Context, startIndex uint64) (uint64,
167167 return 0 , err
168168 }
169169
170- logger .Infof ("Starting to index blocks from %d to %d" , ixRange .start , ixRange .end )
170+ logger .Infof ("Starting history indexing: from=%d, to= %d" , ixRange .start , ixRange .end )
171171
172172 for i := ixRange .start ; i <= ixRange .end ; i = i + ci .params .BatchSize {
173173 if err := ci .indexBatch (ctx , states , i , ixRange ); err != nil {
174- return 0 , err
174+ return 0 , errors . Wrapf ( err , "indexBatch: from=%d, to=%d" , i , min ( i + ci . params . BatchSize - 1 , ixRange . end ))
175175 }
176176
177177 // in the second to last run of the loop update lastIndex to get the blocks
@@ -190,6 +190,7 @@ func (ci *Engine) IndexHistory(ctx context.Context, startIndex uint64) (uint64,
190190func (ci * Engine ) indexBatch (
191191 ctx context.Context , states * database.DBStates , batchIx uint64 , ixRange * indexRange ,
192192) error {
193+ batchStart := time .Now ()
193194 lastBlockNumInRound := min (batchIx + ci .params .BatchSize - 1 , ixRange .end )
194195
195196 bBatch , err := ci .obtainBlocksBatch (ctx , batchIx , lastBlockNumInRound )
@@ -217,6 +218,7 @@ func (ci *Engine) indexBatch(
217218 batchIx ,
218219 lastBlockNumInRound ,
219220 lastForDBTimestamp ,
221+ batchStart ,
220222 )
221223}
222224
@@ -255,8 +257,8 @@ func (ci *Engine) obtainBlocksBatch(ctx context.Context, firstBlockNumber uint64
255257 return nil , err
256258 }
257259
258- logger .Infof (
259- "Successfully obtained blocks %d to %d in %d milliseconds " ,
260+ logger .Debugf (
261+ "Fetched blocks: from=%d, to=%d, duration_ms=%d " ,
260262 firstBlockNumber , lastBlockNumInRound , time .Since (startTime ).Milliseconds (),
261263 )
262264
@@ -268,8 +270,8 @@ func (ci *Engine) processBlocksBatch(bBatch *blockBatch) *transactionsBatch {
268270 txBatch := new (transactionsBatch )
269271
270272 ci .processBlocks (bBatch , txBatch )
271- logger .Infof (
272- "Successfully extracted %d transactions in %d milliseconds " ,
273+ logger .Debugf (
274+ "Extracted transactions: count=%d, duration_ms=%d " ,
273275 len (txBatch .transactions ), time .Since (startTime ).Milliseconds (),
274276 )
275277
@@ -297,8 +299,8 @@ func (ci *Engine) processTransactionsBatch(
297299 return err
298300 }
299301
300- logger .Infof (
301- "Checked receipts of %d transactions in %d milliseconds " ,
302+ logger .Debugf (
303+ "Checked receipts: count=%d, duration_ms=%d " ,
302304 countReceipts (txBatch ), time .Since (startTime ).Milliseconds (),
303305 )
304306
@@ -337,8 +339,8 @@ func (ci *Engine) obtainLogsBatch(
337339 }
338340 }
339341
340- logger .Infof (
341- "Obtained %d logs by request in %d milliseconds " ,
342+ logger .Debugf (
343+ "Fetched logs: count=%d, duration_ms=%d " ,
342344 len (lgBatch .logs ), time .Since (startTime ).Milliseconds (),
343345 )
344346
@@ -393,9 +395,8 @@ func (ci *Engine) processAndSave(
393395 lgBatch * logsBatch ,
394396 states * database.DBStates ,
395397 firstBlockNum , lastDBIndex , lastDBTimestamp uint64 ,
398+ batchStart time.Time ,
396399) error {
397- startTime := time .Now ()
398-
399400 data := newDatabaseStructData ()
400401 data .Blocks = ci .convertBlocksToDB (bBatch )
401402
@@ -408,25 +409,26 @@ func (ci *Engine) processAndSave(
408409 return errors .Wrap (err , "ci.processLogs" )
409410 }
410411
411- logger .Infof (
412- "Processed %d blocks with %d transactions and extracted %d logs from receipts and %d new logs from requests in %d milliseconds" ,
413- len (data .Blocks ),
414- len (txBatch .transactions ),
415- numLogsFromReceipts ,
416- len (data .Logs )- numLogsFromReceipts ,
417- time .Since (startTime ).Milliseconds (),
418- )
419-
420- // Push transactions and logs in the database
421- startTime = time .Now ()
412+ saveStart := time .Now ()
422413 if err := ci .saveData (data , states , lastDBIndex , lastDBTimestamp ); err != nil {
423414 return errors .Wrap (err , "ci.saveData" )
424415 }
425416
426- logger .Infof (
427- "Saved %d transactions and %d logs in the DB in %d milliseconds " ,
417+ logger .Debugf (
418+ "Saved batch: transactions=%d, logs=%d, duration_ms=%d " ,
428419 len (data .Transactions ), len (data .Logs ),
429- time .Since (startTime ).Milliseconds (),
420+ time .Since (saveStart ).Milliseconds (),
421+ )
422+
423+ logger .Infof (
424+ "Processed batch: from=%d, to=%d, blocks=%d, transactions=%d, receipt_logs=%d, filter_logs=%d, duration_ms=%d" ,
425+ firstBlockNum ,
426+ lastDBIndex ,
427+ len (data .Blocks ),
428+ len (txBatch .transactions ),
429+ numLogsFromReceipts ,
430+ len (data .Logs )- numLogsFromReceipts ,
431+ time .Since (batchStart ).Milliseconds (),
430432 )
431433
432434 return nil
@@ -450,7 +452,7 @@ func (ci *Engine) updateLastIndexHistory(
450452
451453 if lastChainIndex > ixRange .end && ci .params .StopIndex > ixRange .end {
452454 ixRange .end = min (lastChainIndex , ci .params .StopIndex )
453- logger .Infof ("Updating the last block to %d" , ixRange .end )
455+ logger .Infof ("Extending history range: last_block= %d" , ixRange .end )
454456 }
455457
456458 return ixRange , nil
@@ -467,14 +469,13 @@ func (ci *Engine) IndexContinuous(ctx context.Context, startIndex uint64) error
467469 return errors .Wrap (err , "ci.getIndexRange" )
468470 }
469471
470- logger .Infof ("Continuously indexing blocks from %d" , ixRange .start )
472+ logger .Infof ("Starting continuous indexing: from= %d" , ixRange .start )
471473
472474 // Request blocks one by one
473475 blockNum := ixRange .start
474476 lastProcessedBlockTime := [2 ]time.Time {time .Now (), time .Now ()}
475477 for blockNum <= ci .params .StopIndex {
476478 if blockNum > ixRange .end {
477- logger .Debugf ("Up to date, last block %d" , states .States [database .LastChainIndexState ].Index )
478479 time .Sleep (time .Millisecond * time .Duration (ci .params .NewBlockCheckMillis ))
479480
480481 ixRange , err = ci .updateLastIndexContinuous (ctx , states , ixRange )
@@ -485,7 +486,7 @@ func (ci *Engine) IndexContinuous(ctx context.Context, startIndex uint64) error
485486 elapsed := time .Since (lastProcessedBlockTime [0 ]).Seconds ()
486487 delay := ci .params .NoNewBlocksDelayWarning
487488 if delay != 0 && elapsed > delay {
488- logger .Warnf ("Warning: %.2f seconds have elapsed since the last block was processed. " , time .Since (lastProcessedBlockTime [1 ]).Seconds ())
489+ logger .Warnf ("No new blocks: elapsed_seconds= %.2f" , time .Since (lastProcessedBlockTime [1 ]).Seconds ())
489490 lastProcessedBlockTime [0 ] = time .Now ()
490491 }
491492
@@ -501,15 +502,16 @@ func (ci *Engine) IndexContinuous(ctx context.Context, startIndex uint64) error
501502 blockNum ++
502503 }
503504
504- logger .Debugf ("Stopping the indexer at block %d" , blockNum )
505+ logger .Debugf ("Stopping continuous indexing: block= %d" , blockNum )
505506
506507 return nil
507508}
508509
509510func (ci * Engine ) indexContinuousIteration (ctx context.Context , states * database.DBStates , index uint64 ) error {
511+ startTime := time .Now ()
510512 block , err := ci .fetchBlock (ctx , & index )
511513 if err != nil {
512- return errors .Wrap (err , "ci. fetchBlock" )
514+ return errors .Wrapf (err , "fetchBlock: block=%d" , index )
513515 }
514516
515517 bBatch := & blockBatch {blocks : []* chain.Block {block }}
@@ -519,36 +521,41 @@ func (ci *Engine) indexContinuousIteration(ctx context.Context, states *database
519521
520522 err = ci .getTransactionsReceipt (ctx , txBatch , 0 , len (txBatch .transactions ))
521523 if err != nil {
522- return errors .Wrap (err , "ci. getTransactionsReceipt" )
524+ return errors .Wrapf (err , "getTransactionsReceipt: block=%d" , index )
523525 }
524526
525527 logsBatch := new (logsBatch )
526528 for _ , logInfo := range ci .params .CollectLogs {
527529 err = ci .requestLogs (ctx , logsBatch , logInfo , index , index + 1 , index )
528530 if err != nil {
529- return fmt . Errorf ( "IndexContinuous: %w " , err )
531+ return errors . Wrapf ( err , "requestLogs: block=%d " , index )
530532 }
531533 }
532534
533535 data := newDatabaseStructData ()
534536 data .Blocks = ci .convertBlocksToDB (bBatch )
535537
536538 if err := ci .processTransactions (txBatch , data ); err != nil {
537- return fmt . Errorf ( "IndexContinuous: %w " , err )
539+ return errors . Wrapf ( err , "processTransactions: block=%d " , index )
538540 }
539541
540542 err = ci .processLogs (logsBatch , bBatch , index , data )
541543 if err != nil {
542- return fmt . Errorf ( "IndexContinuous: %w " , err )
544+ return errors . Wrapf ( err , "processLogs: block=%d " , index )
543545 }
544546
545547 indexTimestamp := bBatch .blocks [0 ].Time ()
546548 if err := ci .saveData (data , states , index , indexTimestamp ); err != nil {
547- return fmt . Errorf ( "IndexContinuous: %w " , err )
549+ return errors . Wrapf ( err , "saveData: block=%d " , index )
548550 }
549551
552+ logger .Debugf (
553+ "Indexed block: block=%d, transactions=%d, logs=%d, duration_ms=%d" ,
554+ index , len (data .Transactions ), len (data .Logs ), time .Since (startTime ).Milliseconds (),
555+ )
556+
550557 if index % 1000 == 0 {
551- logger .Infof ("Indexer at block %d" , index )
558+ logger .Infof ("Continuous progress: block= %d" , index )
552559 }
553560
554561 return nil
0 commit comments