Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 29 additions & 4 deletions conda_libmamba_solver/shards_subset.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
# Waiting for worker threads to shutdown cleanly, or raise error.
THREAD_WAIT_TIMEOUT = 5 # seconds
REACHABLE_PIPELINED_MAX_TIMEOUTS = 10 # number of times we can timeout waiting for shards
QUEUE_TIMEOUT = 1


@dataclass(order=True)
Expand Down Expand Up @@ -344,7 +345,7 @@ def _reachable_pipelined(

network_thread = threading.Thread(
target=network_worker,
args=(cache_miss_queue, shard_out_queue, cache, self.shardlikes),
args=(cache_miss_queue, shard_out_queue, QueueCache(cache_in_queue), self.shardlikes),
daemon=True,
)

Expand Down Expand Up @@ -428,7 +429,7 @@ def log_timeout():
break

try:
new_shards = shard_out_queue.get(timeout=1)
new_shards = shard_out_queue.get(timeout=QUEUE_TIMEOUT)
if isinstance(new_shards, BaseException): # error propagated from worker thread
raise new_shards

Expand Down Expand Up @@ -539,7 +540,7 @@ def combine_batches_until_none(
while running:
try:
# Add timeout to prevent indefinite blocking if producer thread fails
batch = in_queue.get(timeout=5)
batch = in_queue.get(timeout=QUEUE_TIMEOUT)
if batch is None:
break
except queue.Empty:
Expand Down Expand Up @@ -575,6 +576,23 @@ def wrapper(in_queue, out_queue, *args, **kwargs):
return wrapper


class QueueCache:
    """Cache stand-in that forwards inserted shards onto a queue.

    Handed to the network worker thread in place of the real shard cache
    (see the ``network_worker`` args above) so that the network thread never
    touches the SQLite database directly: shards it would have written are
    enqueued instead, and the single cache thread draining the queue performs
    all writes. This serializes database access in one thread, avoiding
    "database is locked" races (see the #924 news entry).

    Duck-types the subset of the real cache interface the worker uses:
    ``insert``, ``copy``, and the context-manager protocol.
    """

    def __init__(self, queue):
        # Destination queue; drained by the cache thread, which does the
        # actual database inserts.
        self.queue: Queue = queue

    def insert(self, shard: "AnnotatedRawShard"):
        """Enqueue *shard* as a single-element batch instead of writing it
        to the database.

        Wrapped in a list because the consumer iterates queue items as
        batches.
        """
        self.queue.put([shard])

    def copy(self):
        # The real ShardCache returns a per-thread copy for thread safety;
        # a Queue is already thread-safe, so reuse the same instance.
        return self

    def __enter__(self):
        return self

    def __exit__(self, *_):
        # Nothing to clean up; returning None lets exceptions propagate.
        return


@exception_to_queue
def cache_fetch_thread(
in_queue: Queue[Sequence[NodeId] | None],
Expand All @@ -596,7 +614,14 @@ def cache_fetch_thread(
cache: used to retrieve shards.
"""
with cache.copy() as cache:
for node_ids in combine_batches_until_none(in_queue):
for batch in combine_batches_until_none(in_queue):
node_ids = []
for item in batch:
if isinstance(item, AnnotatedRawShard):
# opens transaction; could do insertmany here or transaction scoped to loop
cache.insert(item)
else:
node_ids.append(item)
cached = cache.retrieve_multiple([node_id.shard_url for node_id in node_ids])

# should we add this into retrieve_multiple?
Expand Down
10 changes: 5 additions & 5 deletions news/924-fix-shards-db-locked
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@

### Bug fixes

* Open the sharded repodata cache (`repodata_shards.db`) in WAL mode with a
longer SQLite busy timeout, so the pipelined cache reader thread no longer
races with the network writer thread and raises
`sqlite3.OperationalError: database is locked`. Fall back to the default
journal mode on filesystems where WAL is not supported. (#924)
* Serialize repodata cache access in a single thread. Open the sharded
repodata cache (`repodata_shards.db`) in WAL mode with a longer SQLite busy
timeout. This avoids an issue where the cache reader thread could
race with the network writer thread and raise `sqlite3.OperationalError: database is locked`.
Fall back to the default journal mode if WAL is not supported. (#924)

### Deprecations

Expand Down
15 changes: 8 additions & 7 deletions tests/test_shards_subset.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
fetch_shards_index,
)
from conda_libmamba_solver.shards_subset import (
QUEUE_TIMEOUT,
NodeId,
RepodataSubset,
build_repodata_subset,
Expand Down Expand Up @@ -309,7 +310,7 @@ def test_shards_cache_thread(
cache_thread.start()

# combined into a single output batch
batch = shard_out_queue.get(timeout=1)
batch = shard_out_queue.get(timeout=QUEUE_TIMEOUT)
for node_id, shard in batch:
assert node_id in fake_nodes
assert shard == cache.retrieve(node_id.shard_url)
Expand All @@ -318,7 +319,7 @@ def test_shards_cache_thread(
with pytest.raises(queue.Empty):
shard_out_queue.get_nowait()

while notfound := network_out_queue.get(timeout=1):
while notfound := network_out_queue.get(timeout=QUEUE_TIMEOUT):
for node_id in notfound:
assert node_id.shard_url.startswith("https://example.com/notfound")

Expand Down Expand Up @@ -362,7 +363,7 @@ def test_shards_network_thread(http_server_shards, shard_cache_with_data):
network_thread.start()

with suppress(Empty):
while batch := shard_out_queue.get(timeout=1):
while batch := shard_out_queue.get(timeout=QUEUE_TIMEOUT):
for url, shard in batch:
assert isinstance(shard, dict)

Expand All @@ -375,7 +376,7 @@ def test_shards_network_thread(http_server_shards, shard_cache_with_data):
# shardlikes has its url. (If no shardlike has NodeId's url, it produces
# KeyError).
network_in_queue.put([NodeId("nope", invalid_shardlike.url)])
assert isinstance(shard_out_queue.get(timeout=1), TypeError)
assert isinstance(shard_out_queue.get(timeout=QUEUE_TIMEOUT), TypeError)

# Terminate with sentinel
network_in_queue.put(None)
Expand Down Expand Up @@ -881,16 +882,16 @@ def failing_worker(in_q, out_q):
worker.start()

# Should get the successful result first
result = out_queue.get(timeout=1)
result = out_queue.get(timeout=QUEUE_TIMEOUT)
assert result == "processed: test_item"

# Should get the exception propagated
exception = out_queue.get(timeout=1)
exception = out_queue.get(timeout=QUEUE_TIMEOUT)
assert isinstance(exception, ValueError)
assert "Simulated worker failure" in str(exception)

# Worker should also send None to in_queue to signal termination
sentinel = in_queue.get(timeout=1)
sentinel = in_queue.get(timeout=QUEUE_TIMEOUT)
assert sentinel is None


Expand Down
Loading