Skip to content

Commit 27713fc

Browse files
author
andraz maier
committed
fix: resolve #1 empty queue deadlock
1 parent 28e4274 commit 27713fc

File tree

2 files changed

+18
-12
lines changed

2 files changed

+18
-12
lines changed

utxo_indexer/indexer/bitcoin.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ def process_block(self, block_height: int):
3131

3232
processed_block = BlockProcessorMemory()
3333

34+
assert self.toplevel_worker is not None, "Toplevel worker is not set"
3435
block_hash = self._get_block_hash_from_height(block_height, self.toplevel_worker)
3536
res_block = self._get_block_by_hash(block_hash, self.toplevel_worker)
3637

utxo_indexer/indexer/doge.py

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import logging
22
import threading
3-
from queue import Queue
3+
from queue import Empty, Queue
44
from typing import Callable
55

66
from django.db import transaction
@@ -24,12 +24,13 @@
2424

2525

2626
def thread_worker(session: Session, process_queue: Queue, processed_block: BlockProcessorMemory):
27-
while not process_queue.empty():
28-
item = process_queue.get()
29-
if callable(item):
27+
while True:
28+
try:
29+
item = process_queue.get_nowait()
3030
item(session, processed_block)
31-
else:
32-
raise Exception("Item in queue is not callable")
31+
process_queue.task_done()
32+
except Empty:
33+
return
3334

3435

3536
def process_pre_vout_transaction(
@@ -41,16 +42,16 @@ def process_pre_vout_transaction(
4142
"""Return the function that processes the transaction prevouts and link it to the spending transaction
4243
4344
Args:
44-
vin (IUtxoVinTransaction): vin object from spending transaction
45+
vin (VinResponse): vin object from spending transaction
4546
vin_n (int): index of vin in spending transaction
46-
tx_link (str): transaction id of spending transaction
47+
tx_link (UtxoTransaction): transaction object of spending transaction
48+
transaction_getter (Callable): function to retrieve transaction details
4749
"""
4850

4951
def _process_pre_vout_transaction(session: Session, processed_block: BlockProcessorMemory):
5052
txid, vout_n = vin.txid, vin.vout
5153
# Memo:
5254
vout = TransactionOutput.objects.filter(transaction_link__transaction_id=txid, n=vout_n).first()
53-
prevout_res = None
5455
if vout is not None:
5556
prevout_res = vout.to_vout_response()
5657
else:
@@ -79,8 +80,9 @@ def process_block(self, block_height: int):
7980
# NOTICE: we always assume that block processing is for blocks that are for sure on main branch of the blockchain
8081

8182
processed_block = BlockProcessorMemory()
82-
process_queue: Queue = Queue()
83+
process_queue: Queue[Callable[[Session, BlockProcessorMemory], TransactionInput]] = Queue()
8384

85+
assert self.toplevel_worker is not None, "Toplevel worker is not set"
8486
block_hash = self._get_block_hash_from_height(block_height, self.toplevel_worker)
8587
res_block = self._get_block_by_hash(block_hash, self.toplevel_worker)
8688

@@ -108,15 +110,18 @@ def process_block(self, block_height: int):
108110
for vout in tx.vout:
109111
processed_block.vouts.append(TransactionOutput.object_from_node_response(vout, tx_link))
110112

111-
# multithreading part of the processing
112-
workers = []
113+
# Multithreading part of the processing
114+
# Launch worker threads
115+
workers: list[threading.Thread] = []
113116
for worker_index in range(len(self.workers)):
114117
t = threading.Thread(
115118
target=thread_worker, args=(self.workers[worker_index], process_queue, processed_block)
116119
)
117120
workers.append(t)
118121
t.start()
119122

123+
# Wait for all tasks to be processed and for workers to finnish.
124+
process_queue.join()
120125
[t.join() for t in workers]
121126

122127
if not process_queue.empty():

0 commit comments

Comments
 (0)