Commit 56c0680

build index after pipe is done, remove importer indexer complexity
1 parent d9d0246 commit 56c0680

8 files changed (+151 / -392 lines)

poliloom/poliloom/cli.py

Lines changed: 69 additions & 58 deletions
@@ -14,7 +14,7 @@
 from poliloom.database import get_engine
 from poliloom.logging import setup_logging
 from sqlalchemy.orm import Session
-from sqlalchemy import exists, func
+from sqlalchemy import exists, text
 from poliloom.models import (
     Country,
     CurrentImportEntity,
@@ -28,7 +28,6 @@
     Property,
     WikidataDump,
     WikidataEntity,
-    WikidataEntityLabel,
 )

 # Configure logging
@@ -983,8 +982,8 @@ def index_delete(confirm):
 def index_build(batch_size, rebuild):
     """Build Meilisearch index from database.

-    Indexes all documents from Location, Country, Language, and Politician tables.
-    Uses upsert semantics - unchanged documents won't be re-embedded.
+    Indexes all searchable entities with aggregated types. Each entity appears
+    once with all its types (e.g., an entity can be both Location and Country).

     Use --rebuild to delete and recreate the index from scratch.
     """
@@ -1002,75 +1001,87 @@ def index_build(batch_size, rebuild):
     if search_service.ensure_index():
         click.echo(f" Created index '{INDEX_NAME}'")

-    # Reindex all documents from each model type
+    # Get all searchable models
     models = _get_search_indexed_models()
+    click.echo(f" Types: {', '.join(m.__name__ for m in models)}")
+
+    # Build dynamic SQL query
+    # LEFT JOIN each model table and build types array from which ones match
+    left_joins = []
+    case_statements = []
+    group_by_columns = ["we.wikidata_id"]
+
+    for model in models:
+        table_name = model.__tablename__
+        left_joins.append(
+            f"LEFT JOIN {table_name} ON we.wikidata_id = {table_name}.wikidata_id"
+        )
+        case_statements.append(
+            f"CASE WHEN {table_name}.wikidata_id IS NOT NULL THEN '{model.__name__}' END"
+        )
+        group_by_columns.append(f"{table_name}.wikidata_id")
+
+    array_expr = f"array_remove(ARRAY[{', '.join(case_statements)}], NULL)"
+
+    base_sql = f"""
+        SELECT
+            we.wikidata_id,
+            array_agg(DISTINCT wel.label) as labels,
+            {array_expr} as types
+        FROM wikidata_entities we
+        JOIN wikidata_entity_labels wel ON we.wikidata_id = wel.entity_id
+        {chr(10).join(left_joins)}
+        WHERE we.deleted_at IS NULL
+        GROUP BY {", ".join(group_by_columns)}
+        HAVING array_length({array_expr}, 1) > 0
+    """
+
     total_indexed = 0
     task_uids = []

     with Session(get_engine()) as session:
-        for model in models:
-            entity_type = model.__tablename__
-            click.echo(f"⏳ Indexing {entity_type}...")
-
-            # Count total
-            total = (
-                session.query(model)
-                .join(WikidataEntity, model.wikidata_id == WikidataEntity.wikidata_id)
-                .filter(WikidataEntity.deleted_at.is_(None))
-                .count()
-            )
+        # Count total
+        count_result = session.execute(text(f"SELECT COUNT(*) FROM ({base_sql}) subq"))
+        total = count_result.scalar()

-            if total == 0:
-                click.echo(f" No {entity_type} to index")
-                continue
-
-            indexed = 0
-            offset = 0
+        if total == 0:
+            click.echo(" No entities to index")
+            return

-            while offset < total:
-                # Fetch only wikidata_id and labels (no ORM objects, no timestamps)
-                rows = (
-                    session.query(
-                        model.wikidata_id,
-                        func.array_agg(WikidataEntityLabel.label),
-                    )
-                    .join(
-                        WikidataEntity, model.wikidata_id == WikidataEntity.wikidata_id
-                    )
-                    .join(
-                        WikidataEntityLabel,
-                        WikidataEntity.wikidata_id == WikidataEntityLabel.entity_id,
-                    )
-                    .filter(WikidataEntity.deleted_at.is_(None))
-                    .group_by(model.wikidata_id)
-                    .offset(offset)
-                    .limit(batch_size)
-                    .all()
-                )
+        click.echo(f" Found {total:,} entities to index")

-                if not rows:
-                    break
+        # Process in batches
+        offset_val = 0
+        while offset_val < total:
+            paginated_sql = f"{base_sql} OFFSET :offset LIMIT :limit"
+            rows = session.execute(
+                text(paginated_sql),
+                {"offset": offset_val, "limit": batch_size},
+            ).fetchall()

-                # Build search documents directly from rows
-                documents = [
-                    SearchDocument(id=wikidata_id, type=entity_type, labels=labels)
-                    for wikidata_id, labels in rows
-                ]
+            if not rows:
+                break

-                # Send batch without waiting (enables Meilisearch auto-batching)
-                task_uid = search_service.index_documents(documents)
-                if task_uid is not None:
-                    task_uids.append(task_uid)
+            # Build search documents
+            documents = [
+                SearchDocument(
+                    id=row.wikidata_id, types=list(row.types), labels=list(row.labels)
+                )
+                for row in rows
+            ]

-                indexed += len(documents)
-                offset += batch_size
+            # Send batch without waiting (enables Meilisearch auto-batching)
+            task_uid = search_service.index_documents(documents)
+            if task_uid is not None:
+                task_uids.append(task_uid)

-                click.echo(f" Sent: {indexed}/{total}")
+            total_indexed += len(documents)
+            offset_val += batch_size

-            total_indexed += indexed
+            click.echo(f" Sent: {total_indexed:,}/{total:,}")

     click.echo(
-        f"✅ Sent {total_indexed} documents for indexing ({len(task_uids)} tasks)"
+        f"✅ Sent {total_indexed:,} documents for indexing ({len(task_uids)} tasks)"
     )
     click.echo(
         " Indexing continues in the background. Use 'poliloom index-stats' to check progress."

poliloom/poliloom/importer/entity.py

Lines changed: 6 additions & 24 deletions
@@ -19,7 +19,6 @@
     WikidataEntityLabel,
     WikidataRelation,
 )
-from ..search import SearchDocument, SearchService
 from ..wikidata_entity_processor import WikidataEntityProcessor

 logger = logging.getLogger(__name__)
@@ -53,27 +52,15 @@ def batch_size(self) -> int:
         """Get current batch size."""
         return len(self.entities)

-    def insert(self, session: Session, search_service: SearchService) -> None:
-        """Insert entities and relations into database, then index to search.
+    def insert(self, session: Session) -> None:
+        """Insert entities and relations into database.

-        Commits the transaction and indexes to search service after successful commit.
-        Clears the batch after completion.
+        Commits the transaction and clears the batch after completion.
+        Search indexing is handled separately by the index-build command.
         """
         if not self.has_entities():
             return

-        # Build search documents BEFORE modifying entities (labels get popped later)
-        entity_type = self.model_class.__tablename__
-        search_documents = [
-            SearchDocument(
-                id=entity["wikidata_id"],
-                type=entity_type,
-                labels=entity["labels"],
-            )
-            for entity in self.entities
-            if entity.get("labels")
-        ]
-
         # Insert WikidataEntity records first (without labels)
         entity_data = [
             {
@@ -117,10 +104,6 @@ def insert(self, session: Session, search_service: SearchService) -> None:

         session.commit()

-        # Index to search after successful commit
-        if search_documents:
-            search_service.index_documents(search_documents)
-
         logger.debug(
             f"Processed {len(self.entities)} {self.model_class.__name__.lower()}s "
             f"with {len(self.relations)} relations"
@@ -158,7 +141,6 @@ def _process_supporting_entities_chunk(
     # Create fresh connections for this worker process
     engine = create_engine(pool_size=2, max_overflow=3)
     session = Session(engine)
-    search_service = SearchService()

     # Entity collections organized by type, built from worker_config
     entity_collections = [
@@ -239,7 +221,7 @@ def _process_supporting_entities_chunk(
             # Process batches when they reach the batch size
             for collection in entity_collections:
                 if collection.batch_size() >= batch_size:
-                    collection.insert(session, search_service)
+                    collection.insert(session)

     except Exception as e:
         logger.error(f"Worker {worker_id}: error processing chunk: {e}")
@@ -250,7 +232,7 @@ def _process_supporting_entities_chunk(
     # Process remaining entities in final batches on successful completion
     for collection in entity_collections:
         if collection.has_entities():
-            collection.insert(session, search_service)
+            collection.insert(session)

     session.close()
     logger.info(f"Worker {worker_id}: finished processing {entity_count} entities")

poliloom/poliloom/importer/politician.py

Lines changed: 7 additions & 23 deletions
@@ -21,7 +21,6 @@
     WikipediaLink,
     WikipediaProject,
 )
-from ..search import SearchDocument, SearchService
 from ..wikidata_entity_processor import WikidataEntityProcessor

 logger = logging.getLogger(__name__)
@@ -109,24 +108,14 @@ def _should_import_politician(entity: WikidataEntityProcessor) -> bool:
     return True


-def _insert_politicians_batch(
-    politicians: list[dict], session: Session, search_service: SearchService
-) -> None:
-    """Insert a batch of politicians into the database and index to Meilisearch."""
+def _insert_politicians_batch(politicians: list[dict], session: Session) -> None:
+    """Insert a batch of politicians into the database.
+
+    Search indexing is handled separately by the index-build command.
+    """
     if not politicians:
         return

-    # Build search documents BEFORE modifying politicians (labels get used later)
-    search_documents: list[SearchDocument] = [
-        SearchDocument(
-            id=p["wikidata_id"],
-            type=Politician.__tablename__,
-            labels=p["labels"],
-        )
-        for p in politicians
-        if p.get("labels")
-    ]
-
     # First, ensure WikidataEntity records exist for all politicians (without labels)
     wikidata_data = [
         {
@@ -211,10 +200,6 @@ def _insert_politicians_batch(

     session.commit()

-    # Index to Meilisearch after successful DB commit
-    if search_documents:
-        search_service.index_documents(search_documents)
-
     logger.debug(f"Processed {len(politicians)} politicians (upserted)")


@@ -233,7 +218,6 @@ def _process_politicians_chunk(
     """
     # Create fresh connections for this worker process
     engine = create_engine(pool_size=2, max_overflow=3)
-    search_service = SearchService()

     politicians = []
     politician_count = 0
@@ -419,7 +403,7 @@ def _process_politicians_chunk(
             # Process batches when they reach the batch size
             if len(politicians) >= batch_size:
                 with Session(engine) as session:
-                    _insert_politicians_batch(politicians, session, search_service)
+                    _insert_politicians_batch(politicians, session)
                 politicians = []

     except Exception as e:
@@ -429,7 +413,7 @@ def _process_politicians_chunk(
     # Process remaining entities in final batch on successful completion
     if politicians:
         with Session(engine) as session:
-            _insert_politicians_batch(politicians, session, search_service)
+            _insert_politicians_batch(politicians, session)

     logger.info(f"Worker {worker_id}: finished processing {entity_count} entities")

poliloom/poliloom/models/wikidata.py

Lines changed: 1 addition & 1 deletion
@@ -137,7 +137,7 @@ def find_similar(
         """
         return search_service.search(
             query,
-            entity_type=cls.__tablename__,
+            entity_type=cls.__name__,
             limit=limit,
             semantic_ratio=cls._search_semantic_ratio,
         )
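
The filter value has to line up with what index-build now writes into each document's types array, which is built from model.__name__ rather than model.__tablename__. A tiny hypothetical check of that invariant (the class and values below are made up for illustration):

# Hypothetical stand-in for a searchable model; only these two attributes matter here.
class Location:
    __tablename__ = "locations"

# index-build's CASE expressions emit class names into the document's `types` array.
doc_types = ["Location", "Country"]

assert Location.__name__ in doc_types            # new filter value matches the index
assert Location.__tablename__ not in doc_types   # the old table-name filter would miss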

poliloom/poliloom/search.py

Lines changed: 5 additions & 5 deletions
@@ -28,7 +28,7 @@ class SearchDocument(TypedDict):
     """Document format for Meilisearch indexing."""

     id: str
-    type: str  # Entity type (e.g., 'locations', 'politicians')
+    types: list[str]  # Entity types (e.g., ['Location', 'Country'])
     labels: list[str]


@@ -66,8 +66,8 @@ def create_index(self) -> None:
         task = index.update_settings(
             {
                 "searchableAttributes": ["labels"],
-                "filterableAttributes": ["type"],
-                "displayedAttributes": ["id", "type", "labels"],
+                "filterableAttributes": ["types"],
+                "displayedAttributes": ["id", "types", "labels"],
             }
         )
         self.client.wait_for_task(task.task_uid)
@@ -179,7 +179,7 @@ def search(

         Args:
             query: Search query text
-            entity_type: Optional type filter (e.g., 'locations', 'politicians')
+            entity_type: Optional type filter (e.g., 'Location', 'Politician')
             limit: Maximum number of results
             semantic_ratio: Balance between keyword (0.0) and semantic (1.0) search.
                 Default 0.0 uses pure keyword search for backward compatibility.
@@ -191,7 +191,7 @@ def search(
         index = self.client.index(INDEX_NAME)
         search_params: dict = {"limit": limit}
         if entity_type:
-            search_params["filter"] = f"type = '{entity_type}'"
+            search_params["filter"] = f"types = '{entity_type}'"

         # Use hybrid search when semantic_ratio > 0
         if semantic_ratio > 0:
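
Putting the schema change together, a hedged usage sketch: it assumes a reachable Meilisearch instance behind SearchService and uses made-up IDs and labels; the method names (index_documents, search) are the ones from this module:

from poliloom.search import SearchDocument, SearchService

service = SearchService()

# Documents now carry a list of types instead of a single type string.
doc = SearchDocument(
    id="Q64",                        # made-up Wikidata ID
    types=["Location", "Country"],   # one entity, several types
    labels=["Berlin", "Berlín"],
)
task_uid = service.index_documents([doc])  # returns a Meilisearch task uid (or None)

# A filter like "types = 'Location'" matches when the array contains the value,
# so this document is found under either type.
as_location = service.search("Berlin", entity_type="Location", limit=5)
as_country = service.search("Berlin", entity_type="Country", limit=5)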
