diff --git a/pyproject.toml b/pyproject.toml index 635103017..15d469a73 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,7 +29,6 @@ dependencies = [ "pydantic >=2, <3", "pydantic-settings >=2, <3", "pyyaml", - "typer", ] classifiers = [ diff --git a/tiled/commandline/_admin.py b/tiled/commandline/_admin.py index ce6f00421..054dd1107 100644 --- a/tiled/commandline/_admin.py +++ b/tiled/commandline/_admin.py @@ -1,193 +1,15 @@ -from pathlib import Path -from typing import Optional +from pydantic import BaseModel +from pydantic_settings import CliApp, CliSubCommand -import typer +from tiled.commandline._api_key import APIKeys +from tiled.commandline._database import Database +from tiled.commandline._principal import Principals -from ._utils import get_context, get_profile # noqa E402 -admin_app = typer.Typer() +class Admin(BaseModel): + database: CliSubCommand[Database] + api_keys: CliSubCommand[APIKeys] + principals: CliSubCommand[Principals] - -@admin_app.command("initialize-database") -def initialize_database(database_uri: str): - """ - Initialize a SQL database for use by Tiled. - """ - import asyncio - - from sqlalchemy.ext.asyncio import create_async_engine - - from ..alembic_utils import UninitializedDatabase, check_database, stamp_head - from ..authn_database.alembic_constants import ( - ALEMBIC_DIR, - ALEMBIC_INI_TEMPLATE_PATH, - ) - from ..authn_database.core import ( - ALL_REVISIONS, - REQUIRED_REVISION, - initialize_database, - ) - from ..utils import ensure_specified_sql_driver - - database_uri = ensure_specified_sql_driver(database_uri) - - async def do_setup(): - engine = create_async_engine(database_uri) - redacted_url = engine.url._replace(password="[redacted]") - try: - await check_database(engine, REQUIRED_REVISION, ALL_REVISIONS) - except UninitializedDatabase: - # Create tables and stamp (alembic) revision. - typer.echo( - f"Database {redacted_url} is new. Creating tables and marking revision {REQUIRED_REVISION}.", - err=True, - ) - await initialize_database(engine) - typer.echo("Database initialized.", err=True) - else: - typer.echo(f"Database at {redacted_url} is already initialized.", err=True) - raise typer.Abort() - await engine.dispose() - - asyncio.run(do_setup()) - stamp_head(ALEMBIC_INI_TEMPLATE_PATH, ALEMBIC_DIR, database_uri) - - -@admin_app.command("upgrade-database") -def upgrade_database( - database_uri: str, - revision: Optional[str] = typer.Argument( - None, - help="The ID of a revision to upgrade to. By default, upgrade to the latest one.", - ), -): - """ - Upgrade the database schema to the latest version. - """ - import asyncio - - from sqlalchemy.ext.asyncio import create_async_engine - - from ..alembic_utils import get_current_revision, upgrade - from ..authn_database.alembic_constants import ( - ALEMBIC_DIR, - ALEMBIC_INI_TEMPLATE_PATH, - ) - from ..authn_database.core import ALL_REVISIONS - from ..utils import ensure_specified_sql_driver - - database_uri = ensure_specified_sql_driver(database_uri) - - async def do_setup(): - engine = create_async_engine(database_uri) - redacted_url = engine.url._replace(password="[redacted]") - current_revision = await get_current_revision(engine, ALL_REVISIONS) - await engine.dispose() - if current_revision is None: - # Create tables and stamp (alembic) revision. - typer.echo( - f"Database {redacted_url} has not been initialized. Use `tiled admin initialize-database`.", - err=True, - ) - raise typer.Abort() - - asyncio.run(do_setup()) - upgrade(ALEMBIC_INI_TEMPLATE_PATH, ALEMBIC_DIR, database_uri, revision or "head") - - -@admin_app.command("downgrade-database") -def downgrade_database( - database_uri: str, - revision: str = typer.Argument(..., help="The ID of a revision to downgrade to."), -): - """ - Upgrade the database schema to the latest version. - """ - import asyncio - - from sqlalchemy.ext.asyncio import create_async_engine - - from ..alembic_utils import downgrade, get_current_revision - from ..authn_database.alembic_constants import ( - ALEMBIC_DIR, - ALEMBIC_INI_TEMPLATE_PATH, - ) - from ..authn_database.core import ALL_REVISIONS - from ..utils import ensure_specified_sql_driver - - database_uri = ensure_specified_sql_driver(database_uri) - - async def do_setup(): - engine = create_async_engine(database_uri) - redacted_url = engine.url._replace(password="[redacted]") - current_revision = await get_current_revision(engine, ALL_REVISIONS) - if current_revision is None: - # Create tables and stamp (alembic) revision. - typer.echo( - f"Database {redacted_url} has not been initialized. Use `tiled admin initialize-database`.", - err=True, - ) - raise typer.Abort() - - asyncio.run(do_setup()) - downgrade(ALEMBIC_INI_TEMPLATE_PATH, ALEMBIC_DIR, database_uri, revision) - - -@admin_app.command("check-config") -def check_config( - config_path: Path = typer.Argument( - None, - help=( - "Path to a config file or directory of config files. " - "If None, check environment variable TILED_CONFIG. " - "If that is unset, try default location ./config.yml." - ), - ), -): - "Check configuration file for syntax and validation errors." - import os - - from ..config import parse_configs - - config_path = config_path or os.getenv("TILED_CONFIG", "config.yml") - try: - parse_configs(config_path) - except Exception as err: - typer.echo(str(err), err=True) - raise typer.Exit(1) - typer.echo("No errors found in configuration.") - - -@admin_app.command("list-principals") -def list_principals( - profile: Optional[str] = typer.Option( - None, help="If you use more than one Tiled server, use this to specify which." - ), - page_offset: int = typer.Argument(0), - page_limit: int = typer.Argument(100, help="Max items to show"), -): - """ - List information about all Principals (users or services) that have ever logged in. - """ - import json - - context = get_context(profile) - result = context.admin.list_principals(offset=page_offset, limit=page_limit) - typer.echo(json.dumps(result, indent=2)) - - -@admin_app.command("show-principal") -def show_principal( - profile: Optional[str] = typer.Option( - None, help="If you use more than one Tiled server, use this to specify which." - ), - uuid: str = typer.Argument(..., help="UUID identifying Principal of interest"), -): - """ - Show information about one Principal (user or service). - """ - import json - - context = get_context(profile) - result = context.admin.show_principal(uuid) - typer.echo(json.dumps(result, indent=2)) + def cli_cmd(self) -> None: + CliApp.run_subcommand(self) diff --git a/tiled/commandline/_api_key.py b/tiled/commandline/_api_key.py index 0a83b3740..86d2dc953 100644 --- a/tiled/commandline/_api_key.py +++ b/tiled/commandline/_api_key.py @@ -1,99 +1,99 @@ -from typing import List, Optional +from typing import Annotated, Optional -import typer +from pydantic import AfterValidator, BaseModel, Field +from pydantic_settings import CliApp, CliSubCommand -from ._utils import get_context +from tiled.commandline._utils import ContextCommand -api_key_app = typer.Typer() +def to_seconds(expires_in: Optional[str]) -> Optional[str]: + if expires_in is None: + return expires_in + return expires_in + "s" if expires_in.isdigit() else expires_in -@api_key_app.command("create") -def create_api_key( - profile: Optional[str] = typer.Option( - None, help="If you use more than one Tiled server, use this to specify which." - ), - expires_in: Optional[str] = typer.Option( - None, - help=( - "Number of seconds until API key expires, given as integer seconds " - "or a string like: '3y' (years), '3d' (days), '5m' (minutes), '1h' " - "(hours), '30s' (seconds). If None, it will never expire or it will " - "have the maximum lifetime allowed by the server. " - ), - ), - scopes: Optional[List[str]] = typer.Option( - None, - help=( - "Restrict the access available to this API key by listing scopes. " - "By default, it will inherit the scopes of its owner." - ), - ), - note: Optional[str] = typer.Option(None, help="Add a note to label this API key."), - no_verify: bool = typer.Option(False, "--no-verify", help="Skip SSL verification."), -): - context = get_context(profile) - if not scopes: - # This is how typer interprets unspecified scopes. - # Replace with None to get default scopes. - scopes = None - if expires_in.isdigit(): - expires_in = int(expires_in) - info = context.create_api_key(scopes=scopes, expires_in=expires_in, note=note) - # TODO Print other info to the stderr? - typer.echo(info["secret"]) +class Create(ContextCommand): + expires_in: Annotated[ + Optional[str], + "Number of seconds until API key expires, given as integer seconds " + "or a string like: '3y' (years), '3d' (days), '5m' (minutes), '1h' " + "(hours), '30s' (seconds). If None, it will never expire or it will " + "have the maximum lifetime allowed by the server. ", + AfterValidator(to_seconds), + ] = None + scopes: Annotated[ + Optional[set[str]], + "Restrict the access available to this API key by listing scopes. " + "By default, it will inherit the scopes of its owner.", + ] = None + note: Annotated[Optional[str], "Add a note to label this API key."] = None + no_verify: Annotated[bool, "Skip SSL verification."] = False -@api_key_app.command("list") -def list_api_keys( - profile: Optional[str] = typer.Option( - None, help="If you use more than one Tiled server, use this to specify which." - ), -): - context = get_context(profile) - info = context.whoami() - if not info["api_keys"]: - typer.echo("No API keys found", err=True) - return - max_note_len = max(len(api_key["note"] or "") for api_key in info["api_keys"]) - COLUMNS = f"First 8 Expires at (UTC) Latest activity Note{' ' * (max_note_len - 4)} Scopes" - typer.echo(COLUMNS) - for api_key in info["api_keys"]: - note_padding = 2 + max_note_len - len(api_key["note"] or "") - if api_key["expiration_time"] is None: - expiration_time = "-" - else: - expiration_time = ( - api_key["expiration_time"] - .replace(microsecond=0, tzinfo=None) - .isoformat() - ) - if api_key["latest_activity"] is None: - latest_activity = "-" - else: - latest_activity = ( - api_key["latest_activity"] - .replace(microsecond=0, tzinfo=None) - .isoformat() - ) - typer.echo( - ( - f"{api_key['first_eight']:10}" - f"{expiration_time:21}" - f"{latest_activity:21}" - f"{(api_key['note'] or '')}{' ' * note_padding}" - f"{' '.join(api_key['scopes']) or '-'}" - ) + def cli_cmd(self) -> None: + context = self.context() + info = context.create_api_key( + scopes=self.scopes, expires_in=self.expires_in, note=self.note ) + # TODO Print other info to the stderr? + print(info["secret"]) + + +class Revoke(ContextCommand): + first_eight: Annotated[ + str, + "First eight characters of API key (or the whole key)", + AfterValidator(lambda string: string[:8]), + ] + + async def cli_cmd(self) -> None: + context = self.context() + context.revoke_api_key(self.first_eight) + + +class ListKeys(ContextCommand): + def cli_cmd(self) -> None: + context = self.context() + info = context.whoami() + if not info["api_keys"]: + print("No API keys found") + return + max_note_len = max(len(api_key.get("note", "") for api_key in info["api_keys"])) + print( + f"First 8 Expires at (UTC) Latest activity Note{' ' * (max_note_len - 4)} Scopes" + ) + for api_key in info["api_keys"]: + note_padding = 2 + max_note_len - len(api_key["note"] or "") + if api_key["expiration_time"] is None: + expiration_time = "-" + else: + expiration_time = ( + api_key["expiration_time"] + .replace(microsecond=0, tzinfo=None) + .isoformat() + ) + if api_key["latest_activity"] is None: + latest_activity = "-" + else: + latest_activity = ( + api_key["latest_activity"] + .replace(microsecond=0, tzinfo=None) + .isoformat() + ) + print( + ( + f"{api_key['first_eight']:10}" + f"{expiration_time:21}" + f"{latest_activity:21}" + f"{(api_key['note'] or '')}{' ' * note_padding}" + f"{' '.join(api_key['scopes']) or '-'}" + ) + ) + +class APIKeys(BaseModel): + list_keys: CliSubCommand[ListKeys] = Field(alias="list") + create: CliSubCommand[Create] + revoke: CliSubCommand[Revoke] -@api_key_app.command("revoke") -def revoke_api_key( - profile: Optional[str] = typer.Option( - None, help="If you use more than one Tiled server, use this to specify which." - ), - first_eight: str = typer.Argument( - ..., help="First eight characters of API key (or the whole key)" - ), -): - context = get_context(profile) - context.revoke_api_key(first_eight[:8]) + def cli_cmd(self) -> None: + CliApp.run_subcommand(self) diff --git a/tiled/commandline/_catalog.py b/tiled/commandline/_catalog.py deleted file mode 100644 index 48bb1eb7e..000000000 --- a/tiled/commandline/_catalog.py +++ /dev/null @@ -1,148 +0,0 @@ -import asyncio -from pathlib import Path -from typing import Optional - -import typer - -from ._serve import serve_catalog - -catalog_app = typer.Typer() -# Support both `tiled serve catalog` and `tiled catalog serve` as synonyms -# because I cannot decide which is right. -catalog_app.command("serve")(serve_catalog) -DEFAULT_SQLITE_CATALOG_FILENAME = "catalog.db" - - -@catalog_app.command("init") -def init( - database: str = typer.Argument( - Path.cwd() / DEFAULT_SQLITE_CATALOG_FILENAME, help="A filepath or database URI" - ), - if_not_exists: bool = typer.Option( - False, - help=( - "By default, it is an error if a database is already initialized." - "Set this flag to be permissive and return without an error." - ), - ), -): - """ - Initialize a database as a Tiled Catalog. - - Examples: - - # Using a simple local file as an embedded "database" (SQLite) - tiled init catalog.db - tiled init path/to/catalog.db - tiled init sqlite:////path/to/catalog.db - - # Using a client/serve database engine (PostgreSQL) - tiled init postgresql://uesrname:password@localhost/database_name:5432 - """ - from sqlalchemy.ext.asyncio import create_async_engine - - from ..alembic_utils import UninitializedDatabase, check_database, stamp_head - from ..catalog.alembic_constants import ALEMBIC_DIR, ALEMBIC_INI_TEMPLATE_PATH - from ..catalog.core import ALL_REVISIONS, REQUIRED_REVISION, initialize_database - from ..utils import ensure_specified_sql_driver - - database = ensure_specified_sql_driver(database) - - async def do_setup(): - engine = create_async_engine(database) - redacted_url = engine.url._replace(password="[redacted]") - try: - await check_database(engine, REQUIRED_REVISION, ALL_REVISIONS) - except UninitializedDatabase: - # Create tables and stamp (alembic) revision. - typer.echo( - f"Database {redacted_url} is new. Creating tables.", - err=True, - ) - await initialize_database(engine) - typer.echo("Database initialized.", err=True) - else: - if not if_not_exists: - typer.echo( - f"Database at {redacted_url} is already initialized.", err=True - ) - raise typer.Abort() - finally: - await engine.dispose() - - asyncio.run(do_setup()) - stamp_head(ALEMBIC_INI_TEMPLATE_PATH, ALEMBIC_DIR, database) - - -@catalog_app.command("upgrade-database") -def upgrade_database( - database_uri: str, - revision: Optional[str] = typer.Argument( - None, - help="The ID of a revision to upgrade to. By default, upgrade to the latest one.", - ), -): - """ - Upgrade the catalog database schema to the latest version. - """ - import asyncio - - from sqlalchemy.ext.asyncio import create_async_engine - - from ..alembic_utils import get_current_revision, upgrade - from ..catalog.alembic_constants import ALEMBIC_DIR, ALEMBIC_INI_TEMPLATE_PATH - from ..catalog.core import ALL_REVISIONS - from ..utils import ensure_specified_sql_driver - - database_uri = ensure_specified_sql_driver(database_uri) - - async def do_setup(): - engine = create_async_engine(database_uri) - redacted_url = engine.url._replace(password="[redacted]") - current_revision = await get_current_revision(engine, ALL_REVISIONS) - await engine.dispose() - if current_revision is None: - # Create tables and stamp (alembic) revision. - typer.echo( - f"Database {redacted_url} has not been initialized. Use `tiled catalog init`.", - err=True, - ) - raise typer.Abort() - - asyncio.run(do_setup()) - upgrade(ALEMBIC_INI_TEMPLATE_PATH, ALEMBIC_DIR, database_uri, revision or "head") - - -@catalog_app.command("downgrade-database") -def downgrade_database( - database_uri: str, - revision: str = typer.Argument(..., help="The ID of a revision to downgrade to."), -): - """ - Upgrade the catalog database schema to the latest version. - """ - import asyncio - - from sqlalchemy.ext.asyncio import create_async_engine - - from ..alembic_utils import downgrade, get_current_revision - from ..catalog.alembic_constants import ALEMBIC_DIR, ALEMBIC_INI_TEMPLATE_PATH - from ..catalog.core import ALL_REVISIONS - from ..utils import ensure_specified_sql_driver - - database_uri = ensure_specified_sql_driver(database_uri) - - async def do_setup(): - engine = create_async_engine(database_uri) - redacted_url = engine.url._replace(password="[redacted]") - current_revision = await get_current_revision(engine, ALL_REVISIONS) - if current_revision is None: - # Create tables and stamp (alembic) revision. - typer.echo( - f"Database {redacted_url} has not been initialized. Use `tiled catalog init`.", - err=True, - ) - raise typer.Abort() - - asyncio.run(do_setup()) - downgrade(ALEMBIC_INI_TEMPLATE_PATH, ALEMBIC_DIR, database_uri, revision) diff --git a/tiled/commandline/_database.py b/tiled/commandline/_database.py new file mode 100644 index 000000000..0627ab463 --- /dev/null +++ b/tiled/commandline/_database.py @@ -0,0 +1,113 @@ +import asyncio +from abc import ABC +from typing import Annotated + +from pydantic import BaseModel, BeforeValidator, Field +from pydantic_settings import CliApp, CliSubCommand +from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine + +from ..alembic_utils import ( + UninitializedDatabase, + check_database, + downgrade, + get_current_revision, + stamp_head, + upgrade, +) +from ..authn_database.alembic_constants import ALEMBIC_DIR, ALEMBIC_INI_TEMPLATE_PATH +from ..authn_database.core import ALL_REVISIONS, REQUIRED_REVISION, initialize_database +from ..utils import ensure_specified_sql_driver + + +class DatabaseCommand(ABC, BaseModel): + database_uri: Annotated[str, BeforeValidator(ensure_specified_sql_driver)] + + def get_engine(self) -> AsyncEngine: + return create_async_engine(self.database_uri) + + def redacted_url(self, engine: AsyncEngine) -> str: + return engine.url._replace(password="[redacted]") + + +class Initialize(DatabaseCommand): + """ + Initialize a SQL database for use by Tiled. + """ + + def cli_cmd(self) -> None: + async def inner(): + engine = self.get_engine() + redacted_url = self.redacted_url(engine) + try: + await check_database(engine, REQUIRED_REVISION, ALL_REVISIONS) + except UninitializedDatabase: + # Create tables and stamp (alembic) revision. + print( + f"Database {redacted_url} is new. Creating tables and marking revision {REQUIRED_REVISION}.", + ) + await initialize_database(engine) + print("Database initialized.") + else: + print(f"Database at {redacted_url} is already initialized.") + raise ValueError + await engine.dispose() + + asyncio.run(inner()) + stamp_head(ALEMBIC_INI_TEMPLATE_PATH, ALEMBIC_DIR, self.database_uri) + + +class Upgrade(DatabaseCommand): + revision: Annotated[ + str, + "The ID of a revision to upgrade to. By default, upgrade to the latest one.", + ] = "head" + """ + Upgrade the database schema to the latest or a specified version. + """ + + async def cli_cmd(self) -> None: + async def inner(): + engine = self.get_engine() + redacted_url = self.redacted_url(engine) + current_revision = await get_current_revision(engine, ALL_REVISIONS) + await engine.dispose() + if current_revision is None: + raise UninitializedDatabase( + f"Database {redacted_url} has not been initialized. Use `tiled admin database init`." + ) + + asyncio.run(inner()) + upgrade( + ALEMBIC_INI_TEMPLATE_PATH, ALEMBIC_DIR, self.database_uri, self.revision + ) + + +class Downgrade(DatabaseCommand): + revision: Annotated[str, "The ID of a revision to downgrade to."] + """ + Downgrade the database schema to a specified version. + """ + + def cli_cmd(self) -> None: + async def inner(): + engine = create_async_engine(self.database_uri) + redacted_url = engine.url._replace(password="[redacted]") + current_revision = await get_current_revision(engine, ALL_REVISIONS) + if current_revision is None: + raise UninitializedDatabase( + f"Database {redacted_url} has not been initialized. Use `tiled admin database init`." + ) + + asyncio.run(inner()) + downgrade( + ALEMBIC_INI_TEMPLATE_PATH, ALEMBIC_DIR, self.database_uri, self.revision + ) + + +class Database(BaseModel): + initialize: CliSubCommand[Initialize] = Field(alias="init") + upgrade: CliSubCommand[Upgrade] + downgrade: CliSubCommand[Downgrade] + + def cli_cmd(self) -> None: + CliApp.run_subcommand(self) diff --git a/tiled/commandline/_principal.py b/tiled/commandline/_principal.py new file mode 100644 index 000000000..908077cf4 --- /dev/null +++ b/tiled/commandline/_principal.py @@ -0,0 +1,42 @@ +import json +from typing import Annotated + +from pydantic import BaseModel, Field +from pydantic_settings import CliApp, CliSubCommand + +from tiled.commandline._utils import ContextCommand + + +class ListPrincipals(ContextCommand): + page_offset: int = 0 + page_limit: Annotated[int, "Max items to show"] = 100 + """ + List information about all Principals (users or services) that have ever logged in. + """ + + def cli_cmd(self) -> None: + context = self.context() + result = context.admin.list_principals( + offset=self.page_offset, limit=self.page_limit + ) + print(json.dumps(result, indent=2)) + + +class Show(ContextCommand): + uuid: Annotated[str, "UUID identifying Principal of interest"] + """ + Show information about one Principal (user or service). + """ + + def cli_cmd(self) -> None: + context = self.context() + result = context.admin.show_principal(self.uuid) + print(json.dumps(result, indent=2)) + + +class Principals(BaseModel): + list_principals: CliSubCommand[ListPrincipals] = Field(alias="list") + show: CliSubCommand[Show] + + def cli_cmd(self) -> None: + CliApp.run_subcommand(self) diff --git a/tiled/commandline/_profile.py b/tiled/commandline/_profile.py index 6190eb2f3..2ae8505e8 100644 --- a/tiled/commandline/_profile.py +++ b/tiled/commandline/_profile.py @@ -1,174 +1,160 @@ -import typer - -profile_app = typer.Typer() - - -@profile_app.command("paths") -def profile_paths(): - "List the locations that the client will search for profiles (client-side configuration)." - from ..profiles import paths +import os +import sys +from asyncio import subprocess +from typing import Annotated, Optional + +import yaml +from pydantic import BaseModel, Field +from pydantic_settings import CliApp, CliSubCommand + +from tiled.commandline._utils import ContextCommand +from tiled.profiles import ( + ProfileExists, + create_profile, + delete_profile, + get_default_profile_name, + load_profiles, + paths, + set_default_profile_name, +) + + +class Paths(BaseModel): + """ + List the locations that the client will search for profiles (client-side configuration). + """ - print("\n".join(str(p) for p in paths)) + def cli_cmd(self) -> None: + print("\n".join(str(p) for p in paths)) -@profile_app.command("list") -def profile_list(): - "List the profiles (client-side configuration) found and the files they were read from." - from ..profiles import load_profiles +class ListProfiles(BaseModel): + """List the profiles (client-side configuration) found and the files they were read from.""" - profiles = load_profiles() - if not profiles: - typer.echo("No profiles found.") - return - max_len = max(len(name) for name in profiles) - PADDING = 4 + def cli_cmd(self) -> None: + profiles = load_profiles() + if not profiles: + print("No profiles found.") + return + max_len = max(len(name) for name in profiles) + PADDING = 4 - print( - "\n".join( - f"{name:<{max_len + PADDING}}{filepath}" - for name, (filepath, _) in profiles.items() + print( + "\n".join( + f"{name:<{max_len + PADDING}}{filepath}" + for name, (filepath, _) in profiles.items() + ) ) - ) -@profile_app.command("show") -def profile_show(profile_name: str): +class Show(ContextCommand): "Show the content of a profile." - import sys - import yaml + def cli_cmd(self) -> None: + filepath, content = self.profile_contents() + print(f"Source: {filepath}", file=sys.stderr) + print("--", file=sys.stderr) + print(yaml.dump(content), file=sys.stdout) - from ..profiles import load_profiles - profiles = load_profiles() - try: - filepath, content = profiles[profile_name] - except KeyError: - typer.echo( - f"The profile {profile_name!r} could not be found. " - "Use tiled profile list to see profile names.", - err=True, - ) - raise typer.Abort() - print(f"Source: {filepath}", file=sys.stderr) - print("--", file=sys.stderr) - print(yaml.dump(content), file=sys.stdout) +class Edit(ContextCommand): + def cli_cmd(self) -> None: + filepath, _ = self.profile_contents() + print(f"Opening {filepath} in default text editor...", file=sys.stderr) + if sys.platform.system() == "Darwin": + subprocess.call(("open", filepath)) + elif sys.platform.system() == "Windows": + os.startfile(filepath) + else: + subprocess.call(("xdg-open", filepath)) -@profile_app.command("edit") -def profile_edit(profile_name: str): - "Show the content of a profile." - import sys - - from ..profiles import load_profiles - - profiles = load_profiles() - try: - filepath, content = profiles[profile_name] - except KeyError: - typer.echo( - f"The profile {profile_name!r} could not be found. " - "Use tiled profile list to see profile names.", - err=True, - ) - raise typer.Abort() - print(f"Opening {filepath} in default text editor...", file=sys.stderr) - - import os - import platform - import subprocess - - if platform.system() == "Darwin": - subprocess.call(("open", filepath)) - elif platform.system() == "Windows": - os.startfile(filepath) - else: - subprocess.call(("xdg-open", filepath)) - - -@profile_app.command("create") -def create( - uri: str = typer.Argument(..., help="URI 'http[s]://...'"), - name: str = typer.Option("auto", help="Profile name, a short convenient alias"), - set_default: bool = typer.Option( - True, help="Set new profile as the default profile." - ), - overwrite: bool = typer.Option( - False, "--overwrite", help="Overwrite an existing profile of this name." - ), - no_verify: bool = typer.Option(False, "--no-verify", help="Skip SSL verification."), -): + +class Create(ContextCommand): + uri: Annotated[str, "URI 'http[s]://...'"] + set_default: Annotated[bool, "Set new profile as the default profile."] = True + overwrite: Annotated[bool, "Overwrite an existing profile of this name."] = False + verify: Annotated[bool, "Perform SSL verification."] = True """ Create a 'profile' that can be used to connect to a Tiled server. """ - from ..profiles import ProfileExists, create_profile, set_default_profile_name - try: - create_profile(name=name, uri=uri, verify=not no_verify, overwrite=overwrite) - except ProfileExists: - typer.echo( - f"A profile named {name!r} already exists. Use --overwrite to overwrite it." - ) - raise typer.Abort() - if set_default: - set_default_profile_name(name) - typer.echo(f"Tiled profile {name!r} created and set as the default.", err=True) - else: - typer.echo(f"Tiled profile {name!r} created.", err=True) - - -@profile_app.command("delete") -def delete( - name: str = typer.Argument(..., help="Profile name"), -): - from ..profiles import ( - delete_profile, - get_default_profile_name, - set_default_profile_name, - ) - - # Unset the default if this profile is currently the default. - default = get_default_profile_name() - if default == name: - set_default_profile_name(None) - delete_profile(name) - typer.echo(f"Tiled profile {name!r} deleted.", err=True) - - -@profile_app.command("get-default") -def get_default(): + def cli_cmd(self): + try: + create_profile( + name=self.profile_name, + uri=self.uri, + verify=self.verify, + overwrite=self.overwrite, + ) + except ProfileExists as e: + print( + f"A profile named {self.profile_name!r} already exists. Use --overwrite to overwrite it." + ) + raise e + if self.set_default: + set_default_profile_name(self.profile_name) + print( + f"Tiled profile {self.profile_name!r} created and set as the default." + ) + else: + print(f"Tiled profile {self.profile_name!r} created.") + + +class Delete(ContextCommand): + def cli_cmd(self) -> None: + # Unset the default if this profile is currently the default. + default = get_default_profile_name() + if default == self.profile_name: + set_default_profile_name(None) + delete_profile(self.profile_name) + print(f"Tiled profile {self.profile_name!r} deleted.") + + +class GetDefault(BaseModel): """ Show the current default Tiled profile. """ - from ..profiles import get_default_profile_name, load_profiles - - name = get_default_profile_name() - if name is None: - typer.echo("No default.", err=True) - else: - import yaml - source_filepath, profile_content = load_profiles()[name] - typer.echo(f"# Profile name: {name!r}") - typer.echo(f"# {source_filepath} \n") - typer.echo(yaml.dump(profile_content)) + def cli_cmd(self) -> None: + name = get_default_profile_name() + if name is None: + print("No default.") + else: + source_filepath, profile_content = load_profiles()[name] + print(f"# Profile name: {name!r} # {source_filepath} \n") + print(profile_content) -@profile_app.command("set-default") -def set_default(profile_name: str): +class SetDefault(BaseModel): """ Set the default Tiled profile. """ - from ..profiles import set_default_profile_name - set_default_profile_name(profile_name) + profile_name: Annotated[ + Optional[str], "Profile name to set as default, or None to clear" + ] = None + def cli_cmd(self) -> None: + set_default_profile_name(self.profile_name) -@profile_app.command("clear-default") -def clear_default(): - """ - Clear the default Tiled profile. - """ - from ..profiles import set_default_profile_name - set_default_profile_name(None) +class DefaultProfile(BaseModel): + set_default: CliSubCommand[SetDefault] = Field(alias="set") + get: CliSubCommand[GetDefault] + + def cli_cmd(self) -> None: + CliApp.run_subcommand(self) + + +class Profiles(BaseModel): + default_profile: CliSubCommand[DefaultProfile] = Field(alias="default") + paths: CliSubCommand[Paths] + list_profiles: CliSubCommand[ListProfiles] = Field(alias="list") + show: CliSubCommand[Show] + edit: CliSubCommand[Edit] + create: CliSubCommand[Create] + delete: CliSubCommand[Delete] + + def cli_cmd(self) -> None: + CliApp.run_subcommand(self) diff --git a/tiled/commandline/_register.py b/tiled/commandline/_register.py index 98d14b6f4..b0d70cb1e 100644 --- a/tiled/commandline/_register.py +++ b/tiled/commandline/_register.py @@ -1,143 +1,115 @@ import asyncio import re -from typing import List +from logging import StreamHandler +from typing import Annotated, List, Optional -import typer +from pydantic import BaseModel +from tiled.client.constructors import from_uri +from tiled.client.register import identity, logger, register, watch -def register( - uri: str = typer.Argument(..., help="URL to Tiled node to register on"), - filepath: str = typer.Argument(..., help="A file or directory to register"), - verbose: bool = typer.Option( - False, - "--verbose", - "-v", - help=("Log details of directory traversal and file registration."), - ), - watch: bool = typer.Option( - False, - "--watch", - "-w", - help="Update catalog when files are added, removed, or changed.", - ), - prefix: str = typer.Option( - "/", help="Location within the catalog's namespace to register these files" - ), - keep_ext: bool = typer.Option( - False, - "--keep-ext", - help=( - "Serve a file like 'measurements.csv' as its full filepath with extension, " - "instead of the default which would serve it as 'measurements'. " - "This is discouraged because it leaks details about the storage " - "format to the client, such that changing the storage in the future " - "may break user (client-side) code." - ), - ), - ext: List[str] = typer.Option( - None, - "--ext", - help=( - "Support custom file extension, mapping it to a known mimetype. " - "Spell like '.tif=image/tiff'. Include the leading '.' in the file " - "extension." - ), - ), - mimetype_detection_hook: str = typer.Option( - None, - "--mimetype-hook", - help=( - "ADVANCED: Custom mimetype detection Python function. " - "Expected interface: detect_mimetype(filepath, mimetype) -> mimetype " - "Specify here as 'package.module:function'" - ), - ), - adapters: List[str] = typer.Option( - None, - "--adapter", - help=( - "ADVANCED: Custom Tiled Adapter for reading a given format" - "Specify here as 'mimetype=package.module:function'" - ), - ), - walkers: List[str] = typer.Option( - None, - "--walker", - help=( - "ADVANCED: Custom Tiled Walker for traversing directories and " - "grouping files. This is used in conjunction with Adapters that operate " - "on groups of files. " - "Specify here as 'package.module:function'" - ), - ), - api_key: str = typer.Option( - None, - "--api-key", - ), -): - if keep_ext: - from ..client.register import identity - key_from_filename = identity - else: - key_from_filename = None - mimetypes_by_file_ext = {} - EXT_PATTERN = re.compile(r"(.*) *= *(.*)") - for item in ext or []: - match = EXT_PATTERN.match(item) - if match is None: - raise ValueError( - f"Failed parsing --ext option {item}, expected format '.ext=mimetype'" - ) - ext, mimetype = match.groups() - mimetypes_by_file_ext[ext] = mimetype - adapters_by_mimetype = {} - ADAPTER_PATTERN = re.compile(r"(.*) *= *(.*)") - for item in adapters or []: - match = ADAPTER_PATTERN.match(item) - if match is None: - raise ValueError( - f"Failed parsing --adapter option {item}, expected format 'mimetype=package.module:obj'" - ) - mimetype, obj_ref = match.groups() - adapters_by_mimetype[mimetype] = obj_ref - - from ..client import from_uri +class Register(BaseModel): + uri: Annotated[str, "URL to Tiled node to register on"] + filepath: Annotated[str, "A file or directory to register"] + verbose: Annotated[ + bool, "Log details of directory traversal and file registration." + ] = False + watch: Annotated[ + bool, "Update catalog when files are added, removed, or changed." + ] = False + prefix: Annotated[ + str, "Location within the catalog's namespace to register these files" + ] = "/" + keep_ext: Annotated[ + bool, + "Serve a file like 'measurements.csv' as its full filepath with extension, " + "instead of the default which would serve it as 'measurements'. " + "This is discouraged because it leaks details about the storage " + "format to the client, such that changing the storage in the future " + "may break user (client-side) code.", + ] = False + ext: Annotated[ + Optional[List[str]], + "Support custom file extension, mapping it to a known mimetype. " + "Spell like '.tif=image/tiff'. Include the leading '.' in the file " + "extension.", + ] = None + mimetype_detection_hook: Annotated[ + Optional[str], + "ADVANCED: Custom mimetype detection Python function. " + "Expected interface: detect_mimetype(filepath, mimetype) -> mimetype " + "Specify here as 'package.module:function'", + ] = None - client_node = from_uri(uri, api_key=api_key) + adapters: Annotated[ + Optional[List[str]], + "ADVANCED: Custom Tiled Adapter for reading a given format" + "Specify here as 'mimetype=package.module:function'", + ] = None + walkers: Annotated[ + Optional[List[str]], + "ADVANCED: Custom Tiled Walker for traversing directories and " + "grouping files. This is used in conjunction with Adapters that operate " + "on groups of files. " + "Specify here as 'package.module:function'", + ] = None + api_key: Optional[str] = None - from logging import StreamHandler + def cli_cmd(self) -> None: + if self.keep_ext: + key_from_filename = identity + else: + key_from_filename = None + mimetypes_by_file_ext = {} + EXT_PATTERN = re.compile(r"(.*) *= *(.*)") + for item in self.ext or []: + match = EXT_PATTERN.match(item) + if match is None: + raise ValueError( + f"Failed parsing --ext option {item}, expected format '.ext=mimetype'" + ) + ext, mimetype = match.groups() + mimetypes_by_file_ext[ext] = mimetype + adapters_by_mimetype = {} + ADAPTER_PATTERN = re.compile(r"(.*) *= *(.*)") + for item in self.adapters or []: + match = ADAPTER_PATTERN.match(item) + if match is None: + raise ValueError( + f"Failed parsing --adapter option {item}, expected format 'mimetype=package.module:obj'" + ) + mimetype, obj_ref = match.groups() + adapters_by_mimetype[mimetype] = obj_ref - from ..client.register import logger as register_logger - from ..client.register import register - from ..client.register import watch as watch_ + client_node = from_uri(self.uri, api_key=self.api_key) - if verbose: - register_logger.addHandler(StreamHandler()) - register_logger.setLevel("INFO") - if watch: - asyncio.run( - watch_( - client_node, - filepath, - prefix=prefix, - mimetype_detection_hook=mimetype_detection_hook, - mimetypes_by_file_ext=mimetypes_by_file_ext, - adapters_by_mimetype=adapters_by_mimetype, - walkers=walkers, - key_from_filename=key_from_filename, + if self.verbose: + logger.addHandler(StreamHandler()) + logger.setLevel("INFO") + if self.watch: + asyncio.run( + watch( + client_node, + self.filepath, + prefix=self.prefix, + mimetype_detection_hook=self.mimetype_detection_hook, + mimetypes_by_file_ext=mimetypes_by_file_ext, + adapters_by_mimetype=adapters_by_mimetype, + walkers=self.walkers, + key_from_filename=key_from_filename, + ) ) - ) - else: - asyncio.run( - register( - client_node, - filepath, - prefix=prefix, - mimetype_detection_hook=mimetype_detection_hook, - mimetypes_by_file_ext=mimetypes_by_file_ext, - adapters_by_mimetype=adapters_by_mimetype, - walkers=walkers, - key_from_filename=key_from_filename, + else: + asyncio.run( + register( + client_node, + self.filepath, + prefix=self.prefix, + mimetype_detection_hook=self.mimetype_detection_hook, + mimetypes_by_file_ext=mimetypes_by_file_ext, + adapters_by_mimetype=adapters_by_mimetype, + walkers=self.walkers, + key_from_filename=key_from_filename, + ) ) - ) diff --git a/tiled/commandline/_serve.py b/tiled/commandline/_serve.py index e2a2635eb..d082f304d 100644 --- a/tiled/commandline/_serve.py +++ b/tiled/commandline/_serve.py @@ -1,725 +1,439 @@ +import asyncio +import copy +import functools import os import re +import tempfile +from abc import ABC +from logging import StreamHandler from pathlib import Path -from typing import List, Optional - -import typer - -serve_app = typer.Typer() +from typing import Annotated, List, Optional, Self + +import anyio +import uvicorn +from pydantic import AfterValidator, BaseModel, model_validator +from pydantic_settings import CliApp, CliSubCommand, SettingsError +from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine + +from tiled.adapters.mapping import MapAdapter +from tiled.alembic_utils import stamp_head +from tiled.authn_database.alembic_constants import ( + ALEMBIC_DIR, + ALEMBIC_INI_TEMPLATE_PATH, +) +from tiled.authn_database.core import initialize_database +from tiled.catalog.adapter import logger as catalog_logger +from tiled.catalog.utils import classify_writable_storage +from tiled.client.constructors import from_uri +from tiled.client.register import identity +from tiled.client.register import logger as register_logger +from tiled.client.register import register, watch +from tiled.config import parse_configs +from tiled.server.app import build_app, print_server_info +from tiled.server.logging_config import LOGGING_CONFIG +from tiled.server.settings import Settings +from tiled.utils import ensure_specified_sql_driver, import_object + +from ..catalog import from_uri as catalog_from_uri +from ..client import from_uri as client_from_uri SQLITE_CATALOG_FILENAME = "catalog.db" DUCKDB_TABULAR_DATA_FILENAME = "data.duckdb" DATA_SUBDIRECTORY = "data" -@serve_app.command("directory") -def serve_directory( - directory: str = typer.Argument(..., help="A directory to serve"), - verbose: bool = typer.Option( - False, - "--verbose", - "-v", - help=("Log details of directory traversal and file registration."), - ), - watch: bool = typer.Option( - False, - "--watch", - "-w", - help="Update catalog when files are added, removed, or changed.", - ), - public: bool = typer.Option( - False, - "--public", - help=( - "Turns off requirement for API key authentication for reading. " - "However, the API key is still required for writing, so data cannot be modified even with " - "this option selected." - ), - ), - api_key: Optional[str] = typer.Option( - None, - "--api-key", - help=( - "Set the single-user API key. " - "By default, a random key is generated at startup and printed." - ), - ), - keep_ext: bool = typer.Option( - False, - "--keep-ext", - help=( - "Serve a file like 'measurements.csv' as its full filepath with extension, " - "instead of the default which would serve it as 'measurements'. " - "This is discouraged because it leaks details about the storage " - "format to the client, such that changing the storage in the future " - "may break user (client-side) code." - ), - ), - ext: Optional[List[str]] = typer.Option( - None, - "--ext", - help=( - "Support custom file extension, mapping it to a known mimetype. " - "Spell like '.tif=image/tiff'. Include the leading '.' in the file " - "extension." - ), - ), - mimetype_detection_hook: Optional[str] = typer.Option( - None, - "--mimetype-hook", - help=( - "ADVANCED: Custom mimetype detection Python function. " - "Expected interface: detect_mimetype(filepath, mimetype) -> mimetype " - "Specify here as 'package.module:function'" - ), - ), - adapters: Optional[List[str]] = typer.Option( - None, - "--adapter", - help=( - "ADVANCED: Custom Tiled Adapter for reading a given format" - "Specify here as 'mimetype=package.module:function'" - ), - ), - walkers: Optional[List[str]] = typer.Option( - None, - "--walker", - help=( - "ADVANCED: Custom Tiled Walker for traversing directories and " - "grouping files. This is used in conjunction with Adapters that operate " - "on groups of files. " - "Specify here as 'package.module:function'" - ), - ), - host: str = typer.Option( - "127.0.0.1", - help=( - "Bind socket to this host. Use `--host 0.0.0.0` to make the application " - "available on your local network. IPv6 addresses are supported, for " - "example: --host `'::'`." - ), - ), - port: int = typer.Option(8000, help="Bind to a socket with this port."), - log_config: Optional[str] = typer.Option( - None, help="Custom uvicorn logging configuration file" - ), - log_timestamps: bool = typer.Option( - False, help="Include timestamps in log output." - ), -): - "Serve a Tree instance from a directory of files." - import tempfile - - temp_directory = Path(tempfile.TemporaryDirectory().name) - temp_directory.mkdir() - typer.echo( - f"Creating catalog database at {temp_directory / SQLITE_CATALOG_FILENAME}", - err=True, - ) - database = f"sqlite:///{Path(temp_directory, SQLITE_CATALOG_FILENAME)}" - - # Because this is a tempfile we know this is a fresh database and we do not - # need to check its current state. - # We _will_ go ahead and stamp it with a revision because it is possible the - # user will copy it into a permanent location. - - import asyncio - - from sqlalchemy.ext.asyncio import create_async_engine - - from ..alembic_utils import stamp_head - from ..catalog.alembic_constants import ALEMBIC_DIR, ALEMBIC_INI_TEMPLATE_PATH - from ..catalog.core import initialize_database - from ..utils import ensure_specified_sql_driver +class ServerCommand(ABC, BaseModel): + host: Annotated[ + str, + "Bind socket to this host. Use `--host 0.0.0.0` to make the application " + "available on your local network. IPv6 addresses are supported, for " + "example: --host `'::'`.", + ] = "127.0.0.1" + port: Annotated[int, "Bind to a socket with this port."] = 8000 + + verbose: Annotated[ + bool, "Log details of directory traversal and file registration." + ] = False + public: Annotated[ + bool, + "Turns off requirement for API key authentication for reading. " + "However, the API key is still required for writing, so data cannot be modified even with " + "this option selected.", + ] = False + api_key: Annotated[ + Optional[str], + "Set the single-user API key. " + "By default, a random key is generated at startup and printed.", + ] = None + keep_ext: Annotated[ + bool, + "Serve a file like 'measurements.csv' as its full filepath with extension, " + "instead of the default which would serve it as 'measurements'. " + "This is discouraged because it leaks details about the storage " + "format to the client, such that changing the storage in the future " + "may break user (client-side) code.", + ] = False + ext: Annotated[ + Optional[List[str]], + "Support custom file extension, mapping it to a known mimetype. " + "Spell like '.tif=image/tiff'. Include the leading '.' in the file " + "extension.", + ] = None + mimetype_detection_hook: Annotated[ + Optional[str], + "ADVANCED: Custom mimetype detection Python function. " + "Expected interface: detect_mimetype(filepath, mimetype) -> mimetype " + "Specify here as 'package.module:function'", + ] = None + adapters: Annotated[ + Optional[List[str]], + "ADVANCED: Custom Tiled Adapter for reading a given format" + "Specify here as 'mimetype=package.module:function'", + ] = None + walkers: Annotated[ + Optional[List[str]], + "ADVANCED: Custom Tiled Walker for traversing directories and " + "grouping files. This is used in conjunction with Adapters that operate " + "on groups of files. " + "Specify here as 'package.module:function'", + ] = None + log_config: Annotated[ + Optional[str], "Custom uvicorn logging configuration file" + ] = None + log_timestamps: Annotated[bool, "Include timestamps in log output."] = False + scalable: Annotated[ + bool, + "This verifies that the configuration is compatible with scaled (multi-process) deployments.", + ] = False + + def get_temporary_catalog_directory() -> Path: + temp_directory = Path(tempfile.TemporaryDirectory().name) + temp_directory.mkdir() + return temp_directory + + def get_database(self, database_uri: str) -> AsyncEngine: + if database_uri is None: + database_uri = self.get_temporary_catalog_directory() + database_uri = ensure_specified_sql_driver(database_uri) + + engine = create_async_engine(database_uri) + asyncio.run(initialize_database(engine)) + stamp_head(ALEMBIC_INI_TEMPLATE_PATH, ALEMBIC_DIR, database_uri) + return engine + + def setup_log_config(self): + if self.log_config is None: + log_config = LOGGING_CONFIG + + if self.log_timestamps: + log_config = copy.deepcopy(log_config) + try: + log_config["formatters"]["access"]["format"] = ( + "[%(asctime)s.%(msecs)03dZ] " + + log_config["formatters"]["access"]["format"] + ) + log_config["formatters"]["default"]["format"] = ( + "[%(asctime)s.%(msecs)03dZ] " + + log_config["formatters"]["default"]["format"] + ) + except KeyError: + print( + "The --log-timestamps option is only applicable with a logging " + "configuration that, like the default logging configuration, has " + "formatters 'access' and 'default'." + ) + raise SettingsError() + return log_config + + def build_server(self, tree: MapAdapter) -> uvicorn.Server: + log_config = self.setup_log_config() + + web_app = build_app( + tree, + Settings( + allow_anonymous_access=self.public, single_user_api_key=self.api_key + ), + scalable=self.scalable, + ) + print_server_info( + web_app, + host=self.host, + port=self.port, + include_api_key=self.api_key is None, + ) - database = ensure_specified_sql_driver(database) - engine = create_async_engine(database) - asyncio.run(initialize_database(engine)) - stamp_head(ALEMBIC_INI_TEMPLATE_PATH, ALEMBIC_DIR, database) + config = uvicorn.Config( + web_app, host=self.host, port=self.port, log_config=log_config + ) + return uvicorn.Server(config) - from ..catalog import from_uri as catalog_from_uri - from ..server.app import build_app, print_server_info - server_settings = {} - if keep_ext: - from ..adapters.files import identity +class Directory(ServerCommand): + directory: Annotated[str, "A directory to serve"] + watch: Annotated[ + bool, "Update catalog when files are added, removed, or changed." + ] = False + "Serve a Tree instance from a directory of files." - key_from_filename = identity - else: - key_from_filename = None + def cli_cmd(self) -> None: + database_dir = self.get_temporary_catalog_directory() + engine = self.get_database(database_dir) + asyncio.run(initialize_database(engine)) + stamp_head(ALEMBIC_INI_TEMPLATE_PATH, ALEMBIC_DIR, database_dir) - from logging import StreamHandler + if self.keep_ext: + key_from_filename = identity + else: + key_from_filename = None + + mimetypes_by_file_ext = {} + EXT_PATTERN = re.compile(r"(.*) *= *(.*)") + for item in self.ext or []: + match = EXT_PATTERN.match(item) + if match is None: + raise ValueError( + f"Failed parsing --ext option {item}, expected format '.ext=mimetype'" + ) + ext, mimetype = match.groups() + mimetypes_by_file_ext[ext] = mimetype + adapters_by_mimetype = {} + ADAPTER_PATTERN = re.compile(r"(.*) *= *(.*)") + for item in self.adapters or []: + match = ADAPTER_PATTERN.match(item) + if match is None: + raise ValueError( + f"Failed parsing --adapter option {item}, expected format 'mimetype=package.module:obj'" + ) + mimetype, obj_ref = match.groups() + adapters_by_mimetype[mimetype] = obj_ref + catalog_adapter = catalog_from_uri( + ensure_specified_sql_driver(database_dir), + readable_storage=[database_dir], + adapters_by_mimetype=adapters_by_mimetype, + ) + if self.verbose: + catalog_logger.addHandler(StreamHandler()) + catalog_logger.setLevel("INFO") + register_logger.addHandler(StreamHandler()) + register_logger.setLevel("INFO") + + server = self.build_server(catalog_adapter) + + async def run_server(): + await server.serve() + + async def wait_for_server(): + "Wait for server to start up, or raise TimeoutError." + for _ in range(100): + await asyncio.sleep(0.1) + if server.started: + break + else: + raise TimeoutError("Server did not start in 10 seconds.") + host, port = server.servers[0].sockets[0].getsockname() + api_url = f"http://{host}:{port}/api/v1/" + return api_url + + if self.watch: + + async def serve_and_walk(): + server_task = asyncio.create_task(run_server()) + api_url = await wait_for_server() + # When we add an AsyncClient for Tiled, use that here. + client = await anyio.to_thread.run_sync( + functools.partial(client_from_uri, api_url, api_key=self.api_key) + ) - from ..client.register import logger as register_logger - from ..client.register import register - from ..client.register import watch as watch_ + print(f"Server is up. Indexing files in {self.directory}...") + event = anyio.Event() + asyncio.create_task( + watch( + client, + self.directory, + initial_walk_complete_event=event, + mimetype_detection_hook=self.mimetype_detection_hook, + mimetypes_by_file_ext=mimetypes_by_file_ext, + adapters_by_mimetype=adapters_by_mimetype, + walkers=self.walkers, + key_from_filename=key_from_filename, + ) + ) + await event.wait() + print("Initial indexing complete. Watching for changes...") + await server_task - mimetypes_by_file_ext = {} - EXT_PATTERN = re.compile(r"(.*) *= *(.*)") - for item in ext or []: - match = EXT_PATTERN.match(item) - if match is None: - raise ValueError( - f"Failed parsing --ext option {item}, expected format '.ext=mimetype'" - ) - ext, mimetype = match.groups() - mimetypes_by_file_ext[ext] = mimetype - adapters_by_mimetype = {} - ADAPTER_PATTERN = re.compile(r"(.*) *= *(.*)") - for item in adapters or []: - match = ADAPTER_PATTERN.match(item) - if match is None: - raise ValueError( - f"Failed parsing --adapter option {item}, expected format 'mimetype=package.module:obj'" - ) - mimetype, obj_ref = match.groups() - adapters_by_mimetype[mimetype] = obj_ref - catalog_adapter = catalog_from_uri( - ensure_specified_sql_driver(database), - readable_storage=[directory], - adapters_by_mimetype=adapters_by_mimetype, - ) - if verbose: - from tiled.catalog.adapter import logger as catalog_logger - - catalog_logger.addHandler(StreamHandler()) - catalog_logger.setLevel("INFO") - register_logger.addHandler(StreamHandler()) - register_logger.setLevel("INFO") - # Set the API key manually here, rather than letting the server do it, - # so that we can pass it to the client. - generated = False - if api_key is None: - api_key = os.getenv("TILED_SINGLE_USER_API_KEY") - if api_key is None: - import secrets - - api_key = secrets.token_hex(32) - generated = True - - web_app = build_app( - catalog_adapter, - { - "allow_anonymous_access": public, - "single_user_api_key": api_key, - }, - server_settings, - ) - import functools - - import anyio - import uvicorn - - from ..client import from_uri as client_from_uri - - print_server_info(web_app, host=host, port=port, include_api_key=generated) - log_config = _setup_log_config(log_config, log_timestamps) - config = uvicorn.Config(web_app, host=host, port=port, log_config=log_config) - server = uvicorn.Server(config) - - async def run_server(): - await server.serve() - - async def wait_for_server(): - "Wait for server to start up, or raise TimeoutError." - for _ in range(100): - await asyncio.sleep(0.1) - if server.started: - break else: - raise TimeoutError("Server did not start in 10 seconds.") - host, port = server.servers[0].sockets[0].getsockname() - api_url = f"http://{host}:{port}/api/v1/" - return api_url - - if watch: - - async def serve_and_walk(): - server_task = asyncio.create_task(run_server()) - api_url = await wait_for_server() - # When we add an AsyncClient for Tiled, use that here. - client = await anyio.to_thread.run_sync( - functools.partial(client_from_uri, api_url, api_key=api_key) - ) - typer.echo(f"Server is up. Indexing files in {directory}...") - event = anyio.Event() - asyncio.create_task( - watch_( + async def serve_and_walk(): + server_task = asyncio.create_task(run_server()) + api_url = await wait_for_server() + # When we add an AsyncClient for Tiled, use that here. + client = await anyio.to_thread.run_sync( + functools.partial(client_from_uri, api_url, api_key=self.api_key) + ) + + print(f"Server is up. Indexing files in {self.directory}...") + await register( client, - directory, - initial_walk_complete_event=event, - mimetype_detection_hook=mimetype_detection_hook, + self.directory, + mimetype_detection_hook=self.mimetype_detection_hook, mimetypes_by_file_ext=mimetypes_by_file_ext, adapters_by_mimetype=adapters_by_mimetype, - walkers=walkers, + walkers=self.walkers, key_from_filename=key_from_filename, ) - ) - await event.wait() - typer.echo("Initial indexing complete. Watching for changes...") - await server_task - - else: - - async def serve_and_walk(): - server_task = asyncio.create_task(run_server()) - api_url = await wait_for_server() - # When we add an AsyncClient for Tiled, use that here. - client = await anyio.to_thread.run_sync( - functools.partial(client_from_uri, api_url, api_key=api_key) - ) - - typer.echo(f"Server is up. Indexing files in {directory}...") - await register( - client, - directory, - mimetype_detection_hook=mimetype_detection_hook, - mimetypes_by_file_ext=mimetypes_by_file_ext, - adapters_by_mimetype=adapters_by_mimetype, - walkers=walkers, - key_from_filename=key_from_filename, - ) - typer.echo("Indexing complete.") - await server_task - - asyncio.run(serve_and_walk()) - - -def serve_catalog( - database: Optional[str] = typer.Argument( - None, help="A filepath or database URI, e.g. 'catalog.db'" - ), - read: Optional[List[str]] = typer.Option( - None, - "--read", - "-r", - help="Locations that the server may read from", - ), - write: Optional[List[str]] = typer.Option( - None, - "--write", - "-w", - help="Locations that the server may write to", - ), - temp: bool = typer.Option( - False, - "--temp", - help="Make a new catalog in a temporary directory.", - ), - init: bool = typer.Option( - False, - "--init", - help="Initialize a new catalog database.", - ), - public: bool = typer.Option( - False, - "--public", - help=( - "Turns off requirement for API key authentication for reading. " - "However, the API key is still required for writing, so data cannot be modified even with " - "this option selected." - ), - ), - api_key: Optional[str] = typer.Option( - None, - "--api-key", - help=( - "Set the single-user API key. " - "By default, a random key is generated at startup and printed." - ), - ), - host: str = typer.Option( - "127.0.0.1", - help=( - "Bind socket to this host. Use `--host 0.0.0.0` to make the application " - "available on your local network. IPv6 addresses are supported, for " - "example: --host `'::'`." - ), - ), - port: int = typer.Option(8000, help="Bind to a socket with this port."), - scalable: bool = typer.Option( - False, - "--scalable", - help=( - "This verifies that the configuration is compatible with scaled (multi-process) deployments." - ), - ), - log_config: Optional[str] = typer.Option( - None, help="Custom uvicorn logging configuration file" - ), - log_timestamps: bool = typer.Option( - False, help="Include timestamps in log output." - ), - verbose: bool = typer.Option( - False, - "--verbose", - "-v", - help=("Log details of catalog creation."), - ), -): + print("Indexing complete.") + await server_task + + asyncio.run(serve_and_walk()) + + +class Catalog(ServerCommand): + database: Annotated[ + Optional[str], "A filepath or database URI, e.g. 'catalog.db'" + ] = None + read: Annotated[ + Optional[List[str]], "Locations that the server may read from" + ] = None + write: Annotated[ + Optional[List[str]], "Locations that the server may write to" + ] = None + temp: Annotated[bool, "Make a new catalog in a temporary directory."] = False + init: Annotated[bool, "Initialize a new catalog database."] = False + scalable: Annotated[ + bool, + "This verifies that the configuration is compatible with scaled (multi-process) deployments.", + ] = False "Serve a catalog." - import urllib.parse - - from ..catalog import from_uri - from ..catalog.utils import classify_writable_storage - from ..server.app import build_app, print_server_info - - parsed_database = urllib.parse.urlparse(database) - if parsed_database.scheme in ("", "file"): - database = f"sqlite:///{parsed_database.path}" - - write = write or [] - if temp: - if database is not None: - typer.echo( - "The option --temp was set but a database was also provided. " - "Do one or the other.", - err=True, - ) - raise typer.Abort() - import tempfile - - directory = Path(tempfile.TemporaryDirectory().name) - typer.echo( - f"Initializing temporary storage in {directory}", - err=True, - ) - directory.mkdir() - database = f"sqlite:///{Path(directory, SQLITE_CATALOG_FILENAME)}" - - # Because this is a tempfile we know this is a fresh database and we do not - # need to check its current state. - # We _will_ go ahead and stamp it with a revision because it is possible the - # user will copy it into a permanent location. - - import asyncio - - from sqlalchemy.ext.asyncio import create_async_engine - - from ..alembic_utils import stamp_head - from ..catalog.alembic_constants import ALEMBIC_DIR, ALEMBIC_INI_TEMPLATE_PATH - from ..catalog.core import initialize_database - from ..utils import ensure_specified_sql_driver - database = ensure_specified_sql_driver(database) - typer.echo( - f" catalog database: {directory / SQLITE_CATALOG_FILENAME}", - err=True, - ) - engine = create_async_engine(database) - asyncio.run(initialize_database(engine)) - stamp_head(ALEMBIC_INI_TEMPLATE_PATH, ALEMBIC_DIR, database) - - if not write: - typer.echo( - f" writable file storage: {directory / DATA_SUBDIRECTORY}", - err=True, + @model_validator(mode="after") + def temp_or_database(self) -> Self: + if self.database is not None and self.temp: + raise ValueError("Expected temp or a database uri but received both.") + if self.database is None and not self.temp: + raise ValueError( + "Database required if not temp- try `tiled admin database init`." + ) + return self + + def cli_cmd(self) -> None: + write = self.write or [] + if self.temp and not write: + temp_directory: Path = self.get_temporary_catalog_directory() + print( + f" writable file storage: {temp_directory / DATA_SUBDIRECTORY}", ) - writable_dir = directory / DATA_SUBDIRECTORY + writable_dir = temp_directory / DATA_SUBDIRECTORY writable_dir.mkdir() write.append(writable_dir) - typer.echo( - f" writable tabular storage: {directory / DUCKDB_TABULAR_DATA_FILENAME}", - err=True, + print( + f" writable tabular storage: {temp_directory / DUCKDB_TABULAR_DATA_FILENAME}", ) tabular_data_database = ( - f"duckdb:///{Path(directory, DUCKDB_TABULAR_DATA_FILENAME)}" + f"duckdb:///{temp_directory / DUCKDB_TABULAR_DATA_FILENAME}" ) write.append(tabular_data_database) # TODO Hook into server lifecycle hooks to delete this at shutdown. - elif database is None: - typer.echo( - """A catalog must be specified. Either use a temporary catalog: - - tiled serve catalog --temp - -or initialize a new catalog, e.g. - tiled catalog init catalog.db - tiled serve catalog catalog.db + if self.verbose: + catalog_logger.addHandler(StreamHandler()) + catalog_logger.setLevel("INFO") -or use an existing one: + if not write: + print( + "This catalog will be served as read-only. " + "To make it writable, specify a writable directory with --write.", + ) - tiled serve catalog catalog.db -""", - err=True, + tree = from_uri( + self.database, + writable_storage=classify_writable_storage(write), + readable_storage=self.read, + init_if_not_exists=self.init, ) - raise typer.Abort() - elif verbose: - from logging import StreamHandler + self.run_server(tree) - from tiled.catalog.adapter import logger as catalog_logger - catalog_logger.addHandler(StreamHandler()) - catalog_logger.setLevel("INFO") - - if not write: - typer.echo( - "This catalog will be served as read-only. " - "To make it writable, specify a writable directory with --write.", - err=True, - ) +class PyObject(ServerCommand): + object_path: Annotated[ + str, "Object path, as in 'package.subpackage.module:object_name'" + ] - server_settings = {} - tree = from_uri( - database, - writable_storage=classify_writable_storage(write), - readable_storage=read, - init_if_not_exists=init, - ) - web_app = build_app( - tree, - { - "allow_anonymous_access": public, - "single_user_api_key": api_key, - }, - server_settings, - scalable=scalable, - ) - print_server_info( - web_app, host=host, port=port, include_api_key=api_key is not None - ) - - import uvicorn - - log_config = _setup_log_config(log_config, log_timestamps) - uvicorn.run(web_app, host=host, port=port, log_config=log_config) - - -serve_app.command("catalog")(serve_catalog) - - -@serve_app.command("pyobject") -def serve_pyobject( - object_path: str = typer.Argument( - ..., help="Object path, as in 'package.subpackage.module:object_name'" - ), - public: bool = typer.Option( - False, - "--public", - help=( - "Turns off requirement for API key authentication for reading. " - "However, the API key is still required for writing, so data cannot be modified even with this " - "option selected." - ), - ), - api_key: Optional[str] = typer.Option( - None, - "--api-key", - help=( - "Set the single-user API key. " - "By default, a random key is generated at startup and printed." - ), - ), - host: str = typer.Option( - "127.0.0.1", - help=( - "Bind socket to this host. Use `--host 0.0.0.0` to make the application " - "available on your local network. IPv6 addresses are supported, for " - "example: --host `'::'`." - ), - ), - port: int = typer.Option(8000, help="Bind to a socket with this port."), - scalable: bool = typer.Option( - False, - "--scalable", - help=( - "This verifies that the configuration is compatible with scaled (multi-process) deployments." - ), - ), - log_config: Optional[str] = typer.Option( - None, help="Custom uvicorn logging configuration file" - ), - log_timestamps: bool = typer.Option( - False, help="Include timestamps in log output." - ), -): "Serve a Tree instance from a Python module." - from ..server.app import build_app, print_server_info - from ..utils import import_object - - tree = import_object(object_path) - server_settings = {} - web_app = build_app( - tree, - { - "allow_anonymous_access": public, - "single_user_api_key": api_key, - }, - server_settings, - scalable=scalable, - ) - print_server_info(web_app, host=host, port=port, include_api_key=api_key is None) - - import uvicorn - - log_config = _setup_log_config(log_config, log_timestamps) - uvicorn.run(web_app, host=host, port=port, log_config=log_config) - - -@serve_app.command("demo") -def serve_demo( - host: str = typer.Option( - "127.0.0.1", - help=( - "Bind socket to this host. Use `--host 0.0.0.0` to make the application " - "available on your local network. IPv6 addresses are supported, for " - "example: --host `'::'`." - ), - ), - port: int = typer.Option(8000, help="Bind to a socket with this port."), -): - "Start a public server with example data." - from ..server.app import build_app, print_server_info - from ..utils import import_object - - EXAMPLE = "tiled.examples.generated:tree" - tree = import_object(EXAMPLE) - web_app = build_app(tree, {"allow_anonymous_access": True}, {}) - print_server_info(web_app, host=host, port=port, include_api_key=True) - - import uvicorn - - uvicorn.run(web_app, host=host, port=port) - - -@serve_app.command("config") -def serve_config( - config_path: Optional[Path] = typer.Argument( - None, - help=( - "Path to a config file or directory of config files. " - "If None, check environment variable TILED_CONFIG. " - "If that is unset, try default location ./config.yml." - ), - ), - public: bool = typer.Option( - False, - "--public", - help=( - "Turns off requirement for API key authentication for reading. " - "However, the API key is still required for writing, so data cannot be modified even with this " - "option selected." - ), - ), - api_key: Optional[str] = typer.Option( - None, - "--api-key", - help=( - "Set the single-user API key. " - "By default, a random key is generated at startup and printed." - ), - ), - host: Optional[str] = typer.Option( - None, - help=( - "Bind socket to this host. Use `--host 0.0.0.0` to make the application " - "available on your local network. IPv6 addresses are supported, for " - "example: --host `'::'`. Uses value in config by default." - ), - ), - port: Optional[int] = typer.Option( - None, help="Bind to a socket with this port. Uses value in config by default." - ), - scalable: bool = typer.Option( - False, - "--scalable", - help=( - "This verifies that the configuration is compatible with scaled (multi-process) deployments." - ), - ), - log_config: Optional[str] = typer.Option( - None, help="Custom uvicorn logging configuration file" - ), - log_timestamps: bool = typer.Option( - False, help="Include timestamps in log output." - ), -): - "Serve a Tree as specified in configuration file(s)." - import os - - from ..config import parse_configs - - config_path = config_path or os.getenv("TILED_CONFIG", "config.yml") - try: - parsed_config = parse_configs(config_path) - except Exception as err: - typer.echo(str(err), err=True) - raise typer.Abort() - - # Let --public flag override config. - if public: - if "authentication" not in parsed_config: - parsed_config["authentication"] = {} - parsed_config["authentication"]["allow_anonymous_access"] = True - # Let --api-key flag override config. - if api_key: - if "authentication" not in parsed_config: - parsed_config["authentication"] = {} - parsed_config["authentication"]["single_user_api_key"] = api_key - # Delay this import so that we can fail faster if config-parsing fails above. + def cli_cmd(self) -> None: + tree = import_object(self.object_path) + self.run_server(tree) - from ..server.app import build_app_from_config, logger, print_server_info - # Extract config for uvicorn. - uvicorn_kwargs = parsed_config.pop("uvicorn", {}) - # If --host is given, it overrides host in config. Same for --port and --log-config. - uvicorn_kwargs["host"] = host or uvicorn_kwargs.get("host", "127.0.0.1") - uvicorn_kwargs["port"] = port or uvicorn_kwargs.get("port", 8000) - uvicorn_kwargs["log_config"] = _setup_log_config( - log_config or uvicorn_kwargs.get("log_config"), - log_timestamps, - ) +class Demo(BaseModel): + host: Annotated[ + str, + "Bind socket to this host. Use `--host 0.0.0.0` to make the application " + "available on your local network. IPv6 addresses are supported, for " + "example: --host `'::'`.", + ] = "127.0.0.1" + port: Annotated[int, "Bind to a socket with this port."] = 8000 - # This config was already validated when it was parsed. Do not re-validate. - logger.info(f"Using configuration from {Path(config_path).absolute()}") + """Start a public server with example data.""" - if root_path := uvicorn_kwargs.get("root_path", ""): - parsed_config["root_path"] = root_path + def cli_cmd(self) -> None: + tree = import_object(self.object_path) + web_app = build_app(tree, Settings(allow_anonymous_access=True)) + print_server_info(web_app, host=self.host, port=self.port, include_api_key=True) + uvicorn.run(web_app, host=self.host, port=self.port) - web_app = build_app_from_config( - parsed_config, source_filepath=config_path, scalable=scalable - ) - print_server_info( - web_app, - host=uvicorn_kwargs["host"], - port=uvicorn_kwargs["port"], - include_api_key=api_key is None, - ) - # Likewise, delay this import. +def get_config_path(config_path: Optional[Path]) -> Path: + if config_path is None: + return Path(os.getenv("TILED_CONFIG", "config.yml")) + return config_path - import uvicorn - uvicorn.run(web_app, **uvicorn_kwargs) +class CheckConfig(BaseModel): + config_path: Annotated[ + Optional[Path], + "Path to a config file or directory of config files. " + "If None, check environment variable TILED_CONFIG. " + "If that is unset, try default location ./config.yml.", + AfterValidator(get_config_path), + ] = None + "Check configuration file for syntax and validation errors." + def cli_cmd(self) -> None: + try: + parse_configs(self.config_path) + print("No errors found in configuration.") + except Exception as err: + print(str(err), err=True) + raise SettingsError() + + +class Config(ServerCommand): + config_path: Annotated[ + Optional[Path], + "Path to a config file or directory of config files. " + "If None, check environment variable TILED_CONFIG. " + "If that is unset, try default location ./config.yml.", + AfterValidator(get_config_path), + ] = None + "Serve a Tree as specified in configuration file(s)." -def _setup_log_config(log_config, log_timestamps): - if log_config is None: - from ..server.logging_config import LOGGING_CONFIG - - log_config = LOGGING_CONFIG - - if log_timestamps: - import copy - - log_config = copy.deepcopy(log_config) + def cli_cmd(self) -> None: try: - log_config["formatters"]["access"]["format"] = ( - "[%(asctime)s.%(msecs)03dZ] " - + log_config["formatters"]["access"]["format"] - ) - log_config["formatters"]["default"]["format"] = ( - "[%(asctime)s.%(msecs)03dZ] " - + log_config["formatters"]["default"]["format"] - ) - except KeyError: - typer.echo( - "The --log-timestamps option is only applicable with a logging " - "configuration that, like the default logging configuration, has " - "formatters 'access' and 'default'." - ) - raise typer.Abort() - return log_config + settings: Settings = parse_configs(self.config_path) + self.build_server(settings.tree) + except Exception as err: + print(str(err), err=True) + raise SettingsError() + + +class Serve(BaseModel): + directory: CliSubCommand[Directory] + catalog: CliSubCommand[Catalog] + demo: CliSubCommand[Demo] + pyobject: CliSubCommand[PyObject] + config: CliSubCommand[Config] + + def cli_cmd(self) -> None: + CliApp.run_subcommand(self) diff --git a/tiled/commandline/_user.py b/tiled/commandline/_user.py new file mode 100644 index 000000000..a9bde962a --- /dev/null +++ b/tiled/commandline/_user.py @@ -0,0 +1,58 @@ +import json +from typing import Annotated + +from pydantic import BaseModel +from pydantic_settings import CliApp, CliSubCommand + +from tiled.commandline._utils import ContextCommand + + +class Logout(ContextCommand): + """ + Log out of an authenticated Tiled server. + """ + + def cli_cmd(self) -> None: + context = self.get_profile_context() + if context.use_cached_tokens(): + context.logout() + + +class WhoAmi(ContextCommand): + """ + Show logged in identity. + """ + + def cli_cmd(self) -> None: + context = self.get_profile_context() + context.use_cached_tokens() + whoami = context.whoami() + if whoami is None: + print("Not authenticated.") + else: + print(",".join(identity["id"] for identity in whoami["identities"])) + + +class Login(ContextCommand): + """ + Log in to an authenticated Tiled server. + """ + + show_secret_tokens: Annotated[ + bool, "Show secret tokens after successful login." + ] = False + + def cli_cmd(self) -> None: + context = self.get_profile_context() + context.authenticate() + if self.show_secret_tokens: + print(json.dumps(dict(context.tokens), indent=4)) + + +class User(BaseModel): + login: CliSubCommand[Login] + logout: CliSubCommand[Logout] + whoami: CliSubCommand[WhoAmi] + + def cli_cmd(self) -> None: + CliApp.run_subcommand(self) diff --git a/tiled/commandline/_utils.py b/tiled/commandline/_utils.py index 732fd0814..599f646d8 100644 --- a/tiled/commandline/_utils.py +++ b/tiled/commandline/_utils.py @@ -1,4 +1,31 @@ +from abc import ABC +from pathlib import Path +from typing import Annotated, Any, Optional + import typer +from pydantic import BaseModel + +from tiled.client.context import Context +from tiled.profiles import load_profiles + + +class ContextCommand(ABC, BaseModel): + profile_name: Annotated[ + Optional[str], + "Specify a profile to use when talking to one of multiple Tiled instances.", + ] = None + + def profile_contents(self) -> tuple[Path, dict[str, Any]]: + profiles = load_profiles() + if self.profile_name not in profiles: + raise KeyError( + f"The profile {self.profile_name!r} could not be found. " + "Use tiled profile list to see profile names.", + ) + return profiles[self.profile_name] + + def context(self) -> Context: + return get_context(self.profile_name) def get_profile(name): diff --git a/tiled/commandline/main.py b/tiled/commandline/main.py index f7a229232..2ef3a2dd4 100644 --- a/tiled/commandline/main.py +++ b/tiled/commandline/main.py @@ -1,159 +1,48 @@ -from typing import Optional +from typing import Annotated -try: - import typer -except Exception as err: - raise Exception( - """ -from ._admin import admin_app -from ._api_key import api_key_app -from ._profile import profile_app -from ._serve import serve_app +from pydantic_settings import BaseSettings, CliApp, CliSubCommand, SettingsConfigDict -You are trying to the run the tiled commandline tool but you do not have the -necessary dependencies. It looks like tiled has been installed with -bare-minimum dependencies, possibly via +from tiled.client.constructors import from_context +from tiled.commandline._user import User +from tiled.commandline._utils import ContextCommand +from tiled.utils import gen_tree - pip install tiled +from ._admin import Admin +from ._profile import Profiles +from ._register import Register +from ._serve import Serve -Instead, try: - pip install tiled[all] # Note: on a Mac, you may need quotes like 'tiled[all]'. - -which installs *everything* you might want. For other options, see: - - https://blueskyproject.io/tiled/tutorials/installation.html -""" - ) from err - - -cli_app = typer.Typer() - -from ._admin import admin_app # noqa: E402 -from ._api_key import api_key_app # noqa: E402 -from ._catalog import catalog_app # noqa: E402 -from ._profile import profile_app # noqa: E402 -from ._register import register # noqa: E402 -from ._serve import serve_app # noqa: E402 -from ._utils import get_context, get_profile # noqa E402 - -cli_app.add_typer( - catalog_app, name="catalog", help="Manage a catalog of data to be served by Tiled." -) -cli_app.add_typer(serve_app, name="serve", help="Launch a Tiled server.") -cli_app.add_typer( - profile_app, name="profile", help="Examine Tiled 'profiles' (client-side config)." -) -cli_app.add_typer( - api_key_app, name="api_key", help="Create, list, and revoke API keys." -) -cli_app.add_typer( - admin_app, - name="admin", - help="Administrative utilities for managing large deployments.", -) - - -@cli_app.command("login") -def login( - profile: Optional[str] = typer.Option( - None, help="If you use more than one Tiled server, use this to specify which." - ), - show_secret_tokens: bool = typer.Option( - False, "--show-secret-tokens", help="Show secret tokens after successful login." - ), -): - """ - Log in to an authenticated Tiled server. - """ - from ..client.context import Context - - profile_name, profile_content = get_profile(profile) - options = {"verify": profile_content.get("verify", True)} - context, _ = Context.from_any_uri(profile_content["uri"], **options) - context.authenticate() - if show_secret_tokens: - import json - - typer.echo(json.dumps(dict(context.tokens), indent=4)) - - -@cli_app.command("whoami") -def whoami( - profile: Optional[str] = typer.Option( - None, help="If you use more than one Tiled server, use this to specify which." - ), -): - """ - Show logged in identity. - """ - from ..client.context import Context - - profile_name, profile_content = get_profile(profile) - options = {"verify": profile_content.get("verify", True)} - context, _ = Context.from_any_uri(profile_content["uri"], **options) - context.use_cached_tokens() - whoami = context.whoami() - if whoami is None: - typer.echo("Not authenticated.") - else: - typer.echo(",".join(identity["id"] for identity in whoami["identities"])) - - -@cli_app.command("logout") -def logout( - profile: Optional[str] = typer.Option( - None, help="If you use more than one Tiled server, use this to specify which." - ), -): - """ - Log out. - """ - from ..client.context import Context - - profile_name, profile_content = get_profile(profile) - context, _ = Context.from_any_uri( - profile_content["uri"], verify=profile_content.get("verify", True) - ) - if context.use_cached_tokens(): - context.logout() - - -@cli_app.command("tree") -def tree( - profile: Optional[str] = typer.Option( - None, help="If you use more than one Tiled server, use this to specify which." - ), - max_lines: int = typer.Argument(20, help="Max lines to show."), -): +class Tree(ContextCommand): + max_lines: Annotated[int, "Max lines to show."] = 20 """ Show the names of entries in a Tree. This is similar to the UNIX utility `tree` for listing nested directories. """ - from ..client.constructors import from_context - from ..utils import gen_tree - - context = get_context(profile) - client = from_context(context) - for counter, line in enumerate(gen_tree(client), start=1): - if (max_lines is not None) and (counter > max_lines): - print( - f"Output truncated at {max_lines} lines. " - "Use `tiled tree ` to see lines." - ) - break - print(line) - - -cli_app.command("register")(register) + def cli_cmd(self) -> None: + context = self.context() + client = from_context(context) + for counter, line in enumerate(gen_tree(client), start=1): + if counter > self.max_lines: + print( + f"Output truncated at {self.max_lines} lines. " + "Use `tiled tree ` to see lines." + ) + break + print(line) -main = cli_app +class Tiled(BaseSettings, cli_parse_args=True): + profiles: CliSubCommand[Profiles] + user: CliSubCommand[User] + admin: CliSubCommand[Admin] + serve: CliSubCommand[Serve] + register: CliSubCommand[Register] + tree: CliSubCommand[Tree] -if __name__ == "__main__": - main() + model_config = SettingsConfigDict(cli_parse_args=True) -# This object is used by the auto-generated documentation. -typer_click_object = typer.main.get_command(cli_app) + def cli_cmd(self) -> None: + CliApp.run_subcommand(self)