Commit 2de546c

move some logic around, commit batch before writing entity labels
1 parent: 23ce065

3 files changed: +117 −115 lines changed

poliloom/poliloom/importer/entity.py

Lines changed: 80 additions & 87 deletions
@@ -46,11 +46,6 @@ def add_relations(self, relations: list[dict]) -> None:
         """Add relations for the last added entity."""
         self.relations.extend(relations)

-    def clear_batch(self) -> None:
-        """Clear entities and relations for batch processing."""
-        self.entities = []
-        self.relations = []
-
     def has_entities(self) -> bool:
         """Check if collection has entities."""
         return len(self.entities) > 0
@@ -59,6 +54,84 @@ def batch_size(self) -> int:
         """Get current batch size."""
         return len(self.entities)

+    def insert(self, session: Session, search_service: SearchService) -> None:
+        """Insert entities and relations into database, then index to search.
+
+        Commits the transaction and indexes to search service after successful commit.
+        Clears the batch after completion.
+        """
+        if not self.has_entities():
+            return
+
+        # Build search documents BEFORE modifying entities (labels get popped later)
+        search_documents: list[SearchDocument] = []
+        if issubclass(self.model_class, SearchIndexedMixin):
+            entity_type = self.model_class.__tablename__
+            search_documents = [
+                SearchDocument(
+                    id=entity["wikidata_id"],
+                    type=entity_type,
+                    labels=entity.get("labels") or [],
+                )
+                for entity in self.entities
+            ]
+
+        # Insert WikidataEntity records first (without labels)
+        entity_data = [
+            {
+                "wikidata_id": entity["wikidata_id"],
+                "name": entity["name"],
+                "description": entity["description"],
+            }
+            for entity in self.entities
+        ]
+
+        WikidataEntity.upsert_batch(session, entity_data)
+
+        # Insert labels into separate table
+        label_data = []
+        for entity in self.entities:
+            labels = entity.get("labels")
+            if labels:
+                for label in labels:
+                    label_data.append(
+                        {
+                            "entity_id": entity["wikidata_id"],
+                            "label": label,
+                        }
+                    )
+
+        if label_data:
+            WikidataEntityLabel.upsert_batch(session, label_data)
+
+        # Insert entities referencing the WikidataEntity records
+        # Remove 'name', 'description', and 'labels' keys since they're now stored separately
+        for entity in self.entities:
+            entity.pop("name", None)
+            entity.pop("description", None)
+            entity.pop("labels", None)
+
+        self.model_class.upsert_batch(session, self.entities)
+
+        # Insert relations for these entities
+        if self.relations:
+            WikidataRelation.upsert_batch(session, self.relations)
+
+        session.commit()
+
+        # Index to search after successful commit
+        if search_documents:
+            search_service.index_documents(search_documents)
+
+        logger.debug(
+            f"Processed {len(self.entities)} {self.model_class.__name__.lower()}s "
+            f"with {len(self.relations)} relations"
+        )
+
+        # Clear batch after successful insert
+        self.entities = []
+        self.relations = []
+

 # Progress reporting frequency for chunk processing
 PROGRESS_REPORT_FREQUENCY = 50000
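
Note on ordering inside the new insert(): search documents are captured before the upserts because the same entity dicts are mutated in place (name, description, and labels are popped for the model upsert), and the commit now happens before indexing, so the search index only ever sees durably committed rows. A small self-contained illustration of why the capture must come first (the Q42 data is made up):

entity = {"wikidata_id": "Q42", "labels": ["Douglas Adams"]}
doc_labels = entity.get("labels") or []  # capture before the dict is mutated
entity.pop("labels", None)               # strip the key for the model upsert
assert doc_labels == ["Douglas Adams"]   # the search document keeps its labels
assert "labels" not in entity            # the upsert payload no longer has them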
@@ -68,83 +141,6 @@ def batch_size(self) -> int:
 worker_config: dict | None = None


-def _insert_entities_batch(
-    collection: EntityCollection,
-    session: Session,
-    search_service: SearchService,
-) -> None:
-    """Insert a batch of entities and their relations into the database.
-
-    Also indexes to search service if the model has SearchIndexedMixin.
-    """
-    if not collection.has_entities():
-        return
-
-    # Build search documents BEFORE modifying entities (labels get popped later)
-    search_documents: list[SearchDocument] = []
-    if issubclass(collection.model_class, SearchIndexedMixin):
-        entity_type = collection.model_class.__tablename__
-        search_documents = [
-            SearchDocument(
-                id=entity["wikidata_id"],
-                type=entity_type,
-                labels=entity.get("labels") or [],
-            )
-            for entity in collection.entities
-        ]
-
-    # Insert WikidataEntity records first (without labels)
-    entity_data = [
-        {
-            "wikidata_id": entity["wikidata_id"],
-            "name": entity["name"],
-            "description": entity["description"],
-        }
-        for entity in collection.entities
-    ]
-
-    WikidataEntity.upsert_batch(session, entity_data)
-
-    # Insert labels into separate table
-    label_data = []
-    for entity in collection.entities:
-        labels = entity.get("labels")
-        if labels:
-            for label in labels:
-                label_data.append(
-                    {
-                        "entity_id": entity["wikidata_id"],
-                        "label": label,
-                    }
-                )
-
-    if label_data:
-        WikidataEntityLabel.upsert_batch(session, label_data)
-
-    # Insert entities referencing the WikidataEntity records
-    # Remove 'name', 'description', and 'labels' keys since they're now stored separately
-    for entity in collection.entities:
-        entity.pop("name", None)
-        entity.pop("description", None)
-        entity.pop("labels", None)
-
-    collection.model_class.upsert_batch(session, collection.entities)
-
-    # Insert relations for these entities
-    if collection.relations:
-        WikidataRelation.upsert_batch(session, collection.relations)
-
-    session.flush()
-
-    # Index to Meilisearch after successful DB insert
-    if search_documents:
-        search_service.index_documents(search_documents)
-
-    logger.debug(
-        f"Processed {len(collection.entities)} {collection.model_class.__name__.lower()}s with {len(collection.relations)} relations"
-    )
-
-
 def _process_supporting_entities_chunk(
     dump_file_path: str,
     start_byte: int,
@@ -245,9 +241,7 @@ def _process_supporting_entities_chunk(
                 # Process batches when they reach the batch size
                 for collection in entity_collections:
                     if collection.batch_size() >= batch_size:
-                        _insert_entities_batch(collection, session, search_service)
-                        session.commit()
-                        collection.clear_batch()
+                        collection.insert(session, search_service)

     except Exception as e:
         logger.error(f"Worker {worker_id}: error processing chunk: {e}")
@@ -258,8 +252,7 @@ def _process_supporting_entities_chunk(
     # Process remaining entities in final batches on successful completion
     for collection in entity_collections:
         if collection.has_entities():
-            _insert_entities_batch(collection, session, search_service)
-            session.commit()
+            collection.insert(session, search_service)

     session.close()
     logger.info(f"Worker {worker_id}: finished processing {entity_count} entities")
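
With the helper gone, the worker keeps only the threshold check and the final drain; everything else lives in collection.insert(). The general shape of this batching pattern as a runnable toy (process(), flush, and the integer items are illustrative, not from the repo):

def process(items, batch_size, flush):
    """Flush when the batch fills, then drain the remainder at the end."""
    batch = []
    for item in items:
        batch.append(item)
        if len(batch) >= batch_size:  # mirrors collection.batch_size() >= batch_size
            flush(batch)
            batch.clear()
    if batch:  # mirrors the has_entities() drain after the stream ends
        flush(batch)

process(range(10), 4, lambda b: print(b))  # prints [0..3], [4..7], then [8, 9]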

poliloom/poliloom/models/wikidata.py

Lines changed: 20 additions & 8 deletions
@@ -29,6 +29,7 @@
 from .base import (
     Base,
     RelationType,
+    SearchIndexedMixin,
     SoftDeleteMixin,
     TimestampMixin,
     UpsertMixin,
@@ -310,13 +311,14 @@ def _build_outside_hierarchy_subquery(

     @classmethod
     def cleanup_outside_hierarchy(
-        cls, session: Session, dry_run: bool = False
+        cls,
+        session: Session,
+        dry_run: bool = False,
     ) -> dict[str, int]:
         """Remove entities outside the configured hierarchy.

         Soft-deletes properties referencing these entities (if applicable),
-        then hard-deletes the entity records. Uses subqueries to avoid
-        materializing large ID lists.
+        then hard-deletes the entity records and removes them from search index.

         Args:
             session: Database session
@@ -332,6 +334,7 @@ def cleanup_outside_hierarchy(
             - 'total_entities': Total entities before cleanup
         """
         from poliloom.models import Evaluation, Property
+        from poliloom.search import SearchService

         prop_type = getattr(cls, "_cleanup_property_type", None)
         root_ids = getattr(cls, "_hierarchy_roots", None)
@@ -427,11 +430,20 @@ def cleanup_outside_hierarchy(
             ).rowcount
             stats["properties_deleted"] = props_deleted

-        # Hard-delete entity records
-        deleted = session.execute(
-            delete(cls).where(cls.wikidata_id.in_(select(outside_subquery)))
-        ).rowcount
-        stats["entities_removed"] = deleted
+        # Hard-delete entity records and get deleted IDs via RETURNING
+        delete_stmt = (
+            delete(cls)
+            .where(cls.wikidata_id.in_(select(outside_subquery)))
+            .returning(cls.wikidata_id)
+        )
+        deleted_rows = session.execute(delete_stmt).fetchall()
+        deleted_ids = [row[0] for row in deleted_rows]
+        stats["entities_removed"] = len(deleted_ids)
+
+        # Clean up search index for entities with SearchIndexedMixin
+        if deleted_ids and issubclass(cls, SearchIndexedMixin):
+            search_service = SearchService()
+            search_service.delete_documents(deleted_ids)

         return stats