diff --git a/alembic/versions/1da92a1c740f_create_v2_tables.py b/alembic/versions/1da92a1c740f_create_v2_tables.py new file mode 100644 index 000000000..127ee5604 --- /dev/null +++ b/alembic/versions/1da92a1c740f_create_v2_tables.py @@ -0,0 +1,247 @@ +"""create v2 tables + +Revision ID: 1da92a1c740f +Revises: acf951c80750 +Create Date: 2025-06-13 14:56:31.238050+00:00 + +""" + +from collections.abc import Sequence +from enum import Enum +from uuid import NAMESPACE_DNS + +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "1da92a1c740f" +down_revision: str | None = "acf951c80750" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +DEFAULT_CAMPAIGN_NAMESPACE = "dda54a0c-6878-5c95-ac4f-007f6808049e" +"""UUID5 of name 'io.lsst.cmservice' in `uuid.NAMESPACE_DNS`.""" + +# DB model uses mapped columns with Python Enum types, but we do not care +# to use native enums in the database, so when we have such a column, this +# definition will produce a VARCHAR instead. 
# ``create_constraint`` is the option ``sa.Enum`` actually understands (the
# ORM models use the same spelling); it keeps the column a plain VARCHAR(20)
# without an accompanying CHECK constraint.
ENUM_COLUMN_AS_VARCHAR = sa.Enum(Enum, length=20, native_enum=False, create_constraint=False)


def _jsonb_column(name: str) -> sa.Column[dict]:
    """Build a NOT NULL JSONB column that defaults to an empty JSON object."""
    return sa.Column(
        name,
        postgresql.JSONB(),
        nullable=False,
        default=dict,
        # The literal is cast to the column's own type (jsonb, not json).
        server_default=sa.text("'{}'::jsonb"),
    )


def upgrade() -> None:
    """Create the v2 tables and insert the default campaign (namespace) row.

    Tables are created parents-first so that foreign keys can reference the
    ``Table`` objects returned by earlier ``op.create_table`` calls.
    """
    # Create table for machines v2
    machines_v2 = op.create_table(
        "machines_v2",
        sa.Column("id", postgresql.UUID(), nullable=False),
        sa.Column("state", sa.PickleType, nullable=False),
        sa.PrimaryKeyConstraint("id"),
        if_not_exists=True,
    )

    # Create table for campaigns v2
    campaigns_v2 = op.create_table(
        "campaigns_v2",
        sa.Column("id", postgresql.UUID(), nullable=False),
        sa.Column("name", postgresql.VARCHAR(), nullable=False),
        sa.Column("namespace", postgresql.UUID(), nullable=False, default=DEFAULT_CAMPAIGN_NAMESPACE),
        sa.Column("owner", postgresql.VARCHAR(), nullable=True),
        _jsonb_column("metadata"),
        _jsonb_column("configuration"),
        sa.Column("status", ENUM_COLUMN_AS_VARCHAR, nullable=False, default="waiting"),
        sa.Column(
            "machine", postgresql.UUID(), sa.ForeignKey(machines_v2.c.id, ondelete="CASCADE"), nullable=True
        ),
        sa.PrimaryKeyConstraint("id"),
        sa.UniqueConstraint("name", "namespace"),
        if_not_exists=True,
    )

    # Create node and edges tables for campaign digraph
    nodes_v2 = op.create_table(
        "nodes_v2",
        sa.Column("id", postgresql.UUID(), nullable=False),
        sa.Column(
            "namespace",
            postgresql.UUID(),
            sa.ForeignKey(campaigns_v2.c.id, ondelete="CASCADE"),
            nullable=False,
        ),
        sa.Column("name", postgresql.VARCHAR(), nullable=False),
        sa.Column("version", postgresql.INTEGER(), nullable=False, default=1),
        sa.Column("kind", ENUM_COLUMN_AS_VARCHAR, nullable=False, default="node"),
        _jsonb_column("metadata"),
        _jsonb_column("configuration"),
        sa.Column("status", ENUM_COLUMN_AS_VARCHAR, nullable=False, default="waiting"),
        sa.Column(
            "machine", postgresql.UUID(), sa.ForeignKey(machines_v2.c.id, ondelete="CASCADE"), nullable=True
        ),
        sa.PrimaryKeyConstraint("id"),
        sa.UniqueConstraint("name", "version", "namespace"),
        if_not_exists=True,
    )

    _ = op.create_table(
        "edges_v2",
        sa.Column("id", postgresql.UUID(), nullable=False),
        sa.Column("name", postgresql.VARCHAR(), nullable=False),
        sa.Column(
            "namespace",
            postgresql.UUID(),
            sa.ForeignKey(campaigns_v2.c.id, ondelete="CASCADE"),
            nullable=False,
        ),
        sa.Column("source", postgresql.UUID(), sa.ForeignKey(nodes_v2.c.id), nullable=False),
        sa.Column("target", postgresql.UUID(), sa.ForeignKey(nodes_v2.c.id), nullable=False),
        _jsonb_column("metadata"),
        _jsonb_column("configuration"),
        sa.PrimaryKeyConstraint("id"),
        sa.UniqueConstraint("source", "target", "namespace"),
        if_not_exists=True,
    )

    # Create table for spec blocks v2 ("manifests")
    _ = op.create_table(
        "manifests_v2",
        sa.Column("id", postgresql.UUID(), nullable=False),
        sa.Column("name", postgresql.VARCHAR(), nullable=False),
        sa.Column(
            "namespace",
            postgresql.UUID(),
            sa.ForeignKey(campaigns_v2.c.id, ondelete="CASCADE"),
            nullable=False,
        ),
        sa.Column("version", postgresql.INTEGER(), nullable=False, default=1),
        sa.Column("kind", ENUM_COLUMN_AS_VARCHAR, nullable=False, default="other"),
        _jsonb_column("metadata"),
        _jsonb_column("spec"),
        sa.PrimaryKeyConstraint("id"),
        sa.UniqueConstraint("name", "version", "namespace"),
        if_not_exists=True,
    )

    # Create table for tasks v2
    _ = op.create_table(
        "tasks_v2",
        sa.Column("id", postgresql.UUID(), nullable=False),
        sa.Column("namespace", postgresql.UUID(), nullable=False),
        sa.Column("node", postgresql.UUID(), nullable=False),
        sa.Column("priority", postgresql.INTEGER(), nullable=True),
        sa.Column("created_at", postgresql.TIMESTAMP(timezone=True), nullable=False),
        sa.Column("last_processed_at", postgresql.TIMESTAMP(timezone=True), nullable=True),
        sa.Column("finished_at", postgresql.TIMESTAMP(timezone=True), nullable=True),
        sa.Column("wms_id", postgresql.VARCHAR(), nullable=True),
        sa.Column("site_affinity", postgresql.ARRAY(postgresql.VARCHAR()), nullable=True),
        sa.Column("status", ENUM_COLUMN_AS_VARCHAR, nullable=False),
        sa.Column("previous_status", ENUM_COLUMN_AS_VARCHAR, nullable=True),
        _jsonb_column("metadata"),
        sa.PrimaryKeyConstraint("id"),
        sa.ForeignKeyConstraint(["node"], ["nodes_v2.id"]),
        sa.ForeignKeyConstraint(["namespace"], ["campaigns_v2.id"]),
        if_not_exists=True,
    )

    # Create table for the activity log v2
    _ = op.create_table(
        "activity_log_v2",
        sa.Column("id", postgresql.UUID(), nullable=False),
        sa.Column("namespace", postgresql.UUID(), nullable=False),
        sa.Column("node", postgresql.UUID(), sa.ForeignKey(nodes_v2.c.id), nullable=False),
        sa.Column("operator", postgresql.VARCHAR(), nullable=False, default="root"),
        sa.Column("from_status", ENUM_COLUMN_AS_VARCHAR, nullable=False),
        sa.Column("to_status", ENUM_COLUMN_AS_VARCHAR, nullable=False),
        _jsonb_column("detail"),
        _jsonb_column("metadata"),
        # The ORM model declares ``id`` as the primary key; every other v2
        # table declares its key here as well.
        sa.PrimaryKeyConstraint("id"),
        if_not_exists=True,
    )

    # Insert default campaign (namespace) record
    op.bulk_insert(
        campaigns_v2,
        [
            {
                "id": DEFAULT_CAMPAIGN_NAMESPACE,
                "namespace": str(NAMESPACE_DNS),
                "name": "DEFAULT",
                "owner": "root",
            }
        ],
    )


def downgrade() -> None:
    """Drop tables in the reverse order in which they were created."""
    op.drop_table("activity_log_v2", if_exists=True)
    op.drop_table("tasks_v2", if_exists=True)
    op.drop_table("manifests_v2", if_exists=True)
    op.drop_table("edges_v2", if_exists=True)
    op.drop_table("nodes_v2", if_exists=True)
    op.drop_table("campaigns_v2", if_exists=True)
    op.drop_table("machines_v2", if_exists=True)
def jsonb_column(name: str, aliases: list[str] | None = None) -> Any:
    """Build a model ``Field`` backed by a mutable JSONB database column.

    Parameters
    ----------
    name
        Name of the database column.
    aliases
        Optional alternative field names. When given, all of them are
        accepted on validation (through a pydantic ``AliasChoices``) and the
        first one is used as the serialization alias, so that e.g.
        "metadata" and "metadata_" can refer to the same field.

    Returns
    -------
    Any
        A ``Field`` whose default is a fresh empty ``dict``.
    """
    column = Column(name, MutableDict.as_mutable(postgresql.JSONB))
    if not aliases:
        # No aliases requested: the field is addressed by its own name only.
        return Field(sa_column=column, default_factory=dict, schema_extra={})
    return Field(
        sa_column=column,
        default_factory=dict,
        schema_extra={
            "validation_alias": AliasChoices(*aliases),
            "serialization_alias": aliases[0],
        },
    )


# NOTES
# - model validation is not triggered when table=True
# - Every object model needs to have four flavors:
#   1. the declarative model of the object's database table
#   2. the model of the manifest when creating a new object
#   3. the model of the manifest when updating an object
#   4. a response model for APIs related to the object


def _enum_member_name(member: Any) -> Any:
    """Return the name of an enum member."""
    return member.name


def _status_from_raw(raw: Any) -> StatusEnum:
    """Look a ``StatusEnum`` member up by name (for strings) or by value."""
    if isinstance(raw, str):
        return StatusEnum[raw]
    return StatusEnum(raw)


def _manifest_kind_from_raw(raw: Any) -> ManifestKind:
    """Look a ``ManifestKind`` member up by name (for strings) or by value."""
    if isinstance(raw, str):
        return ManifestKind[raw]
    return ManifestKind(raw)


EnumSerializer = PlainSerializer(
    _enum_member_name,
    return_type="str",
    when_used="always",
)
"""A serializer for enums that emits the member's name rather than its value."""


StatusEnumValidator = PlainValidator(_status_from_raw)
"""A validator that parses a ``StatusEnum`` from either a name or a value."""


ManifestKindEnumValidator = PlainValidator(_manifest_kind_from_raw)
"""A validator that parses a ``ManifestKind`` from either a name or a value."""
def _as_uuid(value: Any) -> UUID:
    """Return ``value`` as a ``UUID``, parsing its string form if needed.

    Raw manifests (e.g. decoded JSON) carry UUIDs as strings, which
    ``uuid5`` cannot use as a namespace. A malformed value raises
    ``ValueError``, which pydantic reports as a validation error.
    """
    return value if isinstance(value, UUID) else UUID(str(value))


class BaseSQLModel(SQLModel):
    """Common base that places every v2 table in the configured schema."""

    __table_args__ = {"schema": config.db.table_schema}


class CampaignBase(BaseSQLModel):
    """Campaigns_v2 base model, used to create new Campaign objects."""

    id: UUID = Field(primary_key=True)
    name: str
    namespace: UUID
    owner: str | None = Field(default=None)
    status: Annotated[StatusEnum, StatusEnumValidator, EnumSerializer] | None = Field(
        default=StatusEnum.waiting,
        sa_column=Column("status", Enum(StatusEnum, length=20, native_enum=False, create_constraint=False)),
    )
    metadata_: dict = jsonb_column("metadata", aliases=["metadata", "metadata_"])
    configuration: dict = jsonb_column("configuration", aliases=["configuration", "data", "spec"])


class CampaignModel(CampaignBase):
    """Model used for resource creation."""

    @model_validator(mode="before")
    @classmethod
    def custom_model_validator(cls, data: Any, info: ValidationInfo) -> Any:
        """Auto-populate default non-optional fields of a raw campaign.

        For ``dict`` input, ``namespace`` falls back to the default campaign
        namespace and ``id`` to the UUID5 of the name within that namespace.
        Other inputs are returned untouched.

        Raises
        ------
        ValueError
            If ``name`` is missing or ``namespace`` is not a valid UUID.
        """
        if not isinstance(data, dict):
            return data
        if "name" not in data:
            raise ValueError("'name' must be specified.")
        # Work on a shallow copy so the caller's mapping is not modified.
        data = dict(data)
        data["namespace"] = _as_uuid(data.get("namespace", _default_campaign_namespace))
        if "id" not in data:
            data["id"] = uuid5(namespace=data["namespace"], name=data["name"])
        return data


class Campaign(CampaignModel, table=True):
    """Model used for database operations involving campaigns_v2 table rows"""

    __tablename__: str = "campaigns_v2"  # type: ignore[misc]

    machine: UUID | None


class NodeBase(BaseSQLModel):
    """nodes_v2 db table"""

    id: UUID = Field(primary_key=True)
    name: str
    namespace: UUID
    version: int
    kind: Annotated[ManifestKind, ManifestKindEnumValidator, EnumSerializer] = Field(
        default=ManifestKind.other,
        sa_column=Column("kind", Enum(ManifestKind, length=20, native_enum=False, create_constraint=False)),
    )
    status: Annotated[StatusEnum, StatusEnumValidator, EnumSerializer] | None = Field(
        default=StatusEnum.waiting,
        sa_column=Column("status", Enum(StatusEnum, length=20, native_enum=False, create_constraint=False)),
    )
    metadata_: dict = jsonb_column("metadata", aliases=["metadata", "metadata_"])
    configuration: dict = jsonb_column("configuration", aliases=["configuration", "data", "spec"])


class NodeModel(NodeBase):
    """Model validating class for Nodes"""

    @model_validator(mode="before")
    @classmethod
    def custom_model_validator(cls, data: Any, info: ValidationInfo) -> Any:
        """Auto-populate ``version``, ``namespace`` and ``id`` of a raw node.

        ``version`` defaults to 1, ``namespace`` to the default campaign
        namespace and ``id`` to the UUID5 of ``"<name>.<version>"`` within
        the namespace.

        Raises
        ------
        ValueError
            If ``name`` is missing or ``namespace`` is not a valid UUID.
        """
        if not isinstance(data, dict):
            return data
        if "name" not in data:
            raise ValueError("'name' must be specified.")
        # Work on a shallow copy so the caller's mapping is not modified.
        data = dict(data)
        data.setdefault("version", 1)
        data["namespace"] = _as_uuid(data.get("namespace", _default_campaign_namespace))
        if "id" not in data:
            data["id"] = uuid5(namespace=data["namespace"], name=f"""{data["name"]}.{data["version"]}""")
        return data


class Node(NodeModel, table=True):
    """Model used for database operations involving nodes_v2 table rows"""

    __tablename__: str = "nodes_v2"  # type: ignore[misc]

    machine: UUID | None


class EdgeBase(BaseSQLModel):
    """edges_v2 db table"""

    id: UUID = Field(primary_key=True)
    name: str
    namespace: UUID = Field(foreign_key="campaigns_v2.id")
    source: UUID = Field(foreign_key="nodes_v2.id")
    target: UUID = Field(foreign_key="nodes_v2.id")
    metadata_: dict = jsonb_column("metadata", aliases=["metadata", "metadata_"])
    configuration: dict = jsonb_column("configuration", aliases=["configuration", "data", "spec"])


class EdgeModel(EdgeBase):
    """Model validating class for Edges"""

    @model_validator(mode="before")
    @classmethod
    def custom_model_validator(cls, data: Any, info: ValidationInfo) -> Any:
        """Auto-populate the ``id`` of a raw edge.

        Unlike campaigns and nodes, an edge has no default namespace; its
        ``id`` defaults to the UUID5 of the name within its namespace.

        Raises
        ------
        ValueError
            If ``name`` or ``namespace`` is missing, or ``namespace`` is not
            a valid UUID.
        """
        if not isinstance(data, dict):
            return data
        if "name" not in data:
            raise ValueError("'name' must be specified.")
        if "namespace" not in data:
            raise ValueError("Edges may only exist in a 'namespace'.")
        # Work on a shallow copy so the caller's mapping is not modified.
        data = dict(data)
        data["namespace"] = _as_uuid(data["namespace"])
        if "id" not in data:
            data["id"] = uuid5(namespace=data["namespace"], name=data["name"])
        return data


class EdgeResponseModel(EdgeModel):
    """Edge response model whose ``source`` and ``target`` accept any value."""

    source: Any
    target: Any


class Edge(EdgeModel, table=True):
    """Model used for database operations involving edges_v2 table rows"""

    __tablename__: str = "edges_v2"  # type: ignore[misc]


class MachineBase(BaseSQLModel):
    """machines_v2 db table."""

    id: UUID = Field(primary_key=True)
    # NOTE(review): PickleType round-trips this column through ``pickle``, so
    # the stored bytes must only ever come from a trusted writer.
    state: Any | None = Field(sa_column=Column("state", PickleType))


class ManifestBase(BaseSQLModel):
    """manifests_v2 db table"""

    id: UUID = Field(primary_key=True)
    name: str
    version: int
    namespace: UUID
    # The validator lets ``kind`` be given by name, which is also the form
    # the serializer emits, so a dumped manifest can be validated again.
    kind: Annotated[ManifestKind, ManifestKindEnumValidator, EnumSerializer] = Field(
        default=ManifestKind.other,
        sa_column=Column("kind", Enum(ManifestKind, length=20, native_enum=False, create_constraint=False)),
    )
    metadata_: dict = jsonb_column("metadata", aliases=["metadata", "metadata_"])
    spec: dict = jsonb_column("spec", aliases=["spec", "configuration", "data"])


class ManifestModel(ManifestBase):
    """Model validating class for Manifests"""

    @model_validator(mode="before")
    @classmethod
    def custom_model_validator(cls, data: Any, info: ValidationInfo) -> Any:
        """Auto-populate ``version``, ``namespace`` and ``id`` of a manifest.

        ``version`` defaults to 1, ``namespace`` to the default campaign
        namespace and ``id`` to the UUID5 of ``"<name>.<version>"`` within
        the namespace.

        Raises
        ------
        ValueError
            If ``name`` is missing or ``namespace`` is not a valid UUID.
        """
        if not isinstance(data, dict):
            return data
        if "name" not in data:
            raise ValueError("'name' must be specified.")
        # Work on a shallow copy so the caller's mapping is not modified.
        data = dict(data)
        data.setdefault("version", 1)
        data["namespace"] = _as_uuid(data.get("namespace", _default_campaign_namespace))
        if "id" not in data:
            data["id"] = uuid5(namespace=data["namespace"], name=f"""{data["name"]}.{data["version"]}""")
        return data


class Manifest(ManifestModel, table=True):
    """Model used for database operations involving manifests_v2 table rows"""

    # Derives from ManifestModel, mirroring Campaign, Node and Edge.
    __tablename__: str = "manifests_v2"  # type: ignore[misc]


class Task(BaseSQLModel, table=True):
    """tasks_v2 db table"""

    # Uses BaseSQLModel so the table shares the schema of the other v2 tables.
    __tablename__: str = "tasks_v2"  # type: ignore[misc]

    id: UUID = Field(primary_key=True)
    namespace: UUID
    node: UUID
    # Fields typed ``| None`` correspond to columns the migration creates as
    # nullable.
    priority: int | None = None
    created_at: datetime
    last_processed_at: datetime | None = None
    finished_at: datetime | None = None
    wms_id: str | None = None
    site_affinity: list[str] | None = Field(
        default=None,
        sa_column=Column("site_affinity", MutableList.as_mutable(postgresql.ARRAY(String()))),
    )
    status: Annotated[StatusEnum, StatusEnumValidator, EnumSerializer] = Field(
        sa_column=Column("status", Enum(StatusEnum, length=20, native_enum=False, create_constraint=False)),
    )
    previous_status: Annotated[StatusEnum, StatusEnumValidator, EnumSerializer] | None = Field(
        default=None,
        sa_column=Column(
            "previous_status", Enum(StatusEnum, length=20, native_enum=False, create_constraint=False)
        ),
    )


class ActivityLogBase(BaseSQLModel):
    """activity_log_v2 db table"""

    id: UUID = Field(primary_key=True)
    namespace: UUID
    node: UUID
    operator: str
    # Statuses are serialized by name, so they are accepted by name as well.
    from_status: Annotated[StatusEnum, StatusEnumValidator, EnumSerializer]
    to_status: Annotated[StatusEnum, StatusEnumValidator, EnumSerializer]
    detail: dict = jsonb_column("detail")


class ActivityLog(ActivityLogBase, table=True):
    """Model used for database operations involving activity_log_v2 rows"""

    __tablename__: str = "activity_log_v2"  # type: ignore[misc]
# Alias settings for the two free-form mapping fields of the wrapper: every
# listed name is accepted on input, the canonical one is used on output.
_METADATA_ALIASES = {
    "validation_alias": AliasChoices("metadata", "metadata_"),
    "serialization_alias": "metadata",
}
_SPEC_ALIASES = {
    "validation_alias": AliasChoices("spec", "configuration", "data"),
    "serialization_alias": "spec",
}


# this can probably be a BaseModel since this is not a db relation, but the
# distinction probably doesn't matter
class ManifestWrapper(SQLModel):
    """Envelope model for an object's manifest as exchanged through APIs.

    The ``spec`` mapping is expected to hold, more or less, the table model
    of the object named by ``kind``.
    """

    apiversion: str = Field(default="io.lsst.cmservice/v1")
    kind: Annotated[ManifestKind, ManifestKindEnumValidator, EnumSerializer] = Field(
        default=ManifestKind.other,
    )
    metadata_: dict = Field(default_factory=dict, schema_extra=_METADATA_ALIASES)
    spec: dict = Field(default_factory=dict, schema_extra=_SPEC_ALIASES)
Parameters ---------- - url - Database connection URL, not including the password. - password - Database connection password. - isolation_level - If specified, sets a non-default isolation level for the database - engine. use_async If true (default), the database drivername will be forced to an async form. """ if isinstance(config.db.url, str): - url = make_url(config.db.url) - if use_async and url.drivername == "postgresql": - url = url.set(drivername="postgresql+asyncpg") + self.url = make_url(config.db.url) + if use_async and self.url.drivername == "postgresql": + self.url = self.url.set(drivername="postgresql+asyncpg") # FIXME use SecretStr for password # if isinstance(config.db.password, SecretStr): # password = config.db.password.get_secret_value() #noqa: ERA001 if config.db.password is not None: - url = url.set(password=config.db.password) + self.url = self.url.set(password=config.db.password) if self.engine: await self.engine.dispose() self.engine = create_async_engine( - url=url, + url=self.url, echo=config.db.echo, # TODO add pool-level configs )