diff --git a/Makefile b/Makefile index d6d5066b8..3b6a39451 100644 --- a/Makefile +++ b/Makefile @@ -107,6 +107,7 @@ test: PGPORT=$(shell docker compose port postgresql 5432 | cut -d: -f2) test: export DB__URL=postgresql://cm-service@localhost:${PGPORT}/cm-service test: export DB__PASSWORD=INSECURE-PASSWORD test: export DB__TABLE_SCHEMA=cm_service_test +test: export BPS__ARTIFACT_PATH=$(PWD)/output test: run-compose alembic upgrade head pytest -vvv --asyncio-mode=auto --cov=lsst.cmservice --cov-branch --cov-report=term --cov-report=html ${PYTEST_ARGS} diff --git a/src/lsst/cmservice/common/daemon.py b/src/lsst/cmservice/common/daemon.py index 0a8cb5855..92f5e8034 100644 --- a/src/lsst/cmservice/common/daemon.py +++ b/src/lsst/cmservice/common/daemon.py @@ -18,11 +18,11 @@ async def check_due_date(session: async_scoped_session, node: NodeMixin, time_next_check: datetime) -> None: """For a provided due date, check if the queue entry is overdue""" - due_date = node.metadata_.get("due_date", None) + due_date: int = node.metadata_.get("due_date", None) if due_date is None: return None - if time_next_check > due_date: + if time_next_check.timestamp() > due_date: campaign = await node.get_campaign(session) await notification.send_notification(for_status=StatusEnum.overdue, for_campaign=campaign) diff --git a/src/lsst/cmservice/config.py b/src/lsst/cmservice/config.py index bfb045c15..2bd83bd68 100644 --- a/src/lsst/cmservice/config.py +++ b/src/lsst/cmservice/config.py @@ -60,6 +60,11 @@ class BpsConfiguration(BaseModel): default=16, ) + artifact_path: str = Field( + description="Filesystem path location for writing artifacts (`prod_area`)", + default="/prod_area", + ) + class ButlerConfiguration(BaseModel): """Configuration settings for butler client operations. diff --git a/src/lsst/cmservice/handlers/jobs.py b/src/lsst/cmservice/handlers/jobs.py index 0f735b704..9eb5de9f0 100644 --- a/src/lsst/cmservice/handlers/jobs.py +++ b/src/lsst/cmservice/handlers/jobs.py @@ -75,11 +75,11 @@ async def _write_script( await session.refresh(parent, attribute_names=["c_"]) data_dict = await script.data_dict(session) resolved_cols = await script.resolve_collections(session) + prod_area = os.path.expandvars(config.bps.artifact_path) # Resolve mandatory data element inputs. All of these values must be # provided somewhere along the SpecBlock chain. try: - prod_area = os.path.expandvars(data_dict["prod_area"]) butler_repo = os.path.expandvars(data_dict["butler_repo"]) lsst_version = os.path.expandvars(data_dict.get("lsst_version", "w_latest")) lsst_distrib_dir = os.path.expandvars(data_dict["lsst_distrib_dir"]) @@ -572,7 +572,7 @@ async def _write_script( if TYPE_CHECKING: assert isinstance(parent, Job) data_dict = await script.data_dict(session) - prod_area = await Path(os.path.expandvars(data_dict["prod_area"])).resolve() + prod_area = await Path(os.path.expandvars(config.bps.artifact_path)).resolve() resolved_cols = await script.resolve_collections(session) script_url = await self._set_script_files(session, script, prod_area) butler_repo = data_dict["butler_repo"] @@ -623,8 +623,7 @@ async def _do_prepare( ) -> StatusEnum: if TYPE_CHECKING: assert isinstance(parent, Job) - data_dict = await script.data_dict(session) - prod_area = await Path(os.path.expandvars(data_dict["prod_area"])).resolve() + prod_area = await Path(os.path.expandvars(config.bps.artifact_path)).resolve() report_url = parent.metadata_.get("report_url") or ( prod_area / parent.fullname / "manifest_report.yaml" diff --git a/src/lsst/cmservice/handlers/scripts.py b/src/lsst/cmservice/handlers/scripts.py index 3b5284270..d9de8549f 100644 --- a/src/lsst/cmservice/handlers/scripts.py +++ b/src/lsst/cmservice/handlers/scripts.py @@ -40,7 +40,7 @@ async def _write_script( data_dict = await script.data_dict(session) try: output_coll = resolved_cols["output"] - script_url = await self._set_script_files(session, script, data_dict["prod_area"]) + script_url = await self._set_script_files(session, script, config.bps.artifact_path) butler_repo = data_dict["butler_repo"] except KeyError as e: raise CMMissingScriptInputError(f"{script.fullname} missing an input: {e}") from e @@ -92,7 +92,7 @@ async def _write_script( try: output_coll = resolved_cols["output"] input_colls = resolved_cols["inputs"] - script_url = await self._set_script_files(session, script, data_dict["prod_area"]) + script_url = await self._set_script_files(session, script, config.bps.artifact_path) butler_repo = data_dict["butler_repo"] except KeyError as msg: logger.exception() @@ -158,7 +158,7 @@ async def _write_script( try: output_coll = resolved_cols["output"] input_coll = resolved_cols["input"] - script_url = await self._set_script_files(session, script, data_dict["prod_area"]) + script_url = await self._set_script_files(session, script, config.bps.artifact_path) butler_repo = data_dict["butler_repo"] except KeyError as msg: raise CMMissingScriptInputError(f"{script.fullname} missing an input: {msg}") from msg @@ -238,7 +238,7 @@ async def _write_script( raise CMMissingScriptInputError( "Must specify what to collect in ChainCollectScriptHandler, jobs or steps", ) - script_url = await self._set_script_files(session, script, data_dict["prod_area"]) + script_url = await self._set_script_files(session, script, config.bps.artifact_path) butler_repo = data_dict["butler_repo"] command = f"{config.butler.butler_bin} collection-chain {butler_repo} {output_coll}" for collect_coll_ in collect_colls: @@ -298,7 +298,7 @@ async def _write_script( try: output_coll = resolved_cols["output"] input_coll = resolved_cols["input"] - script_url = await self._set_script_files(session, script, data_dict["prod_area"]) + script_url = await self._set_script_files(session, script, config.bps.artifact_path) butler_repo = data_dict["butler_repo"] data_query = data_dict.get("data_query") except KeyError as msg: @@ -354,7 +354,7 @@ async def _write_script( data_dict = await script.data_dict(session) try: output_coll = resolved_cols["output"] - script_url = await self._set_script_files(session, script, data_dict["prod_area"]) + script_url = await self._set_script_files(session, script, config.bps.artifact_path) butler_repo = data_dict["butler_repo"] except KeyError as msg: raise CMMissingScriptInputError(f"{script.fullname} missing an input: {msg}") from msg @@ -410,7 +410,7 @@ async def _write_script( try: input_coll = resolved_cols["input"] output_coll = resolved_cols["output"] - script_url = await self._set_script_files(session, script, data_dict["prod_area"]) + script_url = await self._set_script_files(session, script, config.bps.artifact_path) butler_repo = data_dict["butler_repo"] except KeyError as msg: raise CMMissingScriptInputError(f"{script.fullname} missing an input: {msg}") from msg @@ -474,7 +474,7 @@ async def _write_script( resolved_cols = await script.resolve_collections(session) data_dict = await script.data_dict(session) try: - script_url = await self._set_script_files(session, script, data_dict["prod_area"]) + script_url = await self._set_script_files(session, script, config.bps.artifact_path) butler_repo = data_dict["butler_repo"] output_coll = resolved_cols["output"] except KeyError as msg: @@ -534,7 +534,7 @@ async def _write_script( ) -> StatusEnum: resolved_cols = await script.resolve_collections(session) data_dict = await script.data_dict(session) - prod_area = os.path.expandvars(data_dict["prod_area"]) + prod_area = os.path.expandvars(config.bps.artifact_path) script_url = await self._set_script_files(session, script, prod_area) butler_repo = data_dict["butler_repo"] usage_graph_url = os.path.expandvars(f"{prod_area}/{parent.fullname}/resource_usage.qgraph") @@ -591,7 +591,7 @@ async def _write_script( ) -> StatusEnum: resolved_cols = await script.resolve_collections(session) data_dict = await script.data_dict(session) - prod_area = os.path.expandvars(data_dict["prod_area"]) + prod_area = os.path.expandvars(config.bps.artifact_path) script_url = await self._set_script_files(session, script, prod_area) butler_repo = data_dict["butler_repo"] hips_maps_graph_url = os.path.expandvars(f"{prod_area}/{parent.fullname}/hips_maps.qgraph") @@ -693,7 +693,7 @@ async def _write_script( try: input_coll = resolved_cols["input"] output_coll = resolved_cols["output"] - script_url = await self._set_script_files(session, script, data_dict["prod_area"]) + script_url = await self._set_script_files(session, script, config.bps.artifact_path) butler_repo = data_dict["butler_repo"] except KeyError as msg: raise CMMissingScriptInputError(f"{script.fullname} missing an input: {msg}") from msg diff --git a/src/lsst/cmservice/parsing/string.py b/src/lsst/cmservice/parsing/string.py index 24a052ede..dc485a187 100644 --- a/src/lsst/cmservice/parsing/string.py +++ b/src/lsst/cmservice/parsing/string.py @@ -48,11 +48,11 @@ def parse_element_fullname(fullname: str) -> Fullname: fullname_r = re.compile( ( r"^" - r"(?P[\w]+){1}(?:\/)*" - r"(?P[\w]+){0,1}(?:\/)*" - r"(?P[\w]+){0,1}(?:\/)*" - r"(?P[\w]+){0,1}(?:\/)*" - r"(?P