diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 60e4950..89323d0 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -6,10 +6,10 @@ jobs: strategy: matrix: py_version: - - '3.8' - '3.9' - '3.10' - '3.11' + - '3.12' steps: - uses: actions/checkout@v3 - uses: actions/setup-python@v4 diff --git a/_bootstrap/__init__.py b/_bootstrap/__init__.py new file mode 100644 index 0000000..acf9960 --- /dev/null +++ b/_bootstrap/__init__.py @@ -0,0 +1,4 @@ +from .context import ServiceContext as ServiceContext +from .core import Bootstrap as Bootstrap +from .core import UnhandledExit as UnhandledExit +from .service import Service as Service \ No newline at end of file diff --git a/_bootstrap/context.py b/_bootstrap/context.py new file mode 100644 index 0000000..6ebe8dc --- /dev/null +++ b/_bootstrap/context.py @@ -0,0 +1,132 @@ +from __future__ import annotations + +import asyncio +from contextlib import asynccontextmanager +from dataclasses import dataclass +from enum import Enum, auto +from typing import TYPE_CHECKING, ClassVar + +if TYPE_CHECKING: + from .core import Bootstrap + + +class _State(Enum): + PREPARE_PRE = auto() + PREPARE_POST = auto() + CLEANUP_PRE = auto() + CLEANUP_POST = auto() + READY = auto() + + +@dataclass +class ServiceContext: + State: ClassVar = _State + + bootstrap: Bootstrap + + def __post_init__(self): + self._state: _State | None = None + self._ready: bool | None = None + self._notify = asyncio.Event() + self._switch = asyncio.Event() + self._sigexit = asyncio.Event() + + def _update(self): + self._notify.set() + + def switch(self): + self._switch.set() + + def enter(self): + if self._state is not _State.PREPARE_POST: + return + + self._ready = True + self._update() + + def skip(self): + if self._state is not _State.PREPARE_POST: + return + + self._ready = False + self._update() + + def exit(self): + "Call by the manager" + self._sigexit.set() + + @property + def state(self): + if self._ready: + return self.State.READY + return self._state + + async def wait_prepare_pre(self): + await self._switch.wait() + if self._state is not _State.PREPARE_PRE: + raise RuntimeError(f"expected {self.State.PREPARE_PRE}, got {self._state}") + + self._switch.clear() + self._update() + + async def wait_cleanup_pre(self): + await self._switch.wait() + if self._state is not _State.CLEANUP_PRE: + raise RuntimeError(f"expected {self.State.CLEANUP_PRE}, got {self._state}") + + self._switch.clear() + self._update() + + async def wait_prepare_post(self): + await self._switch.wait() + if self._state is not _State.PREPARE_POST: + raise RuntimeError(f"expected {self.State.PREPARE_POST}, got {self._state}") + + self._switch.clear() + + async def wait_cleanup_post(self): + await self._switch.wait() + if self._state is not _State.CLEANUP_POST: + raise RuntimeError(f"expected {self.State.CLEANUP_POST}, got {self._state}") + + self._switch.clear() + + async def wait_for_sigexit(self): + await self._sigexit.wait() + + @property + def ready(self): + if self._ready is None: + raise RuntimeError("ServiceContext.ready is not available outside of prepare context") + + return self._ready + + @property + def should_exit(self): + return self._sigexit.is_set() + + @asynccontextmanager + async def prepare(self): + self._state = _State.PREPARE_PRE + self.switch() + await self._notify.wait() + self._notify.clear() + yield + self._state = _State.PREPARE_POST + self.switch() + await self._notify.wait() + self._notify.clear() + self._state = None + + @asynccontextmanager + async def cleanup(self): + self._state = _State.CLEANUP_PRE + self.switch() + await self._notify.wait() + self._notify.clear() + yield + self._state = _State.CLEANUP_POST + self.switch() + await self._notify.wait() + self._notify.clear() + self._state = None \ No newline at end of file diff --git a/_bootstrap/core.py b/_bootstrap/core.py new file mode 100644 index 0000000..3cd9df5 --- /dev/null +++ b/_bootstrap/core.py @@ -0,0 +1,218 @@ +from __future__ import annotations + +import asyncio +import signal +from contextvars import ContextVar +from typing import TYPE_CHECKING, Any, Iterable + +from exceptiongroup import ExceptionGroup # noqa: A004 +from loguru import logger + +from .context import ServiceContext +from .graph import ServiceGraph +from .utiles import TaskGroup, cvar, oneof, cancel_alive_tasks + + +if TYPE_CHECKING: + from .service import Service + + +class UnhandledExit(Exception): + pass + + +BOOTSTRAP_CONTEXT: ContextVar[Bootstrap] = ContextVar("BOOTSTRAP_CONTEXT") + + +class Bootstrap: + graph: ServiceGraph + + def __init__(self): + self.graph = ServiceGraph() + + async def spawn(self, *services: Service): + service_bind, previous, nexts = self.graph.subgraph(*services) + tasks: dict[str, asyncio.Task] = self.graph.tasks + + prepare_errors: list[Exception] = [] + cleanup_errors: list[Exception] = [] + + done_prepare: dict[str, None] = {} + pending_prepare = TaskGroup() + pending_cleanup = TaskGroup() + queued_prepare = {k: v.copy() for k, v in previous.maps[0].items()} + queued_cleanup = {k: v.copy() for k, v in nexts.maps[0].items()} + + spawn_forward_prepare: bool = True + + def spawn_prepare(service: Service): + async def prepare_guard(): + nonlocal spawn_forward_prepare + + context = ServiceContext(self) + self.graph.contexts[service.id] = context + task = tasks[service.id] = asyncio.create_task(service.launch(context)) + + await oneof(context.wait_prepare_pre(), task) + await oneof(context.wait_prepare_post(), task) + + if task.done(): + spawn_forward_prepare = False + + prepare_errors.append(task.exception() or UnhandledExit()) # type: ignore + self.graph.drop(service) + return + + done_prepare[service.id] = None + + if not spawn_forward_prepare: + return + + for next_service, barriers in list(queued_prepare.items()): + if service.id in barriers: + barriers.pop(service.id) + + if not barriers: + spawn_prepare(service_bind[next_service]) + queued_prepare.pop(next_service) + + pending_prepare.spawn(prepare_guard()) + + def spawn_cleanup(service: Service): + async def cleanup_guard(): + context = self.graph.contexts[service.id] + task = tasks[service.id] + + context.exit() + await oneof(context.wait_cleanup_pre(), task) + await oneof(context.wait_cleanup_post(), task) + + self.graph.drop(service) + + if task.done(): + cleanup_errors.append(task.exception() or UnhandledExit()) # type: ignore + return + + for previous_service, barriers in list(queued_cleanup.items()): + if service.id in barriers: + barriers.pop(service.id) + + if not barriers: + spawn_cleanup(service_bind[previous_service]) + queued_cleanup.pop(previous_service) + + pending_cleanup.spawn(cleanup_guard()) + + def toggle_enter(): + for i in done_prepare: + self.graph.contexts[i].enter() + + def toggle_skip(): + for i in done_prepare: + self.graph.contexts[i].skip() + + def rollback(): + spawned = False + + for i in done_prepare: + if not (nexts[i] & done_prepare.keys()): + spawned = True + spawn_cleanup(service_bind[i]) + + if not spawned: + raise RuntimeError("Unsatisfied dependencies, rollback failed") + + return pending_cleanup.wait() + + for i, v in previous.maps[0].items(): + if not v: + spawn_prepare(service_bind[i]) + queued_prepare.pop(i) + + await pending_prepare + + if queued_prepare: + toggle_skip() + await rollback() + + if cleanup_errors: + raise RuntimeError("Unsatisfied dependencies") from ExceptionGroup("", cleanup_errors) + + raise RuntimeError("Unsatisfied dependencies") + + if prepare_errors: + toggle_skip() + await rollback() + + if cleanup_errors: + raise ExceptionGroup("", cleanup_errors) from ExceptionGroup("", prepare_errors) + + raise ExceptionGroup("", prepare_errors) + + self.graph.apply(dict(service_bind), previous, nexts) + toggle_enter() + + return rollback + + async def launch(self, *services: Service): + rollback = await self.spawn(*services) + try: + await asyncio.gather(*[self.graph.contexts[i.id]._switch.wait() for i in services]) + except asyncio.CancelledError: + pass + finally: + await rollback() + + def launch_blocking( + self, + *services: Service, + loop: asyncio.AbstractEventLoop | None = None, + stop_signal: Iterable[signal.Signals] = (signal.SIGINT,), + ): + import contextlib + import threading + + loop = asyncio.new_event_loop() + + logger.info("Starting launart main task...", style="green bold") + + with cvar(BOOTSTRAP_CONTEXT, self): + launch_task = loop.create_task(self.launch(*services), name="amnesia-launch") + + handled_signals: dict[signal.Signals, Any] = {} + + def signal_handler(*args, **kwargs): # noqa: ARG001 + for service in self.graph.services: + self.graph.contexts[service].exit() + + if not launch_task.done(): + launch_task.cancel() + # wakeup loop if it is blocked by select() with long timeout + launch_task.get_loop().call_soon_threadsafe(lambda: None) + logger.warning("Ctrl-C triggered by user.", style="dark_orange bold") + + if threading.current_thread() is threading.main_thread(): # pragma: worst case + try: + for sig in stop_signal: + handled_signals[sig] = signal.getsignal(sig) + signal.signal(sig, signal_handler) + except ValueError: # pragma: no cover + # `signal.signal` may throw if `threading.main_thread` does + # not support signals + handled_signals.clear() + + loop.run_until_complete(launch_task) + + for sig, handler in handled_signals.items(): + if signal.getsignal(sig) is signal_handler: + signal.signal(sig, handler) + + try: + cancel_alive_tasks(loop) + loop.run_until_complete(loop.shutdown_asyncgens()) + with contextlib.suppress(RuntimeError, AttributeError): + # LINK: https://docs.python.org/3.10/library/asyncio-eventloop.html#asyncio.loop.shutdown_default_executor + loop.run_until_complete(loop.shutdown_default_executor()) # type: ignore + finally: + asyncio.set_event_loop(None) + logger.success("asyncio shutdown complete.", style="green bold") \ No newline at end of file diff --git a/_bootstrap/graph.py b/_bootstrap/graph.py new file mode 100644 index 0000000..42dae33 --- /dev/null +++ b/_bootstrap/graph.py @@ -0,0 +1,93 @@ + +from __future__ import annotations + +from collections import ChainMap +from typing import TYPE_CHECKING, Mapping, TypeVar +from typing_extensions import TypeAlias + +if TYPE_CHECKING: + import asyncio + + from .context import ServiceContext + from .service import Service + +T = TypeVar("T") + +_Set: TypeAlias = "dict[T, None]" + + +class ServiceGraph: + services: dict[str, Service] + contexts: dict[str, ServiceContext] + tasks: dict[str, asyncio.Task] + + _previous: dict[str, _Set[str]] + _next: dict[str, _Set[str]] + + def __init__(self): + self.services = {} + self.contexts = {} + self.tasks = {} + + self._previous = {} + self._next = {} + + def subgraph(self, *services: Service): + services_: dict[str, Service] = {i.id: i for i in services} + + if services_.keys() & self.services.keys(): + raise ValueError("Service id conflict.") + + _services: Mapping[str, Service] = ChainMap(services_, self.services) + _previous: ChainMap[str, _Set[str]] = ChainMap({}, self._previous) + _next: ChainMap[str, _Set[str]] = ChainMap({}, self._next) + + # _previous: dict[str, _Set[str]] = {} + # _next: dict[str, _Set[str]] = {} + + for i in services: + _previous[i.id] = dict.fromkeys(i.after) + _next[i.id] = dict.fromkeys(i.before) + + for p in i.after: + if p not in _services: + raise ValueError(f"Service {i.id} after {p} not found.") + + if p in _next: + t = _next[p] + else: + t = _next[p] = {} + + t[i.id] = None + + for n in i.before: + if n not in _services: + raise ValueError(f"Service {i.id} before {n} not found.") + + if n in _previous: + t = _previous[n] + else: + t = _previous[n] = {} + + t[i.id] = None + + return _services.maps[0], _previous, _next + + def apply(self, service_bind: dict[str, Service], previous: ChainMap[str, _Set[str]], nexts: ChainMap[str, _Set[str]]): + self.services.update(service_bind) + self._previous.update(previous) + self._next.update(nexts) + + def drop(self, service: Service): + self.services.pop(service.id, None) + self.contexts.pop(service.id, None) + self.tasks.pop(service.id, None) + + self._previous.pop(service.id, None) + self._next.pop(service.id, None) + + for i in self._previous: + self._previous[i].pop(service.id, None) + + for i in self._next: + self._next[i].pop(service.id, None) diff --git a/_bootstrap/service.py b/_bootstrap/service.py new file mode 100644 index 0000000..9e26eb0 --- /dev/null +++ b/_bootstrap/service.py @@ -0,0 +1,28 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from .context import ServiceContext + + +class Service: + id: str + + @property + def before(self) -> tuple[str, ...]: + return () + + @property + def after(self) -> tuple[str, ...]: + return () + + async def launch(self, context: ServiceContext): + async with context.prepare(): + pass + + if context.ready: + ... + + async with context.cleanup(): + pass \ No newline at end of file diff --git a/_bootstrap/utiles.py b/_bootstrap/utiles.py new file mode 100644 index 0000000..a91a49e --- /dev/null +++ b/_bootstrap/utiles.py @@ -0,0 +1,133 @@ +from __future__ import annotations + +import asyncio +from contextlib import contextmanager +from typing import TYPE_CHECKING, Coroutine, Iterable, TypeVar + +from loguru import logger +from typing_extensions import TypeAlias + +if TYPE_CHECKING: + from contextvars import ContextVar + + +_CoroutineLike: TypeAlias = "Coroutine | asyncio.Task" + + +def into_tasks(awaitables: Iterable[_CoroutineLike]) -> list[asyncio.Task]: + return [i if isinstance(i, asyncio.Task) else asyncio.create_task(i) for i in awaitables] + + +async def unity( + tasks: Iterable[_CoroutineLike], + *, + timeout: float | None = None, # noqa: ASYNC109 + return_when: str = asyncio.ALL_COMPLETED, +): + return await asyncio.wait(into_tasks(tasks), timeout=timeout, return_when=return_when) + + +async def any_completed(tasks: Iterable[_CoroutineLike]): + done, pending = await unity(tasks, return_when=asyncio.FIRST_COMPLETED) + return next(iter(done)), pending + + +async def oneof(*tasks: _CoroutineLike): + return await any_completed(tasks) + + +def cancel_alive_tasks(loop: asyncio.AbstractEventLoop): + to_cancel = asyncio.tasks.all_tasks(loop) + if to_cancel: + for tsk in to_cancel: + tsk.cancel() + loop.run_until_complete(asyncio.gather(*to_cancel, return_exceptions=True)) + + for task in to_cancel: # pragma: no cover + # BELIEVE IN PSF + if task.cancelled(): + continue + if task.exception() is not None: + logger.opt(exception=task.exception()).error(f"Unhandled exception when shutting down {task}:") + + +T = TypeVar("T") + + +@contextmanager +def cvar(ctx: ContextVar[T], val: T): + token = ctx.set(val) + try: + yield val + finally: + ctx.reset(token) + + +class TaskGroup: + tasks: list[asyncio.Task] + main: asyncio.Task | None = None + _stop: bool = False + _notify: asyncio.Event + + def __init__(self): + self.tasks = [] + self._notify = asyncio.Event() + + def flush(self): + if self.main is not None: + self._notify.set() + + def stop(self): + self._stop = True + self.flush() + + def spawn(self, task: asyncio.Task | Coroutine): + task = asyncio.create_task(task) if asyncio.iscoroutine(task) else task + self.tasks.append(task) + + self.flush() + return task + + def update(self, tasks: Iterable[asyncio.Task | Coroutine]): + tasks = [asyncio.create_task(task) if asyncio.iscoroutine(task) else task for task in tasks] + self.tasks.extend(tasks) + + self.flush() + return tasks + + def drop(self, tasks: Iterable[asyncio.Task]): + for task in tasks: + self.tasks.remove(task) + + self.flush() + + async def wait(self): + while True: + if not self.tasks: + await self._notify.wait() + self._notify.clear() + + self.main = asyncio.create_task(asyncio.wait(self.tasks)) + awaiting_notify = asyncio.create_task(self._notify.wait()) + + await asyncio.wait([self.main, awaiting_notify], return_when=asyncio.FIRST_COMPLETED) + + if awaiting_notify.done(): + self._notify.clear() + if self._stop: + break + + continue + + await self.main + return + + async def __aenter__(self): + pass + + async def __aexit__(self, exc_type, exc_val, exc_tb): + self.stop() + await self.wait() + + def __await__(self): + return self.wait().__await__() diff --git a/launart/__init__.py b/launart/__init__.py index 3c960f8..1bed045 100644 --- a/launart/__init__.py +++ b/launart/__init__.py @@ -1,11 +1,9 @@ from contextlib import suppress as _suppress +from _bootstrap.utiles import any_completed as any_completed # noqa: F401 + from .manager import Launart as Launart from .service import Service as Service -from .service import U_Stage as U_Stage -from .status import ServiceStatus as ServiceStatus -from .utilles import RequirementResolveFailed as RequirementResolveFailed -from .utilles import any_completed as any_completed with _suppress(ImportError, ModuleNotFoundError): from .saya import LaunartBehaviour as LaunartBehaviour diff --git a/launart/_patch.py b/launart/_patch.py new file mode 100644 index 0000000..6adba7e --- /dev/null +++ b/launart/_patch.py @@ -0,0 +1,67 @@ +from __future__ import annotations + +from collections.abc import Awaitable +from typing import TYPE_CHECKING, Callable + +if TYPE_CHECKING: + from launart.manager import Launart + from launart.service import Service + + +def patch_launch(serv: Service) -> Callable[[Launart], Awaitable[None]]: + if serv.stages == {"preparing", "blocking", "cleanup"} or serv.stages == {"preparing", "cleanup"}: + return serv.launch + elif not serv.stages: + + async def _launch(manager: Launart): + await serv.launch(manager) + async with serv.stage("preparing"): + pass + async with serv.stage("blocking"): + pass + async with serv.stage("cleanup"): + pass + + return _launch + elif serv.stages == {"preparing", "blocking"}: + + async def _launch(manager: Launart): + await serv.launch(manager) + async with serv.stage("cleanup"): + pass + + return _launch + elif serv.stages == {"blocking", "cleanup"}: + + async def _launch(manager: Launart): + async with serv.stage("preparing"): + pass + await serv.launch(manager) + + return _launch + elif serv.stages == {"preparing"}: + + async def _launch(manager: Launart): + await serv.launch(manager) + async with serv.stage("cleanup"): + pass + + return _launch + elif serv.stages == {"blocking"}: + + async def _launch(manager: Launart): + async with serv.stage("preparing"): + pass + await serv.launch(manager) + async with serv.stage("cleanup"): + pass + + return _launch + else: # serv.stages == {"cleanup"} + + async def _launch(manager: Launart): + async with serv.stage("preparing"): + pass + await serv.launch(manager) + + return _launch diff --git a/launart/_sideload.py b/launart/_sideload.py deleted file mode 100644 index 2107048..0000000 --- a/launart/_sideload.py +++ /dev/null @@ -1,40 +0,0 @@ -from __future__ import annotations - -from typing import TYPE_CHECKING, Any, Dict, TypeVar, cast - -if TYPE_CHECKING: - from launart.status import U_ManagerStage - - -class Override: - def __init__(self, source: Any, additional: Dict[str, Any]): - self.__source = source - self.__additional = additional - - @property - def source(self): - return self.__source - - def __getattr__(self, item): - if item in self.__additional: - return self.__additional[item] - return getattr(self.__source, item) - - -T = TypeVar("T") - - -def override(source: T, additional: Dict[str, Any]) -> T: - return cast(T, Override(source, additional)) - - -class FutureMark: - id: str - stage: U_ManagerStage | None = None - - def __init__(self, id: str, stage: U_ManagerStage | None = None) -> None: - self.id = id - self.stage = stage - - def __call__(self, _) -> Any: - pass diff --git a/launart/manager.py b/launart/manager.py index c9cd3ae..586e920 100644 --- a/launart/manager.py +++ b/launart/manager.py @@ -1,228 +1,54 @@ from __future__ import annotations import asyncio -import contextlib -import contextvars import signal from contextvars import ContextVar -from functools import partial -from itertools import chain -from typing import TYPE_CHECKING, Any, ClassVar, Coroutine, Dict, Iterable, Optional, TypeVar, cast, overload +from collections.abc import Coroutine +from typing import Any, ClassVar, Callable, Iterable, TypeVar, cast, overload from loguru import logger -from launart._sideload import override -from launart.service import Service -from launart.status import ManagerStatus -from launart.utilles import ( - FlexibleTaskGroup, - any_completed, - cancel_alive_tasks, - resolve_requirements, -) +from _bootstrap import Bootstrap +from _bootstrap import Service as _Service +from _bootstrap.utiles import cancel_alive_tasks, cvar +from launart.service import Service, make_service + +from .status import Status as ManagerStatus T = TypeVar("T") TL = TypeVar("TL", bound=Service) class Launart: - components: Dict[str, Service] status: ManagerStatus - tasks: dict[str, asyncio.Task] - task_group: Optional[FlexibleTaskGroup] = None - _context: ClassVar[ContextVar[Launart]] = ContextVar("launart._context") - _default_isolate: dict[Any, Any] def __init__(self): - self.components = {} - self.tasks = {} - self.status = ManagerStatus() - self._default_isolate = { - "interface_provide": {} - } + self._core = Bootstrap() + self._rollbacks: dict[str, Callable[[], Coroutine[Any, Any, None]]] = {} + self._running = False + self._default_isolate = {"interface_provide": {}} + self._initial_services: dict[str, Service] = {} @classmethod def current(cls) -> Launart: return cls._context.get() def export_interface(self, interface: type, service: Service): - self._default_isolate['interface_provide'][interface] = service - - async def _sideload_tracker(self, component: Service) -> None: - if TYPE_CHECKING: - assert self.task_group is not None - - logger.info(f"Sideload {component.id}: injecting") - - local_status = ManagerStatus() - shallow_self = cast(Launart, override(self, {"status": local_status})) - component.manager = shallow_self - - task = asyncio.create_task(component.launch(shallow_self)) - task.add_done_callback(partial(self._on_task_done, component)) - self.tasks[component.id] = task - - local_status.stage = "preparing" - if "preparing" in component.stages: - await self._sideload_prepare(component) - - with contextlib.suppress(asyncio.CancelledError): - local_status.stage = "blocking" - if "blocking" in component.stages: - await self._sideload_blocking(component) - - local_status.update_multi( - { - ManagerStatus.stage: "cleaning", - ManagerStatus.exiting: True, - } - ) - if "cleanup" in component.stages: - await self._sideload_cleanup(component) - - if not task.done() or not task.cancelled(): # pragma: worst case - await task - logger.info(f"Sideload {component.id}: completed.") - del self.tasks[component.id] - del self.components[component.id] - del self.task_group.sideload_trackers[component.id] - - async def _sideload_prepare(self, component: Service) -> None: - if component.status.stage != "waiting-for-prepare": # pragma: worst case - logger.info(f"Waiting sideload {component.id} for prepare") - await any_completed( - self.tasks[component.id], - component.status.wait_for("waiting-for-prepare"), - ) - - logger.info(f"Sideload {component.id}: preparing") - - component.status.stage = "preparing" - await any_completed( - self.tasks[component.id], - component.status.wait_for("prepared"), - ) - logger.info(f"Sideload {component.id}: preparation completed") - - async def _sideload_blocking(self, component: Service) -> None: - logger.info(f"Sideload {component.id}: start blocking") - - await any_completed( - self.tasks[component.id], - component.status.wait_for("blocking-completed"), - ) - logger.info(f"Sideload {component.id}: blocking completed") - - async def _sideload_cleanup(self, component: Service): - if component.status.stage != "waiting-for-cleanup": # pragma: worst case - await any_completed( - self.tasks[component.id], - component.status.wait_for("waiting-for-cleanup"), - ) - - component.status.stage = "cleanup" - - await any_completed( - self.tasks[component.id], - component.status.wait_for("finished"), - ) - logger.info(f"Sideload {component.id}: cleanup completed.") - - def _on_task_done(self, component: Service, t: asyncio.Task): - try: - exc = t.exception() - except asyncio.CancelledError: - logger.warning( - f"[{component.id}] was cancelled in abort.", - alt=f"[yellow bold]Component [magenta]{component.id}[/] was cancelled in abort.", - ) - return - if exc: - logger.opt(exception=exc).error( - f"[{component.id}] raised a exception.", - alt=f"[red bold]Component [magenta]{component.id}[/] raised an exception.", - ) - return - - if self.status.preparing: - if "preparing" in component.stages: - if component.status.prepared: - logger.info(f"Component {component.id} completed preparation.") - else: - logger.error(f"Component {component.id} exited before preparation.") - elif self.status.blocking: - if "cleanup" in component.stages and component.status.stage != "finished": - logger.warning(f"Component {component.id} exited without cleanup.") - else: - logger.success(f"Component {component.id} finished.") - elif self.status.cleaning: - if "cleanup" in component.stages: - if component.status.finished: - logger.success(f"Component {component.id} finished.") - else: - logger.warning(f"Component {component.id} exited before completing cleanup.") - - logger.info( - f"Component {component.id} completed.", - alt=rf"[green]Component [magenta]{component.id}[/magenta] completed.", - ) - - # clean interface - - for k, v in list(self._default_isolate['interface_provide'].items()): - if v is component: - del self._default_isolate['interface_provide'][k] - - async def _component_prepare(self, task: asyncio.Task, component: Service): - if component.status.stage != "waiting-for-prepare": # pragma: worst case - logger.info(f"Wait component {component.id} into preparing.") - await any_completed(task, component.status.wait_for("waiting-for-prepare")) - - logger.info(f"Component {component.id} is preparing.") - component.status.stage = "preparing" - - loop = asyncio.get_running_loop() - listener_task = loop.create_task(component.status.wait_for("prepared")) - done, pending = await any_completed(task, listener_task) - if listener_task in pending: - exc = task.exception() - if exc is not None: - task.result() - - logger.success(f"Component {component.id} is prepared.") - - async def _component_cleanup(self, task: asyncio.Task, component: Service): - if component.status.stage != "waiting-for-cleanup": - logger.info(f"Wait component {component.id} into cleanup.") - await any_completed(task, component.status.wait_for("waiting-for-cleanup")) - - logger.info(f"Component {component.id} enter cleanup phase.") - component.status.stage = "cleanup" - - await any_completed(task, component.status.wait_for("finished")) + self._default_isolate["interface_provide"][interface] = service def add_component(self, component: Service): - if self.task_group is not None: - resolve_requirements([*self.components.values(), component]) - - component.ensure_manager(self) - - if component.id in self.components: - raise ValueError(f"Service {component.id} already exists.") - - if self.task_group is not None: - tracker = asyncio.create_task(self._sideload_tracker(component)) - self.task_group.sideload_trackers[component.id] = tracker - self.task_group.add(tracker) # flush the waiter tasks + if not self._running: + self._initial_services[component.id] = component + return - self.components[component.id] = component + _t = asyncio.create_task(self._core.spawn(make_service(component))) + _t.add_done_callback(lambda _: self._rollbacks.update({component.id: _t.result()})) - get_interface = getattr(component, 'get_interface', None) - supported_interface_types = getattr(component, 'supported_interface_types', None) - if get_interface is not None and supported_interface_types is not None: - for interface_type in supported_interface_types: - self.export_interface(interface_type, component) + async def add_sideload(self, component: Service): + if not self._running: + raise ValueError("Cannot add a service while the launart is not running.") + self._rollbacks[component.id] = await self._core.spawn(make_service(component)) @overload def get_component(self, target: type[TL]) -> TL: @@ -233,212 +59,63 @@ def get_component(self, target: str) -> Service: ... def get_component(self, target: str | type[TL]) -> TL | Service: - if isinstance(target, str): - if target not in self.components: - raise ValueError(f"Service {target} does not exists.") - return self.components[target] - - if _id := getattr(target, "id", None): - return self.get_component(_id) - try: - return next(comp for comp in self.components.values() if isinstance(comp, target)) - except StopIteration as e: - raise ValueError(f"Service {target.__name__} does not exists.") from e + _id = target if isinstance(target, str) else target.id + _serv = self._core.graph.services[_id] + except KeyError: + raise ValueError(f"Service {target} does not exists.") from None + if not hasattr(_serv, "__launart_service__"): + raise ValueError(f"Service {target} does not exists.") + return cast("TL | Service", _serv.__launart_service__) # type: ignore def remove_component( self, component: str | Service, ): - if isinstance(component, str): - if component not in self.components: - if self.task_group and component in self.task_group.sideload_trackers: - # sideload tracking, cannot gracefully remove (into exiting phase) - return - raise ValueError(f"Service {component} does not exist.") - target = self.components[component] - else: - if component not in self.components.values(): - raise ValueError(f"Service {component.id} does not exist.") - - target = component - - if self.task_group is None: - del self.components[target.id] - return - - resolve_requirements([service for service in self.components.values() if service.id != target.id]) - - if target.id not in self.task_group.sideload_trackers: - raise RuntimeError("Only sideload tasks can be removed at runtime!") - - tracker = self.task_group.sideload_trackers[target.id] - if tracker.cancelled() or tracker.done(): # completed in silence, let it pass + serv_id = component if isinstance(component, str) else component.id + if not self._running: + if serv_id not in self._initial_services: + raise ValueError(f"Service {serv_id} does not exists.") + self._initial_services.pop(serv_id) return - - if target.status.stage not in {"prepared", "blocking", "blocking-completed", "waiting-for-cleanup"}: - raise RuntimeError( - f"{target.id} obtains invalid stats to sideload active release, it's {target.status.stage}" - ) - - tracker.cancel() # trigger cancel, and the tracker will start clean up + if serv_id not in self._rollbacks: + raise ValueError(f"Service {serv_id} cannot be removed.") + rollback = self._rollbacks.pop(serv_id) + asyncio.create_task(rollback()) + + async def remove_sideload(self, component: str | Service): + serv_id = component if isinstance(component, str) else component.id + if serv_id not in self._rollbacks: + raise ValueError(f"Service {serv_id} cannot be removed.") + rollback = self._rollbacks.pop(serv_id) + await rollback() def get_interface(self, interface_type: type[T]) -> T: - provider_map = self._default_isolate['interface_provide'] + provider_map = self._default_isolate["interface_provide"] service = provider_map.get(interface_type) if service is None: raise ValueError(f"{interface_type} is not supported.") return service.get_interface(interface_type) - async def _lifespan_finale(self, token: contextvars.Token): - finale_tasks = [i for i in self.tasks.values() if not i.done()] - if finale_tasks: - await asyncio.wait(finale_tasks) - - self.task_group = None - self._context.reset(token) - self.status = ManagerStatus() - - async def _tasks_controler(self, tasks: list[Coroutine]): - # 返回指示符。 - loop = asyncio.get_running_loop() - done, pending = await asyncio.wait(map(loop.create_task, tasks), return_when=asyncio.FIRST_EXCEPTION) - for i in done: - if i.exception() is not None: - return "exception-thrown", pending - - return "it-looks-good", done - - async def _fatal_cancel(self, token: contextvars.Token): - for task_to_cancel in self.tasks.values(): - task_to_cancel.cancel() - - await self._lifespan_finale(token) - return - async def launch(self): - if self.status.stage is not None: - logger.error("Incorrect ownership, launart is already running.") - return - - _token = self._context.set(self) - - loop = asyncio.get_running_loop() - post = loop.create_task - - self.tasks = {} - self.task_group = FlexibleTaskGroup() - - for _id, component in self.components.items(): - t = post(component.launch(self)) - t.add_done_callback(partial(self._on_task_done, component)) - self.tasks[_id] = t - # self.task_group.add(self.tasks[k]) - # NOTE: 遗憾的, 我们仍然需要通过这种打洞的方式来实现, 而不是给每个 component 发一个保姆. - - self.status.stage = "preparing" - - try: - prepared_tasks = [] - - fatal_flag = False - for components in resolve_requirements(self.components.values()): - preparing_tasks = [ - self._component_prepare(self.tasks[component.id], component) - for component in components - if "preparing" in component.stages - ] - if fatal_flag: - for component in components: - self.tasks[component.id].cancel() - - if preparing_tasks: - sym, tasks = await self._tasks_controler(preparing_tasks) - if sym == "exception-thrown": - for task_to_cancel in chain(tasks, prepared_tasks): - task_to_cancel.cancel() - fatal_flag = True - - prepared_tasks.extend(tasks) - - if fatal_flag: - await self._lifespan_finale(_token) - return - - except asyncio.CancelledError: - await self._fatal_cancel(_token) - return - - self.status.stage = "blocking" - - blocking_tasks = [ - any_completed(self.tasks[component.id], component.status.wait_for("blocking-completed")) - for component in self.components.values() - if "blocking" in component.stages - ] - - try: - if blocking_tasks: - self.task_group.add(*blocking_tasks) - await self.task_group - finally: - self.status.exiting = True - - logger.info("Entering cleanup phase.", style="yellow bold") - - try: - # cleanup the dangling sideload tasks first. - if self.task_group.sideload_trackers: - for tracker in self.task_group.sideload_trackers.values(): - tracker.cancel() - await asyncio.wait(self.task_group.sideload_trackers.values()) - - self.status.stage = "cleaning" - for idt, component in self.components.items(): - if "cleanup" in component.stages and component.status.stage != "waiting-for-cleanup": - await any_completed( - self.tasks[idt], - component.status.wait_for("waiting-for-cleanup"), - ) - - exceptional_tasks = [] - for components in resolve_requirements(self.components.values(), reverse=True): - cleanup_tasks = [ - self._component_cleanup(self.tasks[component.id], component) - for component in components - if "cleanup" in component.stages - ] - if cleanup_tasks: - sym, tasks = await self._tasks_controler(cleanup_tasks) - if sym == "it-looks-good": - continue - - exceptional_tasks.extend(exceptional_tasks) - except asyncio.CancelledError: - await self._fatal_cancel(_token) - return - - self.status.stage = "finished" - - for task in exceptional_tasks: - ... - - logger.success("Lifespan finished, waiting for finalization.", style="green bold") - await self._lifespan_finale(_token) - - logger.success("Launart finished.", style="green bold") + self._running = True + srvs = [make_service(s) for s in self._initial_services.values()] + with cvar(self._context, self): + await self._core.launch(*srvs) + self._running = False + return def launch_blocking( self, *, - loop: Optional[asyncio.AbstractEventLoop] = None, + loop: asyncio.AbstractEventLoop | None = None, stop_signal: Iterable[signal.Signals] = (signal.SIGINT,), ): - from creart import it import contextlib - import functools import threading + from creart import it + if loop is not None: # pragma: no cover from warnings import warn @@ -453,8 +130,18 @@ def launch_blocking( logger.info("Starting launart main task...", style="green bold") launch_task = loop.create_task(self.launch(), name="amnesia-launch") - handled_signals: Dict[signal.Signals, Any] = {} - signal_handler = functools.partial(self._on_sys_signal, main_task=launch_task) + handled_signals: dict[signal.Signals, Any] = {} + + def signal_handler(*_): + for service in self._core.graph.services: + self._core.graph.contexts[service].exit() + + if not launch_task.done(): + launch_task.cancel() + # wakeup loop if it is blocked by select() with long timeout + launch_task.get_loop().call_soon_threadsafe(lambda: None) + logger.warning("Ctrl-C triggered by user.", style="dark_orange bold") + if threading.current_thread() is threading.main_thread(): # pragma: worst case try: for sig in stop_signal: @@ -480,17 +167,3 @@ def launch_blocking( finally: asyncio.set_event_loop(None) logger.success("asyncio shutdown complete.", style="green bold") - - def _on_sys_signal(self, _, __, main_task: asyncio.Task): - self.status.exiting = True - - if self.task_group is not None: - self.task_group.stop = True - if self.task_group.blocking_task is not None: # pragma: worst case - self.task_group.blocking_task.cancel() - - if not main_task.done(): - main_task.cancel() - # wakeup loop if it is blocked by select() with long timeout - main_task._loop.call_soon_threadsafe(lambda: None) - logger.warning("Ctrl-C triggered by user.", style="dark_orange bold") diff --git a/launart/ryanvk/__init__.py b/launart/ryanvk/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/launart/saya.py b/launart/saya.py index f69dfa2..ea719a2 100644 --- a/launart/saya.py +++ b/launart/saya.py @@ -5,6 +5,7 @@ from graia.saya.behaviour import Behaviour from graia.saya.cube import Cube from graia.saya.schema import BaseSchema + from launart import Launart, Service diff --git a/launart/service.py b/launart/service.py index ee3f499..7d5811a 100644 --- a/launart/service.py +++ b/launart/service.py @@ -1,11 +1,15 @@ from __future__ import annotations from abc import ABCMeta, abstractmethod +from typing import TYPE_CHECKING, ClassVar, Literal, Optional, Set from contextlib import asynccontextmanager -from typing import TYPE_CHECKING, Literal, Optional, Set -from launart.status import STAGE_STAT, STATS, Phase, ServiceStatus, U_Stage -from launart.utilles import any_completed +from _bootstrap.context import ServiceContext +from _bootstrap.service import Service as BaseService + +from .status import Status +from .util import override +from ._patch import patch_launch if TYPE_CHECKING: from launart.manager import Launart @@ -13,11 +17,8 @@ class Service(metaclass=ABCMeta): id: str - status: ServiceStatus manager: Optional[Launart] = None - - def __init__(self) -> None: - self.status = ServiceStatus() + _context: Optional[ServiceContext] = None @property @abstractmethod @@ -26,62 +27,70 @@ def required(self) -> Set[str]: @property @abstractmethod - def stages(self) -> Set[Phase]: + def stages(self) -> Set[Literal["preparing", "blocking", "cleanup"]]: ... + @property + def context(self) -> ServiceContext: + if self._context is None: + raise RuntimeError("this component does not have a context yet.") + return self._context + + @property + def status(self) -> Status: + return Status(self.context) + def ensure_manager(self, manager: Launart): if self.manager is not None and self.manager is not manager: raise RuntimeError("this component attempted to be mistaken a wrong ownership of launart/manager.") self.manager = manager - @asynccontextmanager - async def stage(self, stage: Literal["preparing", "blocking", "cleanup"]): - if self.manager is None: - raise RuntimeError("attempted to set stage of a component without a manager.") - if self.manager.status.stage is None: - raise LookupError("attempted to set stage of a component without a current manager") - if stage not in self.stages: - raise ValueError(f"undefined and unexpected stage entering: {stage}") + def _ensure_context(self, context: ServiceContext): + if self._context is not None and self._context is not context: + raise RuntimeError("this component attempted to be mistaken a wrong context.") + self._context = context + def stage(self, stage: Literal["preparing", "blocking", "cleanup"]): + if self._context is None: + raise RuntimeError("attempted to set stage of a component without a context.") + if stage not in {"preparing", "blocking", "cleanup"}: + raise ValueError(f"undefined and unexpected stage entering: {stage}") + ctx = self._context if stage == "preparing": - if "waiting-for-prepare" not in STAGE_STAT[self.status.stage]: - raise ValueError(f"unexpected stage entering: {self.status.stage} -> waiting-for-prepare") - await self.manager.status.wait_for_preparing() - self.status.stage = "waiting-for-prepare" - await self.status.wait_for("preparing") - yield - self.status.stage = "prepared" + return ctx.prepare() elif stage == "blocking": - if "blocking" not in STAGE_STAT[self.status.stage]: - raise ValueError(f"unexpected stage entering: {self.status.stage} -> blocking") - await self.manager.status.wait_for_blocking() - await self.wait_for_required() - self.status.stage = "blocking" - yield - self.status.stage = "blocking-completed" + @asynccontextmanager + async def _blocking(): + await self.status.wait_for_blocking() + yield + + return _blocking() elif stage == "cleanup": - if "waiting-for-cleanup" not in STAGE_STAT[self.status.stage]: - raise ValueError(f"unexpected stage entering: {self.status.stage} -> waiting-for-cleanup") - await self.manager.status.wait_for_cleaning(current=self.id) - self.status.stage = "waiting-for-cleanup" - await self.status.wait_for("cleanup") - yield - self.status.stage = "finished" + return ctx.cleanup() else: raise ValueError(f"entering unexpected stage: {stage}(unknown definition)") - async def wait_for_required(self, stage: U_Stage = "prepared"): - await self.wait_for(stage, *self.required) - - async def wait_for(self, stage: U_Stage, *component_id: str | type[Service]): - if self.manager is None: - raise RuntimeError("attempted to wait for some components without a manager.") - components = [self.manager.get_component(id) for id in component_id] - while any(component.status.stage not in STATS[STATS.index(stage) :] for component in components): - await any_completed( - *[component.status.wait_for_update() for component in components if component.status.stage != stage] - ) - - @abstractmethod async def launch(self, manager: Launart): pass + + +def make_service(serv: Service) -> BaseService: + from launart.manager import Launart + + launch = patch_launch(serv) + + class _Service(BaseService): + id = serv.id + __launart_service__: ClassVar[Service] = serv + + @property + def after(self): + return tuple(serv.required) + + async def launch(self, context: ServiceContext): + serv._ensure_context(context) + manager = Launart.current() + await launch(manager)#override(manager, {"status": Status(context)})) + + b_s = type(serv.__class__.__name__, (_Service,), {})() + return b_s diff --git a/launart/status.py b/launart/status.py index 6d9cbef..f43d783 100644 --- a/launart/status.py +++ b/launart/status.py @@ -1,133 +1,50 @@ -from __future__ import annotations +from _bootstrap.context import ServiceContext, _State -import asyncio -from typing import Literal, Optional, Union -from statv import Stats, Statv +class _Waiter: + def __init__(self, context: ServiceContext, state: _State): + self.context = context + self.state = state -from launart._sideload import FutureMark + def __await__(self): + if self.context.state is not self.state: + yield self + return -U_ManagerStage = Literal["preparing", "blocking", "cleaning", "finished"] -U_Stage = Union[ - Literal[ - "waiting-for-prepare", - "preparing", - "prepared", - "blocking", - "blocking-completed", - "waiting-for-cleanup", - "cleanup", - "finished", - ], - None, -] -Phase = Literal["preparing", "blocking", "cleanup"] -STAGE_STAT = { - None: {"waiting-for-prepare", "waiting-for-cleanup", "blocking", "finished"}, - "waiting-for-prepare": {"preparing"}, - "preparing": {"prepared"}, - "prepared": {"blocking", "waiting-for-cleanup", "finished"}, - "blocking": {"blocking-completed"}, - "blocking-completed": {"waiting-for-cleanup", "finished"}, - "waiting-for-cleanup": {"cleanup"}, - "cleanup": {"finished"}, - "finished": {None}, -} -STATS = [ - None, - "waiting-for-prepare", - "preparing", - "prepared", - "blocking", - "blocking-completed", - "waiting-for-cleanup", - "cleanup", - "finished", -] + __iter__ = __await__ -class ManagerStatus(Statv): - stage = Stats[Optional[U_ManagerStage]]("U_ManagerStage", default=None) - exiting = Stats[bool]("exiting", default=False) +class Status: + def __init__(self, context: ServiceContext): + self._context = context - def __init__(self) -> None: - super().__init__() + @property + def exiting(self): + return self._context.should_exit def __repr__(self) -> str: - return f"" + return f"" @property def preparing(self) -> bool: - return self.stage == "preparing" + return self._context._state is _State.PREPARE_PRE @property def blocking(self) -> bool: - return self.stage == "blocking" + return self._context.ready @property def cleaning(self) -> bool: - return self.stage == "cleaning" - - async def wait_for_update(self, *, current: str | None = None, stage: U_ManagerStage | None = None): - waiter = asyncio.Future() - if current is not None: - waiter.add_done_callback(FutureMark(current, stage)) - self._waiters.append(waiter) - try: - return await waiter - finally: - self._waiters.remove(waiter) + return self._context._state is _State.CLEANUP_PRE async def wait_for_preparing(self): - while not self.preparing: - await self.wait_for_update() + return await _Waiter(self._context, _State.PREPARE_PRE) async def wait_for_blocking(self): - while not self.blocking: - await self.wait_for_update() + return await _Waiter(self._context, _State.READY) - async def wait_for_cleaning(self, *, current: str | None = None): - while not self.cleaning: - await self.wait_for_update(current=current, stage="cleaning") - - async def wait_for_finished(self, *, current: str | None = None): - while self.stage not in {"finished", None}: - await self.wait_for_update(current=current, stage="finished") + async def wait_for_cleaning(self): + return await _Waiter(self._context, _State.CLEANUP_PRE) async def wait_for_sigexit(self): - while self.stage in {"preparing", "blocking"} and not self.exiting: - await self.wait_for_update() - - -class ServiceStatus(Statv): - stage = Stats[Optional[U_Stage]]("stage", default=None) - - def __init__(self) -> None: - super().__init__() - - @property - def prepared(self) -> bool: - return self.stage in ("prepared", "blocking") - - @property - def blocking(self) -> bool: - return self.stage == "blocking" - - @property - def finished(self) -> bool: - return self.stage == "finished" - - @staticmethod - @stage.validator - def _(stats: Stats[U_Stage | None], past: U_Stage | None, current: U_Stage | None): - if current not in STAGE_STAT[past]: - raise ValueError(f"Invalid stage transition: {past} -> {current}") - return current - - def unset(self) -> None: - self.stage = None - - async def wait_for(self, stage: U_Stage = None): - stages = set(STATS[STATS.index(stage) :]) - while self.stage not in stages: - await self.wait_for_update() + return await self._context.wait_for_sigexit() diff --git a/launart/util.py b/launart/util.py new file mode 100644 index 0000000..e25a306 --- /dev/null +++ b/launart/util.py @@ -0,0 +1,23 @@ +from typing import Any, TypeVar, cast + + +class Override: + def __init__(self, source: Any, additional: dict[str, Any]): + self.__source = source + self.__additional = additional + + @property + def _source(self): + return self.__source + + def __getattr__(self, item): + if item in self.__additional: + return self.__additional[item] + return getattr(self.__source, item) + + +T = TypeVar("T") + + +def override(source: T, additional: dict[str, Any]) -> T: + return cast(T, Override(source, additional)) diff --git a/launart/utilles.py b/launart/utilles.py deleted file mode 100644 index ca57dcf..0000000 --- a/launart/utilles.py +++ /dev/null @@ -1,120 +0,0 @@ -from __future__ import annotations - -import asyncio -import enum -from typing import ( - TYPE_CHECKING, - Coroutine, - Hashable, - Iterable, - List, - Optional, - Set, - TypeVar, - Union, -) - -from loguru import logger - -if TYPE_CHECKING: - from launart.service import Service - -T = TypeVar("T") -H = TypeVar("H", bound=Hashable) - - -class _Unmarked(enum.Enum): - UNMARKED = object() - - -UNMARKED = _Unmarked.UNMARKED -# "Unmarked" is a great way of replacing ellipsis. - - -class RequirementResolveFailed(ValueError): - pass - - -class FlexibleTaskGroup: - tasks: list[asyncio.Task] - sideload_trackers: dict[str, asyncio.Task] - blocking_task: Optional[asyncio.Task] = None - stop: bool = False - - def __init__(self, *tasks): - self.sideload_trackers = {} - self.tasks = list(tasks) - - def __await__(self): - return self.__await_impl__().__await__() - - async def __await_impl__(self): - while True: - self.blocking_task = asyncio.create_task(asyncio.wait(self.tasks)) - try: - return await self.blocking_task - except asyncio.CancelledError: - if self.stop: - return - - def add(self, *fs: asyncio.Task | Coroutine) -> None: - tasks = [f if isinstance(f, asyncio.Task) else asyncio.create_task(f) for f in fs] - if self.blocking_task is not None: - self.blocking_task.cancel() - self.tasks.extend(tasks) - - -async def wait_fut( - coros: Iterable[Union[Coroutine, asyncio.Task]], - *, - timeout: Optional[float] = None, - return_when: str = asyncio.ALL_COMPLETED, -) -> None: - tasks = [] - for c in coros: - if asyncio.iscoroutine(c): - tasks.append(asyncio.create_task(c)) - else: - tasks.append(c) - if tasks: - await asyncio.wait(tasks, timeout=timeout, return_when=return_when) - - -async def any_completed(*waits): - return await asyncio.wait( - [i if isinstance(i, asyncio.Task) else asyncio.create_task(i) for i in waits], - return_when=asyncio.FIRST_COMPLETED, - ) - - -def cancel_alive_tasks(loop: asyncio.AbstractEventLoop): - to_cancel = asyncio.tasks.all_tasks(loop) - if to_cancel: - for tsk in to_cancel: - tsk.cancel() - loop.run_until_complete(asyncio.gather(*to_cancel, return_exceptions=True)) - - for task in to_cancel: # pragma: no cover - # BELIEVE IN PSF - if task.cancelled(): - continue - if task.exception() is not None: - logger.opt(exception=task.exception()).error(f"Unhandled exception when shutting down {task}:") - - -def resolve_requirements(components: Iterable[Service], reverse: bool = False) -> List[Set[Service]]: - resolved_id: Set[str] = set() - unresolved: Set[Service] = set(components) - result: List[Set[Service]] = [] - while unresolved: - layer = {component for component in unresolved if resolved_id >= component.required} - - if layer: - unresolved -= layer - resolved_id.update(component.id for component in layer) - result.append(layer) - else: - raise RequirementResolveFailed(unresolved) - if reverse: - result.reverse() - return result diff --git a/pdm.lock b/pdm.lock index 3c3493d..df9df55 100644 --- a/pdm.lock +++ b/pdm.lock @@ -3,10 +3,12 @@ [metadata] groups = ["default", "dev", "saya"] -cross_platform = true -static_urls = false -lock_version = "4.3" -content_hash = "sha256:fb7678631844f1f56a5f86d7dfcfb3889bb416926df5bf2eeb05b9ba01923d43" +strategy = ["cross_platform"] +lock_version = "4.5.0" +content_hash = "sha256:0220f347e9e267ba4bf7667772d4733ef9dc8871a4e83cb7bb0691581c221cdf" + +[[metadata.targets]] +requires_python = ">=3.9" [[package]] name = "black" @@ -54,6 +56,7 @@ requires_python = ">=3.7" summary = "Composable command line interface toolkit" dependencies = [ "colorama; platform_system == \"Windows\"", + "importlib-metadata; python_version < \"3.8\"", ] files = [ {file = "click-8.1.3-py3-none-any.whl", hash = "sha256:bb4d8133cb15a609f44e8213d9b391b0809795062913b383c62be0ee95b1db48"}, @@ -74,6 +77,9 @@ files = [ name = "commonmark" version = "0.9.1" summary = "Python parser for the CommonMark Markdown spec" +dependencies = [ + "future>=0.14.0; python_version < \"3\"", +] files = [ {file = "commonmark-0.9.1-py2.py3-none-any.whl", hash = "sha256:da2f38c92590f83de410ba1a3cbceafbc74fee9def35f9251ba9a971d6d66fd9"}, {file = "commonmark-0.9.1.tar.gz", hash = "sha256:452f9dc859be7f06631ddcb328b6919c67984aca654e5fefb3914d54691aed60"}, @@ -143,12 +149,12 @@ files = [ [[package]] name = "exceptiongroup" -version = "1.1.2" +version = "1.2.2" requires_python = ">=3.7" summary = "Backport of PEP 654 (exception groups)" files = [ - {file = "exceptiongroup-1.1.2-py3-none-any.whl", hash = "sha256:e346e69d186172ca7cf029c8c1d16235aa0e04035e5750b4b95039e65204328f"}, - {file = "exceptiongroup-1.1.2.tar.gz", hash = "sha256:12c3e887d6485d16943a309616de20ae5582633e0a2eda17f4e10fd61c1e8af5"}, + {file = "exceptiongroup-1.2.2-py3-none-any.whl", hash = "sha256:3111b9d131c238bec2f8f516e123e14ba243563fb135d3fe885990585aa7795b"}, + {file = "exceptiongroup-1.2.2.tar.gz", hash = "sha256:47c2edf7c6738fafb49fd34290706d1a1a2f4d1c6df275526b62cbb4aa5393cc"}, ] [[package]] @@ -183,6 +189,7 @@ version = "6.8.0" requires_python = ">=3.8" summary = "Read metadata from Python packages" dependencies = [ + "typing-extensions>=3.6.4; python_version < \"3.8\"", "zipp>=0.5", ] files = [ @@ -215,6 +222,7 @@ version = "0.6.0" requires_python = ">=3.5" summary = "Python logging made (stupidly) simple" dependencies = [ + "aiocontextvars>=0.2.0; python_version < \"3.7\"", "colorama>=0.3.4; sys_platform == \"win32\"", "win32-setctime>=1.0.0; sys_platform == \"win32\"", ] @@ -227,6 +235,9 @@ files = [ name = "mypy-extensions" version = "0.4.3" summary = "Experimental type system extensions for programs checked with the mypy typechecker." +dependencies = [ + "typing>=3.5.3; python_version < \"3.5\"", +] files = [ {file = "mypy_extensions-0.4.3-py2.py3-none-any.whl", hash = "sha256:090fedd75945a69ae91ce1303b5824f428daf5a028d2f6ab8a299250a846f15d"}, {file = "mypy_extensions-0.4.3.tar.gz", hash = "sha256:2d82818f5bb3e369420cb3c4060a7970edba416647068eb4c5343488a6c604a8"}, @@ -267,6 +278,9 @@ name = "pluggy" version = "1.0.0" requires_python = ">=3.6" summary = "plugin and hook calling mechanisms for python" +dependencies = [ + "importlib-metadata>=0.12; python_version < \"3.8\"", +] files = [ {file = "pluggy-1.0.0-py2.py3-none-any.whl", hash = "sha256:74134bbf457f031a36d68416e1509f34bd5ccc019f0bcc952c7b909d06b37bd3"}, {file = "pluggy-1.0.0.tar.gz", hash = "sha256:4224373bacce55f955a878bf9cfa763c1e360858e330072059e10bad68531159"}, @@ -290,6 +304,7 @@ summary = "pytest: simple powerful testing with Python" dependencies = [ "colorama; sys_platform == \"win32\"", "exceptiongroup>=1.0.0rc8; python_version < \"3.11\"", + "importlib-metadata>=0.12; python_version < \"3.8\"", "iniconfig", "packaging", "pluggy<2.0,>=0.12", @@ -307,6 +322,7 @@ requires_python = ">=3.7" summary = "Pytest support for asyncio" dependencies = [ "pytest>=7.0.0", + "typing-extensions>=3.7.2; python_version < \"3.8\"", ] files = [ {file = "pytest-asyncio-0.21.1.tar.gz", hash = "sha256:40a7eae6dded22c7b604986855ea48400ab15b069ae38116e8c01238e9eeb64d"}, @@ -320,6 +336,7 @@ requires_python = ">=3.6.3,<4.0.0" summary = "Render rich text, tables, progress bars, syntax highlighting, markdown and more to the terminal" dependencies = [ "commonmark<0.10.0,>=0.9.0", + "dataclasses<0.9,>=0.7; python_version < \"3.7\"", "pygments<3.0.0,>=2.6.0", "typing-extensions<5.0,>=4.0.0; python_version < \"3.9\"", ] @@ -364,12 +381,12 @@ files = [ [[package]] name = "typing-extensions" -version = "4.3.0" -requires_python = ">=3.7" -summary = "Backported and Experimental Type Hints for Python 3.7+" +version = "4.12.2" +requires_python = ">=3.8" +summary = "Backported and Experimental Type Hints for Python 3.8+" files = [ - {file = "typing_extensions-4.3.0-py3-none-any.whl", hash = "sha256:25642c956049920a5aa49edcdd6ab1e06d7e5d467fc00e0506c44ac86fbfca02"}, - {file = "typing_extensions-4.3.0.tar.gz", hash = "sha256:e6d2677a32f47fc7eb2795db1dd15c1f34eff616bcaf2cfb5e997f854fa1c4a6"}, + {file = "typing_extensions-4.12.2-py3-none-any.whl", hash = "sha256:04e5ca0351e0f3f85c6853954072df659d0d13fac324d0072316b67d7794700d"}, + {file = "typing_extensions-4.12.2.tar.gz", hash = "sha256:1a7ead55c7e559dd4dee8856e3a88b41225abfe1ce8df57b7c13915fe121ffb8"}, ] [[package]] diff --git a/pyproject.toml b/pyproject.toml index 507f12f..30cd445 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,11 +6,12 @@ authors = [ {name = "GreyElaina", email = "GreyElaina@outlook.com"}, ] dependencies = [ - "statv>=0.2.2", "loguru>=0.6.0", "creart>=0.3.0", + "typing-extensions>=4.5.0", + "exceptiongroup>=1.2.2", ] -requires-python = ">=3.8" +requires-python = ">=3.9" readme = "README.md" license = {text = "MIT"} @@ -31,7 +32,7 @@ profile = "black" [tool.coverage.run] branch = true source = ["."] -omit = ["tests/*", "test.py"] +omit = ["tests/*", "test.py", "test-launart.py"] [tool.coverage.report] # Regexes for lines to exclude from consideration @@ -83,6 +84,6 @@ build-backend = "pdm.backend" exclude = ["__pypackages__"] [tool.pytest.ini_options] -python_files = "tests/*" +python_files = "test-launart.py" # TODO: fix tests/* asyncio_mode = "strict" norecursedirs = "_saya_mod" diff --git a/pyrightconfig.json b/pyrightconfig.json index f18aafc..04d3c45 100644 --- a/pyrightconfig.json +++ b/pyrightconfig.json @@ -1,7 +1,7 @@ { "exclude": ["__pypackages__", ".venv"], "reportShadowedImports": false, - "pythonVersion": "3.8", + "pythonVersion": "3.9", "venv": ".venv", "venvPath": ".", "typeCheckingMode": "basic", diff --git a/test-launart.py b/test-launart.py new file mode 100644 index 0000000..a4c6b26 --- /dev/null +++ b/test-launart.py @@ -0,0 +1,131 @@ +from __future__ import annotations + +import asyncio + +from launart import Launart, Service + + +def test_launart(): + art = Launart() + msg = [] + + def push_msg(m): + msg.append(m) + + class TestSrv(Service): + id = "test_srv" + + @property + def required(self) -> set[str]: + return set() + + @property + def stages(self) -> set[str]: + return {"preparing", "cleanup"} + + async def launch(self, manager: Launart): + async with self.stage("preparing"): + push_msg("TestSrv: prepared TestInterface") + async with self.stage("cleanup"): + push_msg("TestSrv: cleanup TestInterface") + + class TestService(Service): + id = "test" + + @property + def required(self): + return {"test_srv"} + + @property + def stages(self) -> set[str]: + return {"preparing", "blocking", "cleanup"} + + async def launch(self, manager: Launart): + async with self.stage("preparing"): + push_msg("prepare 1") + await asyncio.sleep(3) + async with self.stage("blocking"): + push_msg("blocking 1") + await asyncio.sleep(3) + push_msg("unblocking 1") + async with self.stage("cleanup"): + push_msg("cleanup 1") + await asyncio.sleep(3) + + class Test2(Service): + id = "test2" + + @property + def required(self) -> set[str]: + return {"test"} + + @property + def stages(self) -> set[str]: + return {"preparing", "blocking", "cleanup"} + + async def launch(self, manager: Launart): + async with self.stage("preparing"): + push_msg("prepare 2") + + async with self.stage("blocking"): + push_msg("blocking 2") + push_msg("test for sideload") + await manager.add_sideload(TestSideload()) + await asyncio.sleep(1) + await manager.get_component("test_sideload").status.wait_for_blocking() + push_msg("sideload in blocking, test for active cleanup") + await manager.remove_sideload("test_sideload") + await asyncio.sleep(1) + push_msg("unblocking 2") + await asyncio.sleep(3) + + async with self.stage("cleanup"): + push_msg("cleanup2") + + class TestSideload(Service): + id = "test_sideload" + + @property + def required(self) -> set[str]: + return set() + + @property + def stages(self) -> set[str]: + return {"preparing", "blocking", "cleanup"} + + async def launch(self, manager: Launart): + async with self.stage("preparing"): + push_msg("prepare in sideload") + await asyncio.sleep(3) + async with self.stage("blocking"): + push_msg("blocking in sideload") + await asyncio.sleep(3) + push_msg("unblocking in sideload") + async with self.stage("cleanup"): + push_msg("cleanup in sideload") + await asyncio.sleep(3) + + art.add_component(TestSrv()) + art.add_component(TestService()) + art.add_component(Test2()) + assert list(art._initial_services.keys()) == ["test_srv", "test", "test2"] + art.launch_blocking() + + assert msg == [ + "TestSrv: prepared TestInterface", + "prepare 1", + "prepare 2", + "blocking 1", + "blocking 2", + "test for sideload", + "prepare in sideload", + "unblocking 1", + "blocking in sideload", + "sideload in blocking, test for active cleanup", + "unblocking in sideload", + "cleanup in sideload", + "unblocking 2", + "cleanup2", + "cleanup 1", + "TestSrv: cleanup TestInterface", + ] diff --git a/test.py b/test.py index 190639a..5b39e7e 100644 --- a/test.py +++ b/test.py @@ -1,12 +1,13 @@ from __future__ import annotations + import asyncio from launart import Launart, Service +# from _bootstrap import Stage, Phase art = Launart() - class TestSrv(Service): id = "test_srv" @@ -16,11 +17,13 @@ def required(self) -> set[str]: @property def stages(self) -> set[str]: - return {"preparing"} + return {"preparing", "cleanup"} async def launch(self, manager: Launart): async with self.stage("preparing"): print("TestSrv: prepared TestInterface") + async with self.stage("cleanup"): + print("TestSrv: cleanup TestInterface") class TestService(Service): @@ -36,14 +39,14 @@ def stages(self) -> set[str]: async def launch(self, manager: Launart): async with self.stage("preparing"): - print("prepare") + print("prepare 1") await asyncio.sleep(3) async with self.stage("blocking"): - print("blocking") + print("blocking 1") await asyncio.sleep(3) print("unblocking 1") async with self.stage("cleanup"): - print("cleanup") + print("cleanup 1") await asyncio.sleep(3) @@ -60,19 +63,19 @@ def stages(self) -> set[str]: async def launch(self, manager: Launart): async with self.stage("preparing"): - print("prepare2") + print("prepare 2") async with self.stage("blocking"): - print("blocking") + print("blocking 2") print("test for sideload") - manager.add_component(TestSideload()) - await asyncio.sleep(3) - print("unblocking 2") - # await asyncio.sleep(1) - await manager.components["test_sideload"].status.wait_for("blocking") + await manager.add_sideload(TestSideload()) + await asyncio.sleep(1) + await manager.get_component("test_sideload").status.wait_for_blocking() print("sideload in blocking, test for active cleanup") - manager.remove_component("test_sideload") - await asyncio.sleep(10) + await manager.remove_sideload("test_sideload") + await asyncio.sleep(1) + print("unblocking 2") + await asyncio.sleep(3) async with self.stage("cleanup"): print("cleanup2") diff --git a/tests/_saya_mod/ok_sub.py b/tests/_saya_mod/ok_sub.py index 28a856a..ea3e7c0 100644 --- a/tests/_saya_mod/ok_sub.py +++ b/tests/_saya_mod/ok_sub.py @@ -1,12 +1,11 @@ from graia.saya.channel import Channel -from launart.service import Service from launart.saya import ServiceSchema +from launart.service import Service c = Channel.current() - class SayaTestService(Service): id = "lc.test.saya" diff --git a/tests/fixture.py b/tests/fixture.py index 58262e5..88f6f2d 100644 --- a/tests/fixture.py +++ b/tests/fixture.py @@ -79,5 +79,4 @@ def stages(self): async def launch(self, _): ... - return Srv() diff --git a/tests/launchable.py b/tests/launchable.py index 69eb01d..706947d 100644 --- a/tests/launchable.py +++ b/tests/launchable.py @@ -1,12 +1,14 @@ from __future__ import annotations + import asyncio import pytest from launart import Launart -from launart.service import Service, ServiceStatus +from launart.service import Service from tests.fixture import EmptyService + def test_ensure(): lc = EmptyService() mgr = Launart() @@ -16,74 +18,74 @@ def test_ensure(): lc.ensure_manager(Launart()) -def test_service_stat_transition_raw(): - stat = ServiceStatus() - stat.stage = "blocking" - assert stat.stage == "blocking" - with pytest.raises(ValueError): - stat.stage = "preparing" # rollback is not allowed - stat.stage = "blocking-completed" - stat.stage = "finished" - stat.unset() - - -def test_service_stat_transition_base_err_report(): - class _Base(Service): - @property - def required(self): - return set() - - @property - def stages(self): - return {"preparing"} - - class ErrNoMgr(_Base): - id = "e1" - - async def launch(self, _): - async with self.stage("preparing"): - ... - - with pytest.raises(RuntimeError): - asyncio.run(ErrNoMgr().launch(None)) - with pytest.raises(LookupError): - mgr = Launart() - e = ErrNoMgr() - assert not e.manager - e.ensure_manager(mgr) - asyncio.run(e.launch(None)) - - class ErrUnexpectedStage(_Base): - id = "e2" - - async def launch(self, _): - async with self.stage("blocking"): # oops - ... - - with pytest.raises(ValueError): - mgr = Launart() - e = ErrUnexpectedStage() - e.ensure_manager(mgr) - mgr.status.stage = "preparing" - asyncio.run(e.launch(None)) - - class ErrUnknownStageDef(_Base): - id = "e2" - - @property - def stages(self): - return {"finished"} - - async def launch(self, _): - async with self.stage("finished"): # type: ignore - ... - - with pytest.raises(ValueError): - mgr = Launart() - e = ErrUnknownStageDef() - e.ensure_manager(mgr) - mgr.status.stage = "preparing" - asyncio.run(e.launch(None)) +# def test_service_stat_transition_raw(): +# stat = ServiceStatus() +# stat.stage = "blocking" +# assert stat.stage == "blocking" +# with pytest.raises(ValueError): +# stat.stage = "preparing" # rollback is not allowed +# stat.stage = "blocking-completed" +# stat.stage = "finished" +# stat.unset() + + +# def test_service_stat_transition_base_err_report(): +# class _Base(Service): +# @property +# def required(self): +# return set() +# +# @property +# def stages(self): +# return {"preparing"} +# +# class ErrNoMgr(_Base): +# id = "e1" +# +# async def launch(self, _): +# async with self.stage("preparing"): +# ... +# +# with pytest.raises(RuntimeError): +# asyncio.run(ErrNoMgr().launch(None)) +# with pytest.raises(LookupError): +# mgr = Launart() +# e = ErrNoMgr() +# assert not e.manager +# e.ensure_manager(mgr) +# asyncio.run(e.launch(None)) +# +# class ErrUnexpectedStage(_Base): +# id = "e2" +# +# async def launch(self, _): +# async with self.stage("blocking"): # oops +# ... +# +# with pytest.raises(ValueError): +# mgr = Launart() +# e = ErrUnexpectedStage() +# e.ensure_manager(mgr) +# mgr.status.stage = "preparing" +# asyncio.run(e.launch(None)) +# +# class ErrUnknownStageDef(_Base): +# id = "e2" +# +# @property +# def stages(self): +# return {"finished"} +# +# async def launch(self, _): +# async with self.stage("finished"): # type: ignore +# ... +# +# with pytest.raises(ValueError): +# mgr = Launart() +# e = ErrUnknownStageDef() +# e.ensure_manager(mgr) +# mgr.status.stage = "preparing" +# asyncio.run(e.launch(None)) @pytest.mark.asyncio diff --git a/tests/manager.py b/tests/manager.py index 8f3b711..a1f0478 100644 --- a/tests/manager.py +++ b/tests/manager.py @@ -1,4 +1,5 @@ from __future__ import annotations + import asyncio from signal import SIGINT, default_int_handler, signal @@ -258,7 +259,6 @@ def get_interface(self, interface_type): def test_graceful_abort(): - failure: bool = False class Malfunction(Service): diff --git a/tests/saya.py b/tests/saya.py index d2b75c9..0d59e1f 100644 --- a/tests/saya.py +++ b/tests/saya.py @@ -1,4 +1,5 @@ from __future__ import annotations + import pytest from graia.saya import Saya from graia.saya.behaviour.entity import Behaviour diff --git a/tests/sideload.py b/tests/sideload.py index 8cf6159..ac504a2 100644 --- a/tests/sideload.py +++ b/tests/sideload.py @@ -1,4 +1,5 @@ from __future__ import annotations + import asyncio import pytest diff --git a/tests/utils.py b/tests/utils.py deleted file mode 100644 index 5b1822b..0000000 --- a/tests/utils.py +++ /dev/null @@ -1,76 +0,0 @@ -from __future__ import annotations -import asyncio -from typing import Any, cast - -import pytest - -from launart._sideload import Override, override -from launart.utilles import wait_fut, resolve_requirements, RequirementResolveFailed -from tests.fixture import component_standalone - - -def test_resolve_success(): - dataset = [ - component_standalone("a", []), - component_standalone("b", ["a"]), - component_standalone("c", ["a", "b"]), - component_standalone("d", ["a", "c"]), - component_standalone("e", []), - component_standalone("f", ["a", "c"]), - ] - expected = [{dataset[0], dataset[4]}, {dataset[1]}, {dataset[2]}, {dataset[3], dataset[5]}] - assert resolve_requirements(dataset) == expected - expected.reverse() - assert resolve_requirements(dataset, reverse=True) == expected - - -def test_resolve_fail(): - dataset = [ - component_standalone("a", ["b"]), - component_standalone("b", ["a"]), - ] - with pytest.raises(RequirementResolveFailed): - resolve_requirements(dataset) - - - -def test_override(): - class MyOrigin: - data: dict - - def __init__(self, d: dict) -> None: - self.data = d - - origin = MyOrigin({"a": 3}) - additional = {"a": 4} - o = cast(Override, override(origin, additional)) - assert o.source is origin - assert o.a == 4 - with pytest.raises(AttributeError): - o.b - - -@pytest.mark.asyncio -async def test_wait_fut(): - await wait_fut([]) - await wait_fut([asyncio.sleep(0.01), asyncio.create_task(asyncio.sleep(0.02))]) - - t1 = asyncio.create_task(asyncio.sleep(0.01)) - t2 = asyncio.create_task(asyncio.sleep(0.1)) - - await wait_fut([t1, t2], timeout=0.02) - - assert t1.done() - assert not t2.done() - - t2.cancel() - - t1 = asyncio.create_task(asyncio.sleep(0.01)) - t2 = asyncio.create_task(asyncio.sleep(0.1)) - - await wait_fut([t1, t2], return_when=asyncio.FIRST_COMPLETED) - - assert t1.done() - assert not t2.done() - - t2.cancel()