Skip to content

Commit ab7b10f

Browse files
committed
add test for cache db entries deleter
1 parent 96a7671 commit ab7b10f

4 files changed

Lines changed: 304 additions & 74 deletions

File tree

src/ota_proxy/cache_index.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ def __init__(self, db_f: Path, orm_type: type[ORMBase] = CacheMetaORM) -> None:
7979

8080
_delete_con = sqlite3.connect(self._db_f, check_same_thread=False)
8181
enable_wal_mode(_delete_con)
82-
self._delete_orm = orm_type(_write_con)
82+
self._delete_orm = orm_type(_delete_con)
8383

8484
self._closed = False
8585

@@ -134,9 +134,6 @@ def start_delete_thread(self) -> None:
134134
batch: list[str] = []
135135
loops_since_flush = 0
136136

137-
_con = sqlite3.connect(self._db_f, check_same_thread=False)
138-
enable_wal_mode(_con)
139-
140137
while not self._closed:
141138
time.sleep(cfg.DB_WRITER_LOOP_INTERVAL)
142139

@@ -164,7 +161,7 @@ def start_delete_thread(self) -> None:
164161
for _batch in batched(batch, cfg.DB_FLUSH_BATCH_SIZE):
165162
if _batch:
166163
self._flush_deletes(_batch)
167-
_con.close()
164+
self._delete_orm.orm_con.close()
168165

169166
def _flush_writes(self, _batch: Iterable[CacheMeta]) -> None:
170167
"""Write a batch of entries to SQLite in one transaction."""
Lines changed: 289 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,289 @@
1+
# Copyright 2022 TIER IV, INC. All rights reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
"""Integration tests for CacheDBWriter threads and CacheIndex DB round-trip."""
15+
16+
from __future__ import annotations
17+
18+
import bisect
19+
import random
20+
import sqlite3
21+
import threading
22+
import time
23+
from collections.abc import Callable, Generator
24+
from pathlib import Path
25+
26+
import pytest
27+
from pytest_mock import MockerFixture
28+
29+
from ota_proxy.cache_index import CacheDBWriter, CacheIndex
30+
from ota_proxy.config import config as cfg
31+
from ota_proxy.db import CacheMeta, CacheMetaORM, init_db
32+
from ota_proxy.utils import url_based_hash
33+
34+
BUCKET_SIZE_LIST = list(cfg.BUCKET_FILE_SIZE_DICT)
35+
BUCKET_SIZES = [0, 1024, 4096, 256 * 1024, 1024**2, 32 * 1024**2]
36+
DB_ENTRIES = 2_000
37+
ENTRIES_TO_REMOVE = 100
38+
39+
40+
# ---- helpers ---- #
41+
42+
43+
def _db_select_all_sha256_hashes(db_f: Path, table_name: str) -> set[str]:
44+
with sqlite3.connect(db_f) as con:
45+
orm = CacheMetaORM(con, table_name)
46+
return {
47+
row.file_sha256
48+
for row in orm.orm_select_entries(_stmt=f"SELECT * FROM {table_name}")
49+
}
50+
51+
52+
def _db_select_all(db_f: Path, table_name: str) -> dict[str, CacheMeta]:
53+
with sqlite3.connect(db_f) as con:
54+
orm = CacheMetaORM(con, table_name)
55+
return {
56+
row.file_sha256: row
57+
for row in orm.orm_select_entries(_stmt=f"SELECT * FROM {table_name}")
58+
}
59+
60+
61+
def _expected_bucket_idx(cache_size: int) -> int:
62+
return bisect.bisect_right(BUCKET_SIZE_LIST, cache_size) - 1
63+
64+
65+
def _run_thread(
66+
target: Callable[[], None], *, close_first: bool, writer: CacheDBWriter
67+
) -> None:
68+
"""Start a CacheDBWriter thread, optionally closing the writer first."""
69+
if close_first:
70+
writer.close()
71+
t = threading.Thread(target=target, daemon=True)
72+
t.start()
73+
if not close_first:
74+
time.sleep(0.3)
75+
writer.close()
76+
t.join(timeout=5)
77+
78+
79+
# ---- fixtures ---- #
80+
81+
82+
@pytest.fixture(scope="module")
83+
def entries() -> list[CacheMeta]:
84+
"""Pre-built list of DB_ENTRIES CacheMeta objects with varied cache_size."""
85+
return [
86+
CacheMeta(
87+
file_sha256=url_based_hash(f"http://example.com/f{i}"),
88+
url=f"http://example.com/f{i}",
89+
cache_size=BUCKET_SIZES[i % len(BUCKET_SIZES)],
90+
last_access=int(time.time()),
91+
)
92+
for i in range(DB_ENTRIES)
93+
]
94+
95+
96+
@pytest.fixture(autouse=True)
97+
def fast_db_cfg(mocker: MockerFixture) -> None:
98+
"""Speed up DB writer loops for all tests in this module."""
99+
mocker.patch.object(cfg, "DB_WRITER_LOOP_INTERVAL", 0.05)
100+
mocker.patch.object(cfg, "DB_FLUSH_MAX_LOOPS", 2)
101+
102+
103+
@pytest.fixture
104+
def db_f(tmp_path: Path) -> Path:
105+
"""Initialize an empty cache DB and return the path."""
106+
_db_f = tmp_path / "cache.db"
107+
init_db(_db_f, cfg.TABLE_NAME)
108+
return _db_f
109+
110+
111+
# ---- CacheDBWriter thread tests ---- #
112+
113+
114+
class TestCacheDBWriterDeleteThread:
115+
"""Delete thread drains the queue and flushes deletes to DB."""
116+
117+
@pytest.fixture
118+
def db_writer(
119+
self, db_f: Path, entries: list[CacheMeta]
120+
) -> Generator[tuple[CacheDBWriter, Path, list[CacheMeta]]]:
121+
with sqlite3.connect(db_f) as con:
122+
orm = CacheMetaORM(con)
123+
orm.orm_insert_entries(entries, or_option="replace")
124+
125+
writer = CacheDBWriter(db_f)
126+
yield writer, db_f, entries
127+
128+
@pytest.mark.parametrize(
129+
"close_first", [False, True], ids=["during_loop", "on_close"]
130+
)
131+
def test_delete_thread_flushes_queue(
132+
self,
133+
db_writer: tuple[CacheDBWriter, Path, list[CacheMeta]],
134+
close_first: bool,
135+
) -> None:
136+
"""Enqueued deletes are flushed both during the loop and on shutdown."""
137+
writer, db_f, entries = db_writer
138+
139+
to_delete = random.sample(entries, ENTRIES_TO_REMOVE)
140+
to_delete_keys = {e.file_sha256 for e in to_delete}
141+
for e in to_delete:
142+
writer.remove_entry(e.file_sha256)
143+
144+
_run_thread(writer.start_delete_thread, close_first=close_first, writer=writer)
145+
146+
remaining = _db_select_all_sha256_hashes(db_f, cfg.TABLE_NAME)
147+
for key in to_delete_keys:
148+
assert key not in remaining
149+
assert len(remaining) == DB_ENTRIES - ENTRIES_TO_REMOVE
150+
151+
152+
class TestCacheDBWriterWriteThread:
153+
"""Write thread drains the queue and flushes writes to DB."""
154+
155+
@pytest.fixture
156+
def db_writer(
157+
self, db_f: Path
158+
) -> Generator[tuple[CacheDBWriter, Path], None, None]:
159+
writer = CacheDBWriter(db_f)
160+
yield writer, db_f
161+
162+
@pytest.mark.parametrize(
163+
"close_first", [False, True], ids=["during_loop", "on_close"]
164+
)
165+
def test_write_thread_flushes_queue(
166+
self,
167+
db_writer: tuple[CacheDBWriter, Path],
168+
entries: list[CacheMeta],
169+
close_first: bool,
170+
) -> None:
171+
"""Enqueued writes are flushed both during the loop and on shutdown."""
172+
writer, db_f = db_writer
173+
174+
for e in entries:
175+
writer.register_entry(e)
176+
177+
_run_thread(writer.start_write_thread, close_first=close_first, writer=writer)
178+
179+
persisted = _db_select_all_sha256_hashes(db_f, cfg.TABLE_NAME)
180+
assert len(persisted) == DB_ENTRIES
181+
for e in entries:
182+
assert e.file_sha256 in persisted
183+
184+
def test_write_thread_assigns_bucket_idx(
185+
self,
186+
db_writer: tuple[CacheDBWriter, Path],
187+
entries: list[CacheMeta],
188+
) -> None:
189+
"""Write thread must assign correct bucket_idx for backward compat."""
190+
writer, db_f = db_writer
191+
192+
for e in entries:
193+
writer.register_entry(
194+
CacheMeta(
195+
file_sha256=e.file_sha256,
196+
url=e.url,
197+
last_access=e.last_access,
198+
cache_size=e.cache_size,
199+
content_encoding=e.content_encoding,
200+
file_compression_alg=e.file_compression_alg,
201+
# NOTE: intentionally NOT include the bucket_id,
202+
# test the writer thread set the bucket_id correctly.
203+
)
204+
)
205+
206+
_run_thread(writer.start_write_thread, close_first=False, writer=writer)
207+
208+
rows = _db_select_all(db_f, cfg.TABLE_NAME)
209+
for e in entries:
210+
assert rows[e.file_sha256].bucket_idx == _expected_bucket_idx(e.cache_size)
211+
212+
213+
# ---- CacheIndex full round-trip ---- #
214+
215+
216+
class TestCacheIndexCommitAndRemove:
217+
"""CacheIndex commit_entry / remove_entry with real DB threads."""
218+
219+
@pytest.fixture
220+
def cache_index(
221+
self, db_f: Path, tmp_path: Path
222+
) -> Generator[tuple[CacheIndex, Path, Path], None, None]:
223+
base_dir = tmp_path / "ota-cache"
224+
base_dir.mkdir()
225+
idx = CacheIndex(db_f, base_dir, init_db=True)
226+
try:
227+
yield idx, db_f, base_dir
228+
finally:
229+
idx.close()
230+
231+
def test_committed_entries_persisted_to_db(
232+
self,
233+
cache_index: tuple[CacheIndex, Path, Path],
234+
entries: list[CacheMeta],
235+
) -> None:
236+
"""commit_entry must land in both in-memory index and DB."""
237+
idx, db_f, base_dir = cache_index
238+
239+
for e in entries:
240+
(base_dir / e.file_sha256).touch()
241+
idx.commit_entry(e)
242+
243+
# In-memory lookups work immediately
244+
for e in entries:
245+
assert idx.lookup_entry(e.file_sha256) is not None
246+
247+
# Flush DB and verify persistence + bucket_idx
248+
idx.close()
249+
rows = _db_select_all(db_f, cfg.TABLE_NAME)
250+
for e in entries:
251+
assert e.file_sha256 in rows
252+
assert rows[e.file_sha256].bucket_idx == _expected_bucket_idx(e.cache_size)
253+
254+
def test_remove_entry_clears_index_and_db(
255+
self, db_f: Path, tmp_path: Path, entries: list[CacheMeta]
256+
) -> None:
257+
"""remove_entry must drop from in-memory index and DB."""
258+
base_dir = tmp_path / "ota-cache-rm"
259+
base_dir.mkdir()
260+
261+
# Phase 1: commit entries and flush writes completely.
262+
idx = CacheIndex(db_f, base_dir, init_db=True)
263+
for e in entries:
264+
(base_dir / e.file_sha256).touch()
265+
idx.commit_entry(e)
266+
idx.close()
267+
268+
assert len(_db_select_all_sha256_hashes(db_f, cfg.TABLE_NAME)) == DB_ENTRIES
269+
270+
# Phase 2: reopen and remove a random subset.
271+
idx = CacheIndex(db_f, base_dir)
272+
to_remove = random.sample(entries, ENTRIES_TO_REMOVE)
273+
to_remove_keys = {e.file_sha256 for e in to_remove}
274+
for e in to_remove:
275+
idx.remove_entry(e.file_sha256)
276+
277+
# In-memory index reflects removals immediately
278+
for e in to_remove:
279+
assert idx.lookup_entry(e.file_sha256) is None
280+
for e in entries:
281+
if e.file_sha256 not in to_remove_keys:
282+
assert idx.lookup_entry(e.file_sha256) is not None
283+
284+
# Flush and verify DB
285+
idx.close()
286+
remaining = _db_select_all_sha256_hashes(db_f, cfg.TABLE_NAME)
287+
for key in to_remove_keys:
288+
assert key not in remaining
289+
assert len(remaining) == DB_ENTRIES - ENTRIES_TO_REMOVE

test/integration/test_otaproxy/test_server_app.py

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import asyncio
3333
import socket
3434
from http import HTTPStatus
35-
from unittest.mock import AsyncMock, MagicMock
3635

3736
import aiohttp
3837
import pytest
@@ -43,12 +42,12 @@
4342
from ota_proxy.server_app import App
4443

4544

46-
def _make_ota_cache_mock() -> MagicMock:
45+
def _make_ota_cache_mock(mocker):
4746
"""Return a MagicMock that satisfies the OTACache interface."""
48-
mock = MagicMock()
49-
mock.start = AsyncMock()
50-
mock.close = AsyncMock()
51-
mock.retrieve_file = AsyncMock()
47+
mock = mocker.MagicMock()
48+
mock.start = mocker.AsyncMock()
49+
mock.close = mocker.AsyncMock()
50+
mock.retrieve_file = mocker.AsyncMock()
5251
return mock
5352

5453

@@ -67,7 +66,7 @@ class TestUvicornCompatibility:
6766
"""
6867

6968
@pytest.fixture
70-
async def live_server(self):
69+
async def live_server(self, mocker):
7170
"""Start uvicorn in-process and yield (server, port, mock_cache, content).
7271
7372
loop="none" keeps uvicorn on the current event loop so that the fixture
@@ -76,7 +75,7 @@ async def live_server(self):
7675
__init__.py.
7776
"""
7877
content = b"test-payload"
79-
mock_cache = _make_ota_cache_mock()
78+
mock_cache = _make_ota_cache_mock(mocker)
8079
mock_cache.retrieve_file.return_value = (
8180
content,
8281
CIMultiDict({HEADER_CONTENT_TYPE: "application/octet-stream"}),
@@ -112,13 +111,13 @@ async def live_server(self):
112111
server.should_exit = True
113112
await serve_task
114113

115-
def test_production_config_params_are_accepted(self):
114+
def test_production_config_params_are_accepted(self, mocker):
116115
"""uvicorn.Config must not raise with our production parameter set.
117116
118117
This test validates that the uvicorn version installed in the project
119118
still accepts loop="uvloop", http="httptools", and lifespan="on".
120119
"""
121-
mock_cache = _make_ota_cache_mock()
120+
mock_cache = _make_ota_cache_mock(mocker)
122121
app = App(mock_cache)
123122
# Must not raise regardless of uvicorn version
124123
config = uvicorn.Config(

0 commit comments

Comments
 (0)