Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Changelog

## v0.1.1 - 2026-06-03

### Fixed

- Improved reliability when Windows Search is using the app index, so both the CLI and desktop UI can recover from temporary database locks while loading apps, showing managed aliases, or generating aliases.
- Reduced the wait before recovery starts when the app index is locked.

## v0.1.0 - 2026-06-01

Initial version of `win-search-aliases`.
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ powershell -ExecutionPolicy Bypass -c "iwr -useb https://raw.githubusercontent.c
### pip

```bash
python -m pip install https://github.com/mbv06/win-search-aliases/archive/refs/heads/main.zip
python -m pip install --upgrade https://github.com/mbv06/win-search-aliases/archive/refs/heads/main.zip
```

### Standalone EXE
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "win-search-aliases"
version = "0.1.0"
version = "0.1.1"
description = "Add managed app-search aliases to the internal Windows Search AppsIndex database (Windows 11 only)."
readme = "README.md"
requires-python = ">=3.11"
Expand Down
51 changes: 38 additions & 13 deletions src/win_search_aliases/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ class ManagedRow(NamedTuple):
logger = logging.getLogger(__name__)
T = TypeVar("T")
WRITE_RETRY_ATTEMPTS = 3
READ_RETRY_ATTEMPTS = 3
WRITE_SQLITE_TIMEOUT_SECONDS = 2.0
READ_SQLITE_TIMEOUT_SECONDS = 1.0
TRANSIENT_SQLITE_MESSAGES = (
"database is locked",
"database table is locked",
Expand Down Expand Up @@ -104,14 +107,18 @@ def resolve_db_path(path: str | Path | None = None) -> Path:
return db_path


def connect(db_path: str | Path) -> sqlite3.Connection:
conn = sqlite3.connect(Path(db_path), timeout=10)
conn.execute("pragma busy_timeout = 10000")
def connect(db_path: str | Path, *, timeout_seconds: float = WRITE_SQLITE_TIMEOUT_SECONDS) -> sqlite3.Connection:
conn = sqlite3.connect(Path(db_path), timeout=timeout_seconds)
conn.execute(f"pragma busy_timeout = {int(timeout_seconds * 1000)}")
return conn


def read_tiles(db_path: str | Path) -> list[AppCandidate]:
with closing(connect(db_path)) as conn:
return _with_sqlite_retry("read", lambda: _read_tiles_once(db_path), attempts=READ_RETRY_ATTEMPTS)
Comment thread
mbv06 marked this conversation as resolved.


def _read_tiles_once(db_path: str | Path) -> list[AppCandidate]:
with closing(_connect_for_read(db_path)) as conn:
columns = {row[1] for row in conn.execute("pragma table_info(tiles)").fetchall()}
required = {"displayName", "appId"}
missing = required - columns
Expand All @@ -128,7 +135,9 @@ def read_tiles(db_path: str | Path) -> list[AppCandidate]:
left join tiles_content as tc on tc.id = t.rowid
"""
).fetchall()
except sqlite3.OperationalError:
except sqlite3.OperationalError as exc:
if _is_transient_sqlite_error(exc):
raise
rows = conn.execute(base_query).fetchall()
else:
rows = conn.execute(base_query).fetchall()
Expand Down Expand Up @@ -205,9 +214,16 @@ def write(conn: sqlite3.Connection) -> AliasWriteResult:
def managed_rows(
db_path: str | Path,
sources: set[str] | None = None,
) -> list[ManagedRow]:
return _with_sqlite_retry("read", lambda: _managed_rows_once(db_path, sources), attempts=READ_RETRY_ATTEMPTS)


def _managed_rows_once(
db_path: str | Path,
sources: set[str] | None = None,
) -> list[ManagedRow]:
where_sql, params = _managed_source_filter(sources)
with closing(connect(db_path)) as conn:
with closing(_connect_for_read(db_path)) as conn:
rows = conn.execute(
f"""
select displayName, synonym, rankPenalty, source
Expand Down Expand Up @@ -321,8 +337,12 @@ def _begin_write(conn: sqlite3.Connection) -> None:
conn.execute("begin immediate")


def _connect_for_read(db_path: str | Path) -> sqlite3.Connection:
return connect(db_path, timeout_seconds=READ_SQLITE_TIMEOUT_SECONDS)


def _write_transaction(db_path: str | Path, write: Callable[[sqlite3.Connection], T]) -> T:
return _with_write_retry(lambda: _write_transaction_once(db_path, write))
return _with_sqlite_retry("write", lambda: _write_transaction_once(db_path, write), attempts=WRITE_RETRY_ATTEMPTS)


def _write_transaction_once(db_path: str | Path, write: Callable[[sqlite3.Connection], T]) -> T:
Expand All @@ -337,17 +357,22 @@ def _write_transaction_once(db_path: str | Path, write: Callable[[sqlite3.Connec
return result


def _with_write_retry(write: Callable[[], T]) -> T:
for attempt in range(WRITE_RETRY_ATTEMPTS):
def _with_sqlite_retry(operation: str, action: Callable[[], T], *, attempts: int) -> T:
for attempt in range(attempts):
logger.debug("SQLite %s attempt %s/%s", operation, attempt + 1, attempts)
try:
return write()
return action()
except sqlite3.OperationalError as exc:
if attempt == WRITE_RETRY_ATTEMPTS - 1 or not _is_transient_sqlite_error(exc):
if attempt == attempts - 1 or not _is_transient_sqlite_error(exc):
raise
logger.warning("SQLite write failed with a transient error; retrying after stopping SearchHost: %s", exc)
logger.warning(
"SQLite %s failed with a transient error; retrying after stopping SearchHost: %s",
operation,
exc,
)
stop_search_host()
time.sleep(0.25 * (attempt + 1))
raise RuntimeError("unreachable SQLite write retry state")
raise RuntimeError(f"unreachable SQLite {operation} retry state")


def _rollback_quietly(conn: sqlite3.Connection) -> None:
Expand Down
151 changes: 150 additions & 1 deletion tests/test_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
from win_search_aliases.aliases import AliasRecord
from win_search_aliases.config import load_deny_list
from win_search_aliases.db import (
READ_SQLITE_TIMEOUT_SECONDS,
SOURCE_CUSTOM,
SOURCE_GENERATED_AUTO,
SOURCE_GENERATED_MANUAL,
connect,
create_backup,
insert_alias_records,
managed_rows,
Expand All @@ -35,6 +37,118 @@ def test_read_tiles_and_counts_report_total_and_eligible(tmp_path) -> None:
assert candidates[0].content_c1.endswith("Google Chrome.lnk")


def test_connect_applies_requested_busy_timeout(monkeypatch) -> None:
seen = {}

class FakeConnection:
def execute(self, sql):
seen["pragma"] = sql

def fake_sqlite_connect(path, *, timeout):
seen["path"] = path
seen["timeout"] = timeout
return FakeConnection()

monkeypatch.setattr(db_mod.sqlite3, "connect", fake_sqlite_connect)

conn = connect("AppsIndex.db", timeout_seconds=READ_SQLITE_TIMEOUT_SECONDS)

assert conn is not None
assert seen["timeout"] == READ_SQLITE_TIMEOUT_SECONDS
assert seen["pragma"] == "pragma busy_timeout = 1000"


def test_read_tiles_retries_transient_locked_database(monkeypatch) -> None:
attempts = []
stopped = []

class FakeCursor:
def __init__(self, rows):
self._rows = rows

def fetchall(self):
return self._rows

class FakeConnection:
def __init__(self, should_fail: bool) -> None:
self.should_fail = should_fail
self.closed = False

def execute(self, sql, *_args, **_kwargs):
if sql == "pragma table_info(tiles)" and self.should_fail:
raise sqlite3.OperationalError("database is locked")
if sql == "pragma table_info(tiles)":
return FakeCursor([(0, "displayName"), (1, "appId"), (2, "cRank")])
if "from sqlite_master" in sql:
return FakeCursor([])
return FakeCursor([("Google Chrome", "chrome", 1)])

def close(self) -> None:
self.closed = True

def fake_connect(_path, **_kwargs):
attempts.append(True)
return FakeConnection(should_fail=len(attempts) == 1)

monkeypatch.setattr(db_mod, "connect", fake_connect)
monkeypatch.setattr(db_mod, "stop_search_host", lambda: stopped.append(True))
monkeypatch.setattr(db_mod.time, "sleep", lambda _seconds: None)

tiles = read_tiles("AppsIndex.db")

assert [(tile.display_name, tile.app_id) for tile in tiles] == [("Google Chrome", "chrome")]
assert len(attempts) == 2
assert stopped == [True]


def test_read_tiles_retries_locked_tiles_content_join(monkeypatch) -> None:
attempts = []
stopped = []

class FakeCursor:
def __init__(self, rows):
self._rows = rows

def fetchall(self):
return self._rows

class FakeConnection:
def __init__(self, should_fail_join: bool) -> None:
self.should_fail_join = should_fail_join

def execute(self, sql, *_args, **_kwargs):
if sql == "pragma table_info(tiles)":
return FakeCursor([(0, "displayName"), (1, "appId"), (2, "cRank")])
if "from sqlite_master" in sql:
return FakeCursor([("tiles_content",)])
if sql == "pragma table_info(tiles_content)":
return FakeCursor([(0, "id"), (1, "c1")])
if "left join tiles_content" in sql:
if self.should_fail_join:
raise sqlite3.OperationalError("database is locked")
return FakeCursor([("Google Chrome", "chrome", 1, "Programs/Google Chrome.lnk")])
return FakeCursor([("Google Chrome", "chrome", 1)])

def close(self) -> None:
pass

def fake_connect(_path, **_kwargs):
attempts.append(True)
return FakeConnection(should_fail_join=len(attempts) == 1)

monkeypatch.setattr(db_mod, "connect", fake_connect)
monkeypatch.setattr(db_mod, "stop_search_host", lambda: stopped.append(True))
monkeypatch.setattr(db_mod.time, "sleep", lambda _seconds: None)

tiles = read_tiles("AppsIndex.db")

assert [(tile.display_name, tile.content_c1) for tile in tiles] == [
("Google Chrome", "Programs/Google Chrome.lnk")
]
assert len(attempts) == 2
assert stopped == [True]


def test_insert_alias_records_is_idempotent(tmp_path) -> None:
db_path = tmp_path / "AppsIndex.db"
create_fixture_db(db_path)
Expand Down Expand Up @@ -126,12 +240,47 @@ def execute(self, *_args, **_kwargs):
def close(self):
closed.append(True)

monkeypatch.setattr("win_search_aliases.db.connect", lambda _path: FakeConnection())
monkeypatch.setattr("win_search_aliases.db.connect", lambda _path, **_kwargs: FakeConnection())

assert managed_rows("AppsIndex.db") == []
assert closed == [True]


def test_managed_rows_retries_transient_locked_database(monkeypatch) -> None:
attempts = []
stopped = []

class FakeCursor:
def fetchall(self):
return [("Google Chrome", "browser", 1, SOURCE_CUSTOM)]

class FakeConnection:
def __init__(self, should_fail: bool) -> None:
self.should_fail = should_fail

def execute(self, *_args, **_kwargs):
if self.should_fail:
raise sqlite3.OperationalError("database is locked")
return FakeCursor()

def close(self) -> None:
pass

def fake_connect(_path, **_kwargs):
attempts.append(True)
return FakeConnection(should_fail=len(attempts) == 1)

monkeypatch.setattr(db_mod, "connect", fake_connect)
monkeypatch.setattr(db_mod, "stop_search_host", lambda: stopped.append(True))
monkeypatch.setattr(db_mod.time, "sleep", lambda _seconds: None)

rows = managed_rows("AppsIndex.db")

assert rows == [("Google Chrome", "browser", 1, SOURCE_CUSTOM)]
assert len(attempts) == 2
assert stopped == [True]


def test_replace_auto_source_leaves_manual_and_custom_rows(tmp_path) -> None:
db_path = tmp_path / "AppsIndex.db"
create_fixture_db(db_path)
Expand Down
Loading