Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion cli/macrostrat/cli/database/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
from pathlib import Path
from sys import exit, stderr, stdin, stdout
from typing import Any, Callable, Iterable

Expand All @@ -10,6 +11,7 @@

from macrostrat.core import MacrostratSubsystem, app
from macrostrat.core.migrations import run_migrations
from macrostrat.database import Database
from macrostrat.database.transfer import pg_dump_to_file, pg_restore_from_file
from macrostrat.database.transfer.utils import raw_database_url
from macrostrat.database.utils import get_sql_files
Expand All @@ -20,7 +22,7 @@

# First, register all migrations
# NOTE: right now, this is quite implicit.
from .migrations import *
from .migrations import load_migrations
from .utils import engine_for_db_name

log = get_logger(__name__)
Expand All @@ -32,6 +34,9 @@
DBCallable = Callable[[Database], None]


load_migrations()


class SubsystemSchemaDefinition(BaseModel):
"""A schema definition managed by a Macrostrat subsystem"""

Expand Down
69 changes: 8 additions & 61 deletions cli/macrostrat/cli/database/migrations/__init__.py
Original file line number Diff line number Diff line change
@@ -1,67 +1,14 @@
from importlib import import_module
from pathlib import Path

from macrostrat.core.migrations import ApplicationStatus, Migration
from macrostrat.database import Database

from . import (
api_v3,
baseline,
column_builder,
macrostrat_mariadb,
map_source_slugs,
map_sources,
maps_scale_custom_type,
maps_source_operations,
partition_carto,
partition_maps,
points,
tileserver,
update_macrostrat,
)

__dir__ = Path(__file__).parent

# Import all submodules within this directory using importlib

class StorageSchemeMigration(Migration):
name = "storage-scheme"

depends_on = ["api-v3"]

def apply(self, db: Database):
db.run_sql(
"""
CREATE TYPE storage.scheme AS ENUM ('s3', 'https', 'http');
ALTER TYPE storage.scheme ADD VALUE 'https' AFTER 's3';
ALTER TYPE storage.scheme ADD VALUE 'http' AFTER 'https';

-- Lock the table to prevent concurrent updates
LOCK TABLE storage.object IN ACCESS EXCLUSIVE MODE;

ALTER TABLE storage.object
ALTER COLUMN scheme
TYPE storage.scheme USING scheme::text::storage.scheme;

-- Unlock the table
COMMIT;

DROP TYPE IF EXISTS macrostrat.schemeenum;
"""
)

def should_apply(self, db: Database):
if has_enum(db, "schemeenum", schema="macrostrat"):
return ApplicationStatus.CAN_APPLY
else:
return ApplicationStatus.APPLIED


def has_enum(db: Database, name: str, schema: str = None):
sql = "select 1 from pg_type where typname = :name"
if schema is not None:
sql += (
" and typnamespace = (select oid from pg_namespace where nspname = :schema)"
)

return db.run_query(
f"select exists ({sql})", dict(name=name, schema=schema)
).scalar()
def load_migrations():
for module in __dir__.iterdir():
if module.is_file() and module.suffix == ".py" and module.stem != "__init__":
import_module(f".{module.stem}", package=__name__)
elif module.is_dir() and (module / "__init__.py").exists():
import_module(f".{module.stem}", package=__name__)
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
ALTER TABLE maps.sources ADD COLUMN IF NOT EXISTS raster_url text;

DROP VIEW IF EXISTS macrostrat_api.sources_metadata CASCADE;
DROP VIEW IF EXISTS maps.sources_metadata CASCADE;

CREATE OR REPLACE VIEW maps.sources_metadata AS
SELECT
s.source_id,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from macrostrat.core.migrations import Migration, _not, custom_type_exists
from macrostrat.database import Database


class MapsIngestStateCustomTypeMigration(Migration):
name = "ingest-state-type"
subsystem = "maps"
description = """
- Relocate custom types that drives the map ingestion process.
- Remove duplicate custom types from the public schema.
"""

depends_on = ["baseline", "macrostrat-mariadb"]

postconditions = [
custom_type_exists("maps", "ingest_state"),
custom_type_exists("maps", "ingest_type"),
_not(custom_type_exists("public", "ingest_state")),
_not(custom_type_exists("public", "ingest_type")),
]

preconditions = []

def apply(self, db: Database):
# Handle edge case where the MariaDB migration has already been applied
db.run_sql("ALTER TYPE macrostrat_backup.ingest_state SET SCHEMA macrostrat")
db.run_sql("ALTER TYPE macrostrat.ingest_state SET SCHEMA maps")

db.run_sql("ALTER TYPE macrostrat_backup.ingest_type SET SCHEMA macrostrat")
db.run_sql("ALTER TYPE macrostrat.ingest_type SET SCHEMA maps")

db.run_sql("DROP TYPE IF EXISTS public.ingest_state")
db.run_sql("DROP TYPE IF EXISTS public.ingest_type")
47 changes: 47 additions & 0 deletions cli/macrostrat/cli/database/migrations/storage_schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from macrostrat.core.migrations import ApplicationStatus, Migration
from macrostrat.database import Database


class StorageSchemeMigration(Migration):
name = "storage-scheme"

depends_on = ["api-v3"]

def apply(self, db: Database):
db.run_sql(
"""
CREATE TYPE storage.scheme AS ENUM ('s3', 'https', 'http');
ALTER TYPE storage.scheme ADD VALUE 'https' AFTER 's3';
ALTER TYPE storage.scheme ADD VALUE 'http' AFTER 'https';

-- Lock the table to prevent concurrent updates
LOCK TABLE storage.object IN ACCESS EXCLUSIVE MODE;

ALTER TABLE storage.object
ALTER COLUMN scheme
TYPE storage.scheme USING scheme::text::storage.scheme;

-- Unlock the table
COMMIT;

DROP TYPE IF EXISTS macrostrat.schemeenum;
"""
)

def should_apply(self, db: Database):
if has_enum(db, "schemeenum", schema="macrostrat"):
return ApplicationStatus.CAN_APPLY
else:
return ApplicationStatus.APPLIED


def has_enum(db: Database, name: str, schema: str = None):
sql = "select 1 from pg_type where typname = :name"
if schema is not None:
sql += (
" and typnamespace = (select oid from pg_namespace where nspname = :schema)"
)

return db.run_query(
f"select exists ({sql})", dict(name=name, schema=schema)
).scalar()
1 change: 1 addition & 0 deletions cli/macrostrat/cli/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
add_completion=True,
rich_markup_mode="rich",
help=help_text,
backend=app.settings.backend,
)

main.add_typer(
Expand Down
10 changes: 10 additions & 0 deletions cli/macrostrat/cli/subsystems/macrostrat_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,13 @@ def on_schema_update(self):
# fully reloaded the schema.
if self.app.settings.get("compose_root", None) is not None:
compose("kill -s SIGUSR1 postgrest")


def check_view_is_changed(db, schema, view_name, new_statement):
pass


def get_view_definition(db, schema, view_name):
"""Get the definition of a view in a schema"""
_sql = "SELECT view_definition FROM information_schema.views WHERE table_schema = :schema AND table_name = :view_name"
return db.run_query(_sql, dict(schema=schema, view_name=view_name)).scalar()
107 changes: 81 additions & 26 deletions core/macrostrat/core/migrations/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ def exists(schema: str, *table_names: str) -> DbEvaluator:

def not_exists(schema: str, *table_names: str) -> DbEvaluator:
"""Return a function that evaluates to true when every given table in the given schema doesn't exist"""
return lambda db: all(
not db.inspector.has_table(t, schema=schema) for t in table_names
)
return _not(exists(schema, *table_names))


def schema_exists(schema: str) -> DbEvaluator:
Expand All @@ -50,6 +48,11 @@ def custom_type_exists(schema: str, *type_names: str) -> DbEvaluator:
return lambda db: all(db.inspector.has_type(t, schema=schema) for t in type_names)


def _not(f: DbEvaluator) -> DbEvaluator:
"""Return a function that evaluates to true when the given function evaluates to false"""
return lambda db: not f(db)


class ApplicationStatus(Enum):
"""Enum for the possible"""

Expand Down Expand Up @@ -109,6 +112,16 @@ def apply(self, database: Database):
database.run_fixtures(child_cls_dir)


class MigrationState(Enum):
"""Enum for the possible states of a migration before application"""

COMPLETE = "complete"
UNMET_DEPENDENCIES = "unmet_dependencies"
CANNOT_APPLY = "cannot_apply"
SHOULD_APPLY = "should_apply"
DISALLOWED = "disallowed"


def run_migrations(
apply: bool = False,
name: str = None,
Expand Down Expand Up @@ -136,6 +149,11 @@ def run_migrations(
# While iterating over migrations, keep track of which have already applied
completed_migrations = []

# Get max width of migration names for formatting
name_max_width = max(len(m.name) for m in instances)

print("Migrations:")

for _migration in instances:
_name = _migration.name
_subsystem = getattr(_migration, "subsystem", None)
Expand All @@ -153,39 +171,34 @@ def run_migrations(
if subsystem is not None and subsystem != _subsystem:
continue

_status = _get_status(_migration, completed_migrations)

_print_status(_name, _status, name_max_width=name_max_width)

# By default, don't run migrations that depend on other non-applied migrations
dependencies_met = all(d in completed_migrations for d in _migration.depends_on)
if not dependencies_met and not force:
print(f"Dependencies not met for migration [cyan]{_name}[/cyan]")
continue

if force or apply_status == ApplicationStatus.CAN_APPLY:
if not apply:
print(f"Would apply migration [cyan]{_name}[/cyan]")
else:
if _migration.destructive and not data_changes and not force:
print(
f"Migration [cyan]{_name}[/cyan] would alter data in the database. Run with --force or --data-changes"
)
return

print(f"Applying migration [cyan]{_name}[/cyan]")
_migration.apply(db)
# After running migration, reload the database and confirm that application was sucessful
db = refresh_database()
if _migration.should_apply(db) == ApplicationStatus.APPLIED:
completed_migrations.append(_migration.name)
elif apply_status == ApplicationStatus.APPLIED:
print(f"Migration [cyan]{_name}[/cyan] already applied")
else:
print(f"Migration [cyan]{_name}[/cyan] cannot apply")
if (force or apply_status == ApplicationStatus.CAN_APPLY) and apply:
if _migration.destructive and not data_changes and not force:
return

_migration.apply(db)
# After running migration, reload the database and confirm that application was sucessful
db = refresh_database()
if _migration.should_apply(db) == ApplicationStatus.APPLIED:
completed_migrations.append(_migration.name)

# Short circuit after applying the migration specified by --name
if name is not None and name == _name:
break

# Notify PostgREST to reload the schema cache
db.run_sql("NOTIFY pgrst, 'reload schema';")
if apply:
# Notify PostgREST to reload the schema cache
db.run_sql("NOTIFY pgrst, 'reload schema';")
else:
print("\n[dim]To apply the migrations, run with --apply")


def migration_has_been_run(*names: str):
Expand All @@ -202,3 +215,45 @@ def migration_has_been_run(*names: str):
if apply_status != ApplicationStatus.APPLIED:
return True
return False


def _get_status(
_migration: Migration, completed_migrations: set[str]
) -> MigrationState:
"""Get the status of a migration"""
name = _migration.name

# By default, don't run migrations that depend on other non-applied migrations
dependencies_met = all(d in completed_migrations for d in _migration.depends_on)
if not dependencies_met and not force:
return MigrationState.UNMET_DEPENDENCIES

if name in completed_migrations:
return MigrationState.COMPLETE

if force or apply_status == ApplicationStatus.CAN_APPLY:
if not apply:
return MigrationState.SHOULD_APPLY
else:
if _migration.destructive and not data_changes and not force:
return MigrationState.DISALLOWED
return MigrationState.SHOULD_APPLY

return MigrationState.CANNOT_APPLY


def _print_status(name, status: MigrationState, *, name_max_width=40):
padding = " " * (name_max_width - len(name))
print(f"- [bold cyan]{name}[/]: " + padding, end="")
if status == MigrationState.COMPLETE:
print("[green]already applied[/green]")
elif status == MigrationState.UNMET_DEPENDENCIES:
print("[yellow]has unmet dependencies[/yellow]")
elif status == MigrationState.CANNOT_APPLY:
print("[red]cannot be applied[/red]")
elif status == MigrationState.SHOULD_APPLY:
print("[yellow]should be applied[/yellow]")
elif status == MigrationState.DISALLOWED:
print("[red]cannot be applied without --force or --data-changes[/red]")
else:
raise ValueError(f"Unknown migration status: {status}")