@@ -44,28 +44,55 @@ def __init__(
4444 self ._client = client
4545 self .instance_config = instance_config
4646
47- # BtcClient only needs one worker
48- if isinstance (self ._client , BtcClient ):
49- self .workers = [new_session (instance_config )]
50- else :
51- self .workers = [new_session (instance_config ) for _ in range (instance_config .NUMBER_OF_WORKERS )]
47+ # create workers / connect ititial seesions
48+ self .connect_workers ()
5249
53- self .toplevel_worker = self .workers [0 ]
5450 assert expected_production > 0 , "Expected block production time should be positive"
5551 self .expected_block_production_time = expected_production
5652
5753 # Determining starting block height for indexing
5854 self .latest_indexed_block_height = self .extract_initial_block_height ()
5955 self .latest_tip_block_height = 0
6056
57+ def connect_workers (self ) -> None :
58+ """
59+ Connects all workers to the node
60+ """
61+ # BtcClient only needs one worker
62+ if isinstance (self ._client , BtcClient ):
63+ self .workers = [new_session (self .instance_config )]
64+ else :
65+ self .workers = [new_session (self .instance_config ) for _ in range (self .instance_config .NUMBER_OF_WORKERS )]
66+ self .toplevel_worker = self .workers [0 ]
67+
68+ def kill_workers (self ) -> None :
69+ """
70+ Kills all workers
71+ """
72+ for worker in self .workers :
73+ worker .close ()
74+ self .workers = []
75+ self .toplevel_worker = None
76+
77+ def ensure_workers (self ) -> None :
78+ """
79+ Ensures that all workers are connected
80+ """
81+ if len (self .workers ) == 0 or self .toplevel_worker is None :
82+ self .connect_workers ()
83+ logger .info ("Reconnected workers" )
84+
6185 def extract_initial_block_height (self ) -> int :
6286 """
6387 Extracts the initial block height from the config
6488 """
6589
6690 logger .info ("Extracting initial block height" )
91+ self .ensure_workers ()
6792 logger .info ("Number of active workeres: %s" , len (self .workers ))
6893
94+ assert self .toplevel_worker is not None , "Toplevel worker should be connected and defined"
95+
6996 latest_block = UtxoBlock .objects .order_by ("block_number" ).last ()
7097 if latest_block is not None :
7198 if latest_block .block_number < self .instance_config .INITIAL_BLOCK_HEIGHT :
@@ -107,6 +134,8 @@ def run(self) -> None:
107134 """
108135 logger .info ("Starting the indexer" )
109136 while True :
137+ self .ensure_workers ()
138+ assert self .toplevel_worker is not None , "Toplevel worker should be connected and defined"
110139 height = self ._get_current_block_height (self .toplevel_worker )
111140
112141 if self .latest_tip_block_height < height :
@@ -128,6 +157,7 @@ def run(self) -> None:
128157 f"No new blocks to process, indexed/latest: { self .latest_indexed_block_height } /{ height } sleeping for { self .instance_config .INDEXER_POLL_INTERVAL } seconds"
129158 )
130159 self .update_tip_state_idle ()
160+ self .kill_workers ()
131161 time .sleep (self .instance_config .INDEXER_POLL_INTERVAL )
132162
133163 # Base methods for interacting with node directly
0 commit comments