7 changes: 3 additions & 4 deletions .docker-env
@@ -1,11 +1,10 @@
 POSTGRES_PASSWORD=postgres
-POSTGRES_DB=nmdc_a
+POSTGRES_DB=nmdc
 POSTGRES_USER=postgres
 PGUSER=postgres
 PGPASSWORD=postgres
-PGDATABASE=nmdc_a
-NMDC_DATABASE_URI="postgresql://postgres:postgres@db/nmdc_a"
+PGDATABASE=nmdc
+NMDC_DATABASE_URI="postgresql://postgres:postgres@db/nmdc"
 NMDC_CELERY_BACKEND="redis://redis:6379/0"
 NMDC_CELERY_BROKER="redis://redis:6379/0"
-NMDC_INGEST_DATABASE_URI="postgresql://postgres:postgres@db/nmdc_a"
 NMDC_MONGO_HOST=tunnel
2 changes: 1 addition & 1 deletion .env.example
@@ -1,7 +1,7 @@
 # NMDC Database URI Override
 # Uncomment and modify these lines to provide arguments to nmdc server backend if running OUTSIDE the docker environment.
 # Otherwise, .docker-env will take care of providing these arguments.
-# NMDC_DATABASE_URI="postgresql:///nmdc_a"
+# NMDC_DATABASE_URI="postgresql:///nmdc"
 # NMDC_TESTING_DATABASE_URI="postgresql:///nmdc_testing"

 # OAuth Setup
13 changes: 6 additions & 7 deletions README.md
@@ -89,7 +89,6 @@ NMDC_MONGO_PASSWORD=changeme
 With that file in place, populate the docker volume by running,

 ```bash
-docker-compose run backend nmdc-server truncate # if necessary
 docker-compose run backend nmdc-server migrate
 docker-compose run backend nmdc-server -vv ingest --function-limit 100
 ```
@@ -126,7 +125,7 @@ In order to generate a migration, your database state should match HEAD. If you
 docker-compose down -v
 docker-compose up -d db
 # Create the database
-docker-compose run backend psql -c "create database nmdc_a;" -d postgres
+docker-compose run backend psql -c "create database nmdc;" -d postgres
 # Run migrations to HEAD
 docker-compose run backend alembic -c nmdc_server/alembic.ini upgrade head
 # Autogenerate a migration diff from the current HEAD
@@ -138,15 +137,15 @@ docker-compose run backend alembic -c nmdc_server/alembic.ini revision --autogen
 You can find existing database exports in Notion.

 ```bash
-# export, and
-docker-compose run backend bash -c 'pg_dump nmdc_a > /app/nmdc_server/nmdc_a.sql'
+# export, and
+docker-compose run backend bash -c 'pg_dump nmdc > /app/nmdc_server/nmdc.sql'

 # import -- starting from an EMPTY database with DB running
 docker-compose down -v
 docker-compose up -d db
-docker-compose run backend psql -c "create database nmdc_a;" -d postgres
-cp downloads/nmdc_a.sql nmdc_server/nmdc_a.sql
-docker-compose run backend bash -c 'psql nmdc_a < /app/nmdc_server/nmdc_a.sql'
+docker-compose run backend psql -c "create database nmdc;" -d postgres
+cp downloads/nmdc.sql nmdc_server/nmdc.sql
+docker-compose run backend bash -c 'psql nmdc < /app/nmdc_server/nmdc.sql'
 docker-compose run backend nmdc-server migrate # stamp the migration db
 ```

1 change: 0 additions & 1 deletion docker-compose.yml
@@ -73,7 +73,6 @@ services:

 volumes:
   app-db-data:
-  app-db-ingest:

 networks:
   public:
14 changes: 6 additions & 8 deletions nmdc_server/api.py
@@ -17,9 +17,9 @@
 )
 from nmdc_server.bulk_download_schema import BulkDownload, BulkDownloadCreate
 from nmdc_server.data_object_filters import WorkflowActivityTypeEnum
-from nmdc_server.database import get_db
+from nmdc_server.database import get_db, is_ingest_locked
 from nmdc_server.ingest.envo import nested_envo_trees
-from nmdc_server.models import IngestLock, SubmissionMetadata, User
+from nmdc_server.models import SubmissionMetadata, User
 from nmdc_server.pagination import Pagination

 router = APIRouter()
@@ -382,11 +382,10 @@ async def run_ingest(
     params: schemas.IngestArgumentSchema = schemas.IngestArgumentSchema(),
     db: Session = Depends(get_db),
 ):
-    lock = db.query(IngestLock).first()
-    if lock:
+    if is_ingest_locked(db):
         raise HTTPException(
             status_code=409,
-            detail=f"An ingest started at {lock.started} is already in progress",
+            detail="An ingest is already in progress",
         )
     jobs.ingest.delay(function_limit=params.function_limit, skip_annotation=params.skip_annotation)
     return ""
@@ -400,11 +399,10 @@ async def run_ingest(
 async def repopulate_gene_functions(
     user: models.User = Depends(admin_required), db: Session = Depends(get_db)
 ):
-    lock = db.query(IngestLock).first()
-    if lock:
+    if is_ingest_locked(db):
         raise HTTPException(
             status_code=409,
-            detail=f"An ingest started at {lock.started} is in progress",
+            detail="An ingest is already in progress",
         )
     jobs.populate_gene_functions.delay()
     return ""
63 changes: 20 additions & 43 deletions nmdc_server/cli.py
@@ -3,54 +3,37 @@
 from typing import Optional

 import click
+from alembic import command
+from alembic.config import Config

-from nmdc_server import jobs, models
-from nmdc_server.config import Settings, settings
-from nmdc_server.database import SessionLocal, SessionLocalIngest
+from nmdc_server import database
+from nmdc_server.config import settings
 from nmdc_server.ingest import errors
 from nmdc_server.ingest.all import load
-from nmdc_server.ingest.common import maybe_merge_download_artifact
 from nmdc_server.logger import get_logger


 @click.group()
 @click.pass_context
 def cli(ctx):
-    settings = Settings()
     if settings.environment == "testing":
         settings.database_uri = settings.testing_database_uri
-    ctx.obj = {"settings": settings}


 @cli.command()
-@click.option("--ingest-db", is_flag=True, default=False)
-def migrate(ingest_db: bool):
+def migrate():
     """Upgrade the database schema."""
-    jobs.migrate(ingest_db=ingest_db)
+    database_uri = settings.current_db_uri
+    session_maker = database.SessionLocal


-@cli.command()
-def truncate():
-    """Remove all existing data from the ingest database."""
-    with SessionLocalIngest() as db:
-        try:
-            db.execute("select truncate_tables()").all()
-            db.commit()
-        except Exception:
-            db.rollback()
-            db.execute(
-                """
-                DO $$ DECLARE
-                    r RECORD;
-                BEGIN
-                    FOR r IN (SELECT tablename FROM pg_tables WHERE schemaname = current_schema())
-                    LOOP
-                        EXECUTE 'DROP TABLE IF EXISTS ' || quote_ident(r.tablename) || ' CASCADE';
-                    END LOOP;
-                END $$;
-                """
-            )
-            db.commit()
+    with session_maker.begin() as db:  # type: ignore
+        database.get_ingest_lock(db)
+        alembic_cfg = Config(str(Path(__file__).parent / "alembic.ini"))
+        alembic_cfg.set_main_option("script_location", str(Path(__file__).parent / "migrations"))
+        alembic_cfg.set_main_option("sqlalchemy.url", database_uri)
+        alembic_cfg.attributes["configure_logger"] = True
+        command.upgrade(alembic_cfg, "head")
Comment on lines +30 to +36 (Contributor Author):
This invariably runs the migration, which results in a no-op if no migrations are required. However, the migration command must acquire the ingest lock.

Since our container invokes the nmdc-server migrate command on startup, a currently running ingest will block the container from even starting, as the migrate command throws an error if the lock cannot be acquired.

This is desirable behavior if migrations must be applied, but bad if the migration command would be a no-op. We need to change this to apply migrations only when they are required. Luckily, this has been implemented in the past.
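A minimal sketch of that guard, assuming alembic's `ScriptDirectory` and `MigrationContext` APIs (the `migrations_pending` helper is hypothetical, not part of this PR):

```python
# Hypothetical helper: report whether the database is behind the latest revision.
from alembic.config import Config
from alembic.runtime.migration import MigrationContext
from alembic.script import ScriptDirectory
from sqlalchemy import create_engine


def migrations_pending(alembic_cfg: Config, database_uri: str) -> bool:
    """Return True if the database is not yet stamped at the newest head."""
    script = ScriptDirectory.from_config(alembic_cfg)
    engine = create_engine(database_uri)
    with engine.connect() as connection:
        context = MigrationContext.configure(connection)
        current_heads = set(context.get_current_heads())
    return current_heads != set(script.get_heads())
```

`migrate` could then acquire the ingest lock and call `command.upgrade` only when `migrations_pending(...)` returns `True`, so a running ingest no longer blocks container startup in the common no-op case.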



 @cli.command()
@@ -67,17 +50,11 @@ def ingest(verbose, function_limit, skip_annotation):
     logger = get_logger(__name__)
     logging.basicConfig(level=level, format="%(message)s")
     logger.setLevel(logging.INFO)
-    jobs.migrate(ingest_db=True)
-    with SessionLocalIngest() as ingest_db:
-        load(ingest_db, function_limit=function_limit, skip_annotation=skip_annotation)
-        if settings.current_db_uri != settings.ingest_database_uri:
-            with SessionLocal() as prod_db:
-                # copy persistent data from the production db to the ingest db
-                maybe_merge_download_artifact(ingest_db, prod_db.query(models.FileDownload))
-                maybe_merge_download_artifact(ingest_db, prod_db.query(models.BulkDownload))
-                maybe_merge_download_artifact(
-                    ingest_db, prod_db.query(models.BulkDownloadDataObject)
-                )
+    with database.SessionLocal() as db:
+        with db.begin():
+            database.get_ingest_lock(db)
+            database.clear_tables(db)
+            load(db, function_limit=function_limit, skip_annotation=skip_annotation)

     for m, s in errors.missing.items():
         click.echo(f"missing {m}:")
@@ -99,7 +76,7 @@ def shell(print_sql: bool, script: Optional[Path]):

     imports = [
         "from nmdc_server.config import settings",
-        "from nmdc_server.database import SessionLocal, SessionLocalIngest",
+        "from nmdc_server.database import SessionLocal",
         "from nmdc_server.models import "
         "Biosample, EnvoAncestor, EnvoTerm, EnvoTree, OmicsProcessing, Study",
     ]
4 changes: 1 addition & 3 deletions nmdc_server/config.py
@@ -9,10 +9,8 @@ class Settings(BaseSettings):
     environment: str = "production"

     # Several different database urls are configured for different
-    # environments. In production, only database_uri and ingest_database_uri
-    # are used.
+    # environments. In production, only database_uri is used.
     database_uri: str = "postgresql:///nmdc"
-    ingest_database_uri: str = "postgresql:///nmdc_testing"
     testing_database_uri: str = "postgresql:///nmdc_testing"

     # database tuning knobs
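Neither hunk shows `current_db_uri`, which the new `cli.py` and `database.py` code reads. A plausible sketch of that property, assuming it does nothing more than switch between the URIs above (the real definition is outside this diff):

```python
from pydantic import BaseSettings


class Settings(BaseSettings):
    environment: str = "production"
    database_uri: str = "postgresql:///nmdc"
    testing_database_uri: str = "postgresql:///nmdc_testing"

    @property
    def current_db_uri(self) -> str:
        # Assumption: the testing URI wins whenever the environment is "testing".
        if self.environment == "testing":
            return self.testing_database_uri
        return self.database_uri
```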
7 changes: 5 additions & 2 deletions nmdc_server/crud.py
@@ -94,7 +94,7 @@ def get_study(db: Session, study_id: str) -> Optional[models.Study]:
 ).first()


-def create_study(db: Session, study: schemas.StudyCreate) -> models.Study:
+def create_study(db: Session, study: schemas.StudyCreate, commit=True) -> models.Study:
     study_dict = study.dict()

     websites = study_dict.pop("principal_investigator_websites")
@@ -113,7 +113,10 @@ def create_study(db: Session, study: schemas.StudyCreate) -> models.Study:
         db_study.publication_dois.append(study_publication)  # type: ignore

     db.add(db_study)
-    db.commit()
+    if commit:
+        db.commit()
+    else:
+        db.flush()
     db.refresh(db_study)
     return db_study
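The new `commit` flag lets a caller batch several writes into one transaction: `flush` sends the INSERTs (so generated IDs are available to `refresh`) without ending the transaction. A sketch of that usage, where `pending_studies` is an illustrative list of `schemas.StudyCreate` objects, not code from this PR:

```python
# Illustrative: create several studies atomically, committing once at the end.
from nmdc_server.crud import create_study
from nmdc_server.database import SessionLocal

with SessionLocal() as db:
    for study_create in pending_studies:  # hypothetical list of StudyCreate
        create_study(db, study_create, commit=False)  # flushes; caller owns the commit
    db.commit()  # one commit for the whole batch
```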

84 changes: 59 additions & 25 deletions nmdc_server/database.py
@@ -1,7 +1,7 @@
 from sqlalchemy import create_engine
 from sqlalchemy.event import listen
 from sqlalchemy.ext.declarative import declarative_base
-from sqlalchemy.orm import sessionmaker
+from sqlalchemy.orm import Session, sessionmaker
 from sqlalchemy.schema import DDL, MetaData

 from nmdc_server.config import settings
@@ -15,9 +15,7 @@
"max_overflow": settings.db_pool_max_overflow,
 }
 engine = create_engine(settings.current_db_uri, **_engine_kwargs)
-engine_ingest = create_engine(settings.ingest_database_uri, **_engine_kwargs)
 SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
-SessionLocalIngest = sessionmaker(autocommit=False, autoflush=False, bind=engine_ingest)

 # This is to avoid having to manually name all constraints
 # See: http://alembic.zzzcomputing.com/en/latest/naming.html
@@ -106,28 +104,6 @@
 end;
 $$
 language plpgsql;
-
-/*
-A convenience function to truncate all tables that get repopulated
-during an ingest. Any tables storing persistent data should be
-added as an exception here.
-*/
-CREATE OR REPLACE FUNCTION truncate_tables() RETURNS void AS $$
-DECLARE
-    statements CURSOR FOR
-        SELECT tablename FROM pg_tables
-        WHERE schemaname = 'public'
-        and tablename <> 'alembic_version'
-        and tablename <> 'file_download'
-        and tablename <> 'ingest_lock'
-        and tablename <> 'bulk_download'
-        and tablename <> 'bulk_download_data_object';
-BEGIN
-    FOR stmt IN statements LOOP
-        EXECUTE 'TRUNCATE TABLE ' || quote_ident(stmt.tablename) || ' CASCADE;';
-    END LOOP;
-END;
-$$ LANGUAGE plpgsql;
 """
     ),
 )
@@ -187,3 +163,61 @@
 def get_db():
     with SessionLocal() as db:
         yield db
+
+
+INGEST_LOCK_ID = 0
+
+
+class IngestLockNotAcquired(Exception):
+    ...
+
+
+def get_ingest_lock(db: Session):
+    """
+    https://www.postgresql.org/docs/current/explicit-locking.html#ADVISORY-LOCKS
+    """
+    result: tuple[bool] = db.execute(f"SELECT pg_try_advisory_lock({INGEST_LOCK_ID});").first()
+    if not result[0]:
+        raise IngestLockNotAcquired()
+
+
+def is_ingest_locked(db: Session):
+    """
+    https://www.postgresql.org/docs/current/explicit-locking.html#ADVISORY-LOCKS
+    """
+
+    result: tuple[bool] = db.execute(
+        f"SELECT pg_try_advisory_lock({INGEST_LOCK_ID}); "
+        f"SELECT pg_advisory_unlock({INGEST_LOCK_ID});"
+    ).first()
+    return not result[0]
+
+
+def clear_tables(db: Session):
+    db.execute(
+        """
+        /*
+        A convenience function to clear all tables that get repopulated
+        during an ingest. Any tables storing persistent data should be
+        added as an exception here.
+        */
+        CREATE OR REPLACE FUNCTION clear_tables() RETURNS void AS $$
+        DECLARE
+            statements CURSOR FOR
+                SELECT tablename FROM pg_tables
+                WHERE schemaname = 'public'
+                and tablename <> 'alembic_version'
+                and tablename <> 'file_download'
+                and tablename <> 'bulk_download'
+                and tablename <> 'bulk_download_data_object'
+                and tablename <> 'user_logins'
+                and tablename <> 'submission_metadata';
+        BEGIN
+            FOR stmt IN statements LOOP
+                EXECUTE 'DELETE FROM ' || quote_ident(stmt.tablename) || ';';
+            END LOOP;
+        END;
+        $$ LANGUAGE plpgsql;
+        SELECT clear_tables();
+        """
+    ).all()
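Taken together, the new helpers give a session-scoped guard: `pg_try_advisory_lock` holds the lock until it is explicitly unlocked or the session's connection closes, so callers simply do their work inside the lock-holding session. A minimal usage sketch mirroring the new `ingest` command:

```python
# Illustrative usage of the helpers above.
from nmdc_server import database

with database.SessionLocal() as db:
    with db.begin():
        try:
            database.get_ingest_lock(db)
        except database.IngestLockNotAcquired:
            raise SystemExit("an ingest is already in progress")
        database.clear_tables(db)
        # ... repopulate tables; the advisory lock is released when this
        # session's connection is closed ...
```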