19 changes: 16 additions & 3 deletions nmdc_server/ingest/all.py
@@ -14,6 +14,7 @@
envo,
kegg,
omics_processing,
ontology,
pipeline,
search_index,
study,
@@ -62,9 +63,21 @@ def load(db: Session, function_limit=None, skip_annotation=False):
)
mongodb = client[settings.mongo_database]

logger.info("Loading envo terms...")
envo.load(db)
db.commit()
# Load generic ontology data first
logger.info("Loading ontology data...")
collections = mongodb.list_collection_names()
if "ontology_class_set" in collections and "ontology_relation_set" in collections:
    ontology.load(
        db,
        mongodb["ontology_class_set"].find(),
        mongodb["ontology_relation_set"].find(),
    )
    db.commit()
    logger.info("Ontology data loaded successfully")
else:
    logger.warning("Ontology collections not found in MongoDB, using legacy ENVO loading")
    logger.info("Loading envo terms...")
    envo.load(db)
    db.commit()

logger.info("Loading Kegg orthology...")
kegg.load(db)
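As a quick illustration of the new code path, here is a minimal sketch of driving the ontology ingest by hand (for example from a one-off script or REPL). The Mongo URI, database name, and the SessionLocal import are assumptions for the example, not part of this change:

from pymongo import MongoClient

from nmdc_server.database import SessionLocal  # hypothetical session factory import
from nmdc_server.ingest import ontology

client = MongoClient("mongodb://localhost:27017")  # illustrative URI
mongodb = client["nmdc"]  # illustrative database name

with SessionLocal() as db:
    collections = mongodb.list_collection_names()
    # Same availability check as in all.py: only use the generic loader
    # when both new collections exist
    if {"ontology_class_set", "ontology_relation_set"} <= set(collections):
        ontology.load(
            db,
            mongodb["ontology_class_set"].find(),
            mongodb["ontology_relation_set"].find(),
        )
        db.commit()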
286 changes: 286 additions & 0 deletions nmdc_server/ingest/ontology.py
@@ -0,0 +1,286 @@
"""ETL script to load generic ontology data from MongoDB to PostgreSQL."""

import logging
from typing import Dict, List, Set

from pymongo.cursor import Cursor
from sqlalchemy import text
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import Session

from nmdc_server.ingest.common import extract_extras
from nmdc_server.models import OntologyClass, OntologyRelation
from nmdc_server.schemas import OntologyClassCreate

logger = logging.getLogger(__name__)


class OntologyClassLoader(OntologyClassCreate):
"""Pydantic model for validating and transforming OntologyClass documents from MongoDB."""

@classmethod
def from_mongo(cls, doc: Dict) -> "OntologyClassLoader":
"""Create an OntologyClassLoader from a MongoDB document.

Handles the transformation from MongoDB's schema to our PostgreSQL schema.
"""
# Extract relations for separate processing
relations = doc.pop("relations", [])

# Transform the document
transformed = {
"id": doc.get("id"),
"type": doc.get("type", "nmdc:OntologyClass"),
"name": doc.get("name", ""),
"definition": doc.get("definition", ""),
"alternative_names": doc.get("alternative_names", []),
"is_root": doc.get("is_root", False),
"is_obsolete": doc.get("is_obsolete", False),
}

        # Fold any fields not explicitly mapped above (other than relations and
        # Mongo's internal _id) into annotations
        extras = {k: v for k, v in doc.items() if k not in transformed and k != "_id"}
        transformed = extract_extras(cls, {**transformed, **extras}, exclude={"relations"})

return cls(**transformed)


def load_ontology_classes(db: Session, cursor: Cursor) -> Dict[str, Set[str]]:
"""Load ontology classes from MongoDB cursor into PostgreSQL.

Returns:
Dict mapping ontology prefixes to sets of loaded class IDs
"""
logger.info("Loading ontology classes...")

loaded_classes: Dict[str, Set[str]] = {}
batch = []
batch_size = 1000
total_count = 0

for doc in cursor:
try:
# Transform the document
ontology_class = OntologyClassLoader.from_mongo(doc)

# Track loaded classes by prefix
prefix = ontology_class.ontology_prefix
if prefix not in loaded_classes:
loaded_classes[prefix] = set()
loaded_classes[prefix].add(ontology_class.id)

batch.append(ontology_class.model_dump())

# Bulk insert when batch is full
if len(batch) >= batch_size:
_bulk_upsert_classes(db, batch)
total_count += len(batch)
logger.info(f"Loaded {total_count} ontology classes...")
batch = []

except Exception as e:
logger.error(f"Error loading ontology class {doc.get('id', 'unknown')}: {e}")
continue

# Insert remaining batch
if batch:
_bulk_upsert_classes(db, batch)
total_count += len(batch)

logger.info(f"Finished loading {total_count} ontology classes")

# Log summary by ontology
for prefix, ids in loaded_classes.items():
logger.info(f" {prefix}: {len(ids)} classes")

return loaded_classes


def _bulk_upsert_classes(db: Session, classes: List[Dict]) -> None:
"""Bulk upsert ontology classes using PostgreSQL's ON CONFLICT."""
if not classes:
return

    stmt = insert(OntologyClass).values(classes)
    stmt = stmt.on_conflict_do_update(
        index_elements=["id"],
        set_={
            "name": stmt.excluded.name,
            "definition": stmt.excluded.definition,
            "alternative_names": stmt.excluded.alternative_names,
            "is_root": stmt.excluded.is_root,
            "is_obsolete": stmt.excluded.is_obsolete,
            "annotations": stmt.excluded.annotations,
        },
    )
db.execute(stmt)
db.commit()


def load_ontology_relations(
    db: Session, cursor: Cursor, loaded_classes: Dict[str, Set[str]]
) -> None:
"""Load ontology relations from MongoDB cursor into PostgreSQL.

Args:
db: SQLAlchemy session
cursor: MongoDB cursor for ontology_relation_set
loaded_classes: Dict mapping prefixes to loaded class IDs
"""
logger.info("Loading ontology relations...")

batch = []
batch_size = 5000
total_count = 0
skipped_count = 0

# Flatten loaded classes for easier lookup
all_loaded_ids = set()
for ids in loaded_classes.values():
all_loaded_ids.update(ids)

for doc in cursor:
try:
subject = doc.get("subject")
predicate = doc.get("predicate")
obj = doc.get("object")

# Skip if required fields are missing
if not all([subject, predicate, obj]):
skipped_count += 1
continue

# Only include relations where the subject exists in our loaded classes
# (object can be from external ontologies)
if subject not in all_loaded_ids:
skipped_count += 1
continue

relation = {
"subject": subject,
"predicate": predicate,
"object": obj,
"type": doc.get("type", "nmdc:OntologyRelation")
}

batch.append(relation)

# Bulk insert when batch is full
if len(batch) >= batch_size:
_bulk_insert_relations(db, batch)
total_count += len(batch)
logger.info(f"Loaded {total_count} ontology relations...")
batch = []

except Exception as e:
logger.error(f"Error loading ontology relation: {e}")
skipped_count += 1
continue

# Insert remaining batch
if batch:
_bulk_insert_relations(db, batch)
total_count += len(batch)

logger.info(f"Finished loading {total_count} ontology relations (skipped {skipped_count})")


def _bulk_insert_relations(db: Session, relations: List[Dict]) -> None:
"""Bulk insert ontology relations, ignoring conflicts."""
if not relations:
return

stmt = insert(OntologyRelation).values(relations)
    # The migration names the unique constraint on (subject, predicate, object)
    # "uq_ontology_relation_subject_predicate_object", so skip rows that already exist
    stmt = stmt.on_conflict_do_nothing(
        constraint="uq_ontology_relation_subject_predicate_object"
    )
db.execute(stmt)
db.commit()


def populate_envo_terms_from_ontology(db: Session) -> None:
"""Populate the EnvoTerm and EnvoAncestor tables from the generic ontology tables.

This maintains backward compatibility with existing code that uses EnvoTerm.
"""
logger.info("Populating EnvoTerm table from generic ontology data...")

# First, clear existing ENVO data
    db.execute(text("DELETE FROM envo_ancestor"))
    db.execute(text("DELETE FROM envo_term"))
db.commit()

# Insert ENVO terms from OntologyClass
insert_envo_terms_sql = """
INSERT INTO envo_term (id, label, data)
SELECT
oc.id,
oc.name as label,
jsonb_build_object(
'definition', COALESCE(oc.definition, ''),
'alternative_names', oc.alternative_names,
'is_obsolete', oc.is_obsolete,
'is_root', oc.is_root,
'annotations', oc.annotations
) as data
FROM ontology_class oc
WHERE oc.id LIKE 'ENVO:%'
"""
    db.execute(text(insert_envo_terms_sql))

# Populate EnvoAncestor with direct parent relationships
insert_direct_parents_sql = """
INSERT INTO envo_ancestor (id, ancestor_id, direct)
SELECT DISTINCT
r.subject as id,
r.object as ancestor_id,
true as direct
FROM ontology_relation r
WHERE r.subject LIKE 'ENVO:%'
AND r.object LIKE 'ENVO:%'
AND r.predicate IN ('rdfs:subClassOf', 'BFO:0000050')
"""
    db.execute(text(insert_direct_parents_sql))

# Populate EnvoAncestor with all ancestors (including indirect)
# This uses the closure relationships if they exist
insert_all_ancestors_sql = """
INSERT INTO envo_ancestor (id, ancestor_id, direct)
SELECT DISTINCT
r.subject as id,
r.object as ancestor_id,
false as direct
FROM ontology_relation r
WHERE r.subject LIKE 'ENVO:%'
AND r.object LIKE 'ENVO:%'
AND r.predicate = 'entailed_isa_partof_closure'
AND NOT EXISTS (
SELECT 1 FROM envo_ancestor ea
WHERE ea.id = r.subject
AND ea.ancestor_id = r.object
)
"""
    db.execute(text(insert_all_ancestors_sql))

db.commit()

# Get counts for logging
    envo_term_count = db.execute(text("SELECT COUNT(*) FROM envo_term")).scalar()
    envo_ancestor_count = db.execute(text("SELECT COUNT(*) FROM envo_ancestor")).scalar()

    logger.info(
        f"Populated {envo_term_count} ENVO terms and "
        f"{envo_ancestor_count} ancestor relationships"
    )


def load(db: Session, class_cursor: Cursor, relation_cursor: Cursor):
"""Main entry point for loading ontology data.

Args:
db: SQLAlchemy session
class_cursor: MongoDB cursor for ontology_class_set
relation_cursor: MongoDB cursor for ontology_relation_set
"""
# Load ontology classes
loaded_classes = load_ontology_classes(db, class_cursor)

# Load ontology relations
load_ontology_relations(db, relation_cursor, loaded_classes)

# Populate ENVO-specific tables for backward compatibility
populate_envo_terms_from_ontology(db)
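For reference, a hedged sketch of the document shapes this loader expects from the two MongoDB collections; the IDs and values below are illustrative only, and any keys beyond those mapped in from_mongo would be folded into annotations:

example_class_doc = {
    "id": "ENVO:00000428",
    "type": "nmdc:OntologyClass",
    "name": "biome",
    "definition": "An environmental system ...",  # illustrative
    "alternative_names": [],
    "is_root": False,
    "is_obsolete": False,
    "relations": [],  # popped by from_mongo; edges come from ontology_relation_set
}

example_relation_doc = {
    "subject": "ENVO:00000428",
    "predicate": "rdfs:subClassOf",
    "object": "ENVO:01000254",  # illustrative parent term
    "type": "nmdc:OntologyRelation",
}

ontology_class = OntologyClassLoader.from_mongo(example_class_doc)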
@@ -0,0 +1,63 @@
"""Add generic ontology tables

Revision ID: 2cc22d82c9ce
Revises: ffa58e5f59fe
Create Date: 2024-12-10 12:00:00.000000

"""

from typing import Optional

import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision: str = "2cc22d82c9ce"
down_revision: Optional[str] = "ffa58e5f59fe"
branch_labels: Optional[str] = None
depends_on: Optional[str] = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"ontology_class",
sa.Column("id", sa.String(), nullable=False),
sa.Column("type", sa.String(), nullable=False),
sa.Column("name", sa.String(), nullable=False),
sa.Column("definition", sa.Text(), nullable=True),
sa.Column("alternative_names", postgresql.JSONB(astext_type=sa.Text()), nullable=True),
sa.Column("is_root", sa.Boolean(), nullable=False),
sa.Column("is_obsolete", sa.Boolean(), nullable=False),
sa.Column("annotations", postgresql.JSONB(astext_type=sa.Text()), nullable=True),
sa.PrimaryKeyConstraint("id", name=op.f("pk_ontology_class"))
)
op.create_index(op.f("ix_ontology_class_name"), "ontology_class", ["name"], unique=False)

op.create_table(
"ontology_relation",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("subject", sa.String(), nullable=False),
sa.Column("predicate", sa.String(), nullable=False),
sa.Column("object", sa.String(), nullable=False),
sa.Column("type", sa.String(), nullable=False),
sa.ForeignKeyConstraint(["subject"], ["ontology_class.id"], name=op.f("fk_ontology_relation_subject_ontology_class")),
sa.PrimaryKeyConstraint("id", name=op.f("pk_ontology_relation")),
sa.UniqueConstraint("subject", "predicate", "object", name=op.f("uq_ontology_relation_subject_predicate_object"))
)
op.create_index("idx_ontology_relation_object", "ontology_relation", ["object"], unique=False)
op.create_index("idx_ontology_relation_predicate", "ontology_relation", ["predicate"], unique=False)
op.create_index("idx_ontology_relation_subject", "ontology_relation", ["subject"], unique=False)
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index("idx_ontology_relation_subject", table_name="ontology_relation")
op.drop_index("idx_ontology_relation_predicate", table_name="ontology_relation")
op.drop_index("idx_ontology_relation_object", table_name="ontology_relation")
op.drop_table("ontology_relation")
op.drop_index(op.f("ix_ontology_class_name"), table_name="ontology_class")
op.drop_table("ontology_class")
# ### end Alembic commands ###
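To illustrate how the new tables can be queried once this migration is applied, here is a small sketch using SQLAlchemy, as the ingest code above does. The open session variable db and the term ID are assumptions for the example:

from sqlalchemy import text

direct_parents_sql = text(
    """
    SELECT parent.id, parent.name
    FROM ontology_relation r
    JOIN ontology_class parent ON parent.id = r.object
    WHERE r.subject = :term_id
      AND r.predicate = 'rdfs:subClassOf'
    """
)
# db is assumed to be an open SQLAlchemy session
rows = db.execute(direct_parents_sql, {"term_id": "ENVO:00000428"}).all()
for parent_id, parent_name in rows:
    print(parent_id, parent_name)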