Skip to content

Commit 6795010

Browse files
committed
perf(index): batch full-vault embeddings by chunk length
1 parent e2c9bb4 commit 6795010

3 files changed

Lines changed: 319 additions & 7 deletions

File tree

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1717
- The blind-test runner now records first-stage BM25, vector, title, metadata, indegree, and fused-RRF diagnostics in config A result JSON so retrieval misses can be classified without ad hoc scripts.
1818
- The blind-test runner can now enable an off-by-default local metadata candidate-injection experiment, letting title/alias fallback add source and one-hop neighbor candidates before the single rerank pass.
1919

20+
### Changed
21+
- Full-vault indexing now embeds chunks in length-sorted batches instead of one file at a time, improving first-run indexing throughput on real Markdown vaults while preserving single-file indexing behavior and the existing SQLite schema.
22+
2023
### Fixed
2124
- `seeklink search --rerank-k N` now limits the number of candidates passed to the cross-encoder even when `N` is lower than `--top-k`; the remaining results keep first-stage RRF order.
2225
- `seeklink search` and `seeklink index` now auto-restart a stale daemon when its vault, embedder, or reranker config no longer matches the caller, avoiding repeated cold-start fallbacks after switching vaults or model settings.

seeklink/ingest.py

Lines changed: 262 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,12 @@
66
import json
77
import logging
88
import re
9+
from dataclasses import dataclass
910
from datetime import UTC, datetime
1011
from pathlib import Path
1112
from uuid import uuid4
1213

13-
from seeklink.chunker import chunk_markdown
14+
from seeklink.chunker import ChunkSpan, chunk_markdown
1415
from seeklink.db import Database
1516
from seeklink.embedder import Embedder
1617
from seeklink.link_parser import extract_wiki_links
@@ -20,6 +21,7 @@
2021

2122
# Non-hidden top-level dirs excluded from indexing (mirrors freshness._SKIP_DIRS)
2223
_SKIP_DIRS = {"todo", "archive"}
24+
_EMBED_BATCH_SIZE = 32
2325

2426
# Regex for YAML frontmatter block (handles empty frontmatter too).
2527
# Public — search.py imports this to map body-relative chunk offsets back
@@ -28,6 +30,28 @@
2830
_FRONTMATTER_RE = FRONTMATTER_RE # backward-compat alias within this module
2931

3032

33+
@dataclass(slots=True)
34+
class _PreparedFile:
35+
path: Path
36+
rel_path: str
37+
content_hash: str
38+
existing: Source | None
39+
title: str
40+
chunks: list[ChunkSpan]
41+
targets: list[str]
42+
tags: list[str]
43+
aliases: list[str]
44+
aliases_json: str
45+
unchanged: bool = False
46+
47+
48+
@dataclass(frozen=True, slots=True)
49+
class _ChunkEmbeddingItem:
50+
file_index: int
51+
chunk_index: int
52+
text: str
53+
54+
3155
def _utcnow() -> str:
3256
"""Return current UTC timestamp in SQLite-compatible format."""
3357
return datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S")
@@ -169,6 +193,7 @@ def ingest_vault(
169193

170194
stats = {"ingested": 0, "unchanged": 0, "skipped": 0, "errors": 0, "pruned": 0}
171195
seen_paths: set[str] = set()
196+
prepared_files: list[_PreparedFile] = []
172197

173198
for md_path in sorted(vault_root.rglob("*.md")):
174199
try:
@@ -179,21 +204,51 @@ def ingest_vault(
179204
continue
180205
rel_path = str(rel)
181206
seen_paths.add(rel_path)
182-
existing = db.get_source_by_path(rel_path)
183207

184208
try:
185-
result = ingest_file(db, md_path, vault_root, embedder)
209+
prepared = _prepare_file(db, md_path, vault_root)
186210
except Exception:
187-
logger.exception("Error ingesting %s", md_path)
211+
logger.exception("Error preparing %s", md_path)
188212
stats["errors"] += 1
189213
continue
190214

191-
if result is None:
215+
if prepared is None:
192216
stats["skipped"] += 1
193-
elif existing is not None and existing.content_hash == result.content_hash and existing.status == "indexed":
217+
elif prepared.unchanged:
194218
stats["unchanged"] += 1
195219
else:
196-
stats["ingested"] += 1
220+
prepared_files.append(prepared)
221+
222+
embeddings_by_file, embed_errors = _embed_prepared_files(
223+
prepared_files,
224+
embedder,
225+
batch_size=_EMBED_BATCH_SIZE,
226+
)
227+
228+
for i, prepared in enumerate(prepared_files):
229+
if i in embed_errors:
230+
error = embed_errors[i]
231+
logger.error(
232+
"Error embedding %s: %s",
233+
prepared.path,
234+
error,
235+
exc_info=(type(error), error, error.__traceback__),
236+
)
237+
stats["errors"] += 1
238+
continue
239+
240+
try:
241+
_write_prepared_file(
242+
db,
243+
prepared,
244+
embeddings_by_file.get(i, []),
245+
)
246+
except Exception:
247+
logger.exception("Error ingesting %s", prepared.path)
248+
stats["errors"] += 1
249+
continue
250+
251+
stats["ingested"] += 1
197252

198253
# Prune DB entries for files that no longer exist on disk
199254
for src in db.list_sources():
@@ -205,6 +260,206 @@ def ingest_vault(
205260
return stats
206261

207262

263+
def _prepare_file(
264+
db: Database,
265+
path: Path,
266+
vault_root: Path,
267+
) -> _PreparedFile | None:
268+
"""Read and parse a markdown file before batch embedding."""
269+
if path.suffix.lower() != ".md":
270+
return None
271+
272+
try:
273+
content = path.read_text(encoding="utf-8")
274+
except (UnicodeDecodeError, OSError) as e:
275+
logger.warning("Skipping %s: %s", path, e)
276+
return None
277+
278+
content_hash = hashlib.sha256(content.encode("utf-8")).hexdigest()
279+
rel_path = str(path.relative_to(vault_root))
280+
281+
existing = db.get_source_by_path(rel_path)
282+
if (
283+
existing is not None
284+
and existing.content_hash == content_hash
285+
and existing.status == "indexed"
286+
):
287+
db.update_source(existing.id, indexed_at=_utcnow())
288+
return _PreparedFile(
289+
path=path,
290+
rel_path=rel_path,
291+
content_hash=content_hash,
292+
existing=existing,
293+
title=existing.title or path.stem,
294+
chunks=[],
295+
targets=[],
296+
tags=[],
297+
aliases=[],
298+
aliases_json=existing.aliases,
299+
unchanged=True,
300+
)
301+
302+
tags, aliases, body = _parse_frontmatter(content)
303+
title = _extract_title(body, path)
304+
chunks = chunk_markdown(body)
305+
targets = extract_wiki_links(body)
306+
aliases_json = json.dumps(aliases, ensure_ascii=False)
307+
308+
return _PreparedFile(
309+
path=path,
310+
rel_path=rel_path,
311+
content_hash=content_hash,
312+
existing=existing,
313+
title=title,
314+
chunks=chunks,
315+
targets=targets,
316+
tags=tags,
317+
aliases=aliases,
318+
aliases_json=aliases_json,
319+
)
320+
321+
322+
def _embed_prepared_files(
323+
prepared_files: list[_PreparedFile],
324+
embedder: Embedder,
325+
*,
326+
batch_size: int,
327+
) -> tuple[dict[int, list[bytes]], dict[int, Exception]]:
328+
"""Embed all prepared chunks in length-sorted batches.
329+
330+
Sorting by text length avoids mixing very short and very long chunks in the
331+
same ONNX batch, which otherwise wastes time on padding. Results are mapped
332+
back to the original file/chunk positions before writing.
333+
"""
334+
items: list[_ChunkEmbeddingItem] = []
335+
for file_index, prepared in enumerate(prepared_files):
336+
for chunk_index, chunk in enumerate(prepared.chunks):
337+
items.append(
338+
_ChunkEmbeddingItem(
339+
file_index=file_index,
340+
chunk_index=chunk_index,
341+
text=chunk.text,
342+
)
343+
)
344+
345+
items.sort(key=lambda item: len(item.text))
346+
embeddings_by_file: dict[int, list[bytes | None]] = {
347+
file_index: [None] * len(prepared.chunks)
348+
for file_index, prepared in enumerate(prepared_files)
349+
}
350+
errors: dict[int, Exception] = {}
351+
352+
def embed_items(batch: list[_ChunkEmbeddingItem]) -> None:
353+
if not batch:
354+
return
355+
try:
356+
embeddings = embedder.embed_documents([item.text for item in batch])
357+
if len(embeddings) != len(batch):
358+
raise RuntimeError(
359+
f"Embedder returned {len(embeddings)} embeddings "
360+
f"for {len(batch)} chunks"
361+
)
362+
except Exception as e:
363+
if len(batch) == 1:
364+
errors.setdefault(batch[0].file_index, e)
365+
return
366+
mid = len(batch) // 2
367+
embed_items(batch[:mid])
368+
embed_items(batch[mid:])
369+
return
370+
371+
for item, embedding in zip(batch, embeddings):
372+
embeddings_by_file[item.file_index][item.chunk_index] = embedding
373+
374+
for start in range(0, len(items), batch_size):
375+
embed_items(items[start : start + batch_size])
376+
377+
out: dict[int, list[bytes]] = {}
378+
for file_index, embeddings in embeddings_by_file.items():
379+
if file_index in errors:
380+
continue
381+
if any(embedding is None for embedding in embeddings):
382+
errors.setdefault(
383+
file_index,
384+
RuntimeError(
385+
f"missing embeddings for {prepared_files[file_index].rel_path}"
386+
),
387+
)
388+
continue
389+
out[file_index] = [
390+
embedding for embedding in embeddings if embedding is not None
391+
]
392+
393+
return out, errors
394+
395+
396+
def _write_prepared_file(
397+
db: Database,
398+
prepared: _PreparedFile,
399+
embeddings: list[bytes],
400+
) -> Source | None:
401+
"""Persist one prepared file after embeddings have been computed."""
402+
if len(embeddings) != len(prepared.chunks):
403+
raise RuntimeError(
404+
f"Embedding count mismatch for {prepared.rel_path}: "
405+
f"{len(embeddings)} embeddings for {len(prepared.chunks)} chunks"
406+
)
407+
408+
with db.transaction():
409+
if prepared.existing is not None:
410+
db.delete_chunks_by_source(prepared.existing.id)
411+
db.delete_links_by_source(prepared.existing.id)
412+
db.delete_tags_by_source(prepared.existing.id)
413+
source = prepared.existing
414+
else:
415+
source = db.add_source(
416+
uid=str(uuid4()),
417+
path=prepared.rel_path,
418+
content_hash=prepared.content_hash,
419+
)
420+
421+
for i, (chunk_span, emb) in enumerate(zip(prepared.chunks, embeddings)):
422+
db_chunk = db.add_chunk(
423+
source_id=source.id,
424+
content=chunk_span.text,
425+
chunk_index=i,
426+
char_start=chunk_span.char_start,
427+
char_end=chunk_span.char_end,
428+
token_count=chunk_span.token_count,
429+
)
430+
db.upsert_vec(db_chunk.id, emb)
431+
432+
for target in prepared.targets:
433+
target_source = _find_source_by_target(db, target)
434+
db.add_wiki_link(
435+
source_note_id=source.id,
436+
target_path=target,
437+
target_note_id=target_source.id if target_source else None,
438+
)
439+
440+
if prepared.tags:
441+
db.add_tags(source.id, prepared.tags)
442+
443+
stem = prepared.path.stem
444+
rel_no_ext = prepared.rel_path.removesuffix(".md")
445+
db.resolve_forward_refs(stem, source.id)
446+
if rel_no_ext != stem:
447+
db.resolve_forward_refs(rel_no_ext, source.id)
448+
for alias in prepared.aliases:
449+
db.resolve_forward_refs(alias, source.id)
450+
451+
db.update_source(
452+
source.id,
453+
title=prepared.title,
454+
content_hash=prepared.content_hash,
455+
status="indexed",
456+
indexed_at=_utcnow(),
457+
aliases=prepared.aliases_json,
458+
)
459+
460+
return db.get_source(source.id)
461+
462+
208463
def _parse_frontmatter(content: str) -> tuple[list[str], list[str], str]:
209464
"""Parse YAML frontmatter, returning (tags, aliases, body).
210465

0 commit comments

Comments
 (0)