Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ test: PGPORT=$(shell docker compose port postgresql 5432 | cut -d: -f2)
test: export DB__URL=postgresql://cm-service@localhost:${PGPORT}/cm-service
test: export DB__PASSWORD=INSECURE-PASSWORD
test: export DB__TABLE_SCHEMA=cm_service_test
test: export BPS__ARTIFACT_PATH=$(PWD)/output
test: run-compose
alembic upgrade head
pytest -vvv --asyncio-mode=auto --cov=lsst.cmservice --cov-branch --cov-report=term --cov-report=html ${PYTEST_ARGS}
Expand Down
4 changes: 2 additions & 2 deletions src/lsst/cmservice/common/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
async def check_due_date(session: async_scoped_session, node: NodeMixin, time_next_check: datetime) -> None:
"""For a provided due date, check if the queue entry is overdue"""

due_date = node.metadata_.get("due_date", None)
due_date: int = node.metadata_.get("due_date", None)
if due_date is None:
return None

if time_next_check > due_date:
if time_next_check.timestamp() > due_date:
campaign = await node.get_campaign(session)
await notification.send_notification(for_status=StatusEnum.overdue, for_campaign=campaign)

Expand Down
5 changes: 5 additions & 0 deletions src/lsst/cmservice/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ class BpsConfiguration(BaseModel):
default=16,
)

artifact_path: str = Field(
description="Filesystem path location for writing artifacts (`prod_area`)",
default="/prod_area",
)


class ButlerConfiguration(BaseModel):
"""Configuration settings for butler client operations.
Expand Down
7 changes: 3 additions & 4 deletions src/lsst/cmservice/handlers/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,11 @@ async def _write_script(
await session.refresh(parent, attribute_names=["c_"])
data_dict = await script.data_dict(session)
resolved_cols = await script.resolve_collections(session)
prod_area = os.path.expandvars(config.bps.artifact_path)

# Resolve mandatory data element inputs. All of these values must be
# provided somewhere along the SpecBlock chain.
try:
prod_area = os.path.expandvars(data_dict["prod_area"])
butler_repo = os.path.expandvars(data_dict["butler_repo"])
lsst_version = os.path.expandvars(data_dict.get("lsst_version", "w_latest"))
lsst_distrib_dir = os.path.expandvars(data_dict["lsst_distrib_dir"])
Expand Down Expand Up @@ -572,7 +572,7 @@ async def _write_script(
if TYPE_CHECKING:
assert isinstance(parent, Job)
data_dict = await script.data_dict(session)
prod_area = await Path(os.path.expandvars(data_dict["prod_area"])).resolve()
prod_area = await Path(os.path.expandvars(config.bps.artifact_path)).resolve()
resolved_cols = await script.resolve_collections(session)
script_url = await self._set_script_files(session, script, prod_area)
butler_repo = data_dict["butler_repo"]
Expand Down Expand Up @@ -623,8 +623,7 @@ async def _do_prepare(
) -> StatusEnum:
if TYPE_CHECKING:
assert isinstance(parent, Job)
data_dict = await script.data_dict(session)
prod_area = await Path(os.path.expandvars(data_dict["prod_area"])).resolve()
prod_area = await Path(os.path.expandvars(config.bps.artifact_path)).resolve()

report_url = parent.metadata_.get("report_url") or (
prod_area / parent.fullname / "manifest_report.yaml"
Expand Down
22 changes: 11 additions & 11 deletions src/lsst/cmservice/handlers/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ async def _write_script(
data_dict = await script.data_dict(session)
try:
output_coll = resolved_cols["output"]
script_url = await self._set_script_files(session, script, data_dict["prod_area"])
script_url = await self._set_script_files(session, script, config.bps.artifact_path)
butler_repo = data_dict["butler_repo"]
except KeyError as e:
raise CMMissingScriptInputError(f"{script.fullname} missing an input: {e}") from e
Expand Down Expand Up @@ -92,7 +92,7 @@ async def _write_script(
try:
output_coll = resolved_cols["output"]
input_colls = resolved_cols["inputs"]
script_url = await self._set_script_files(session, script, data_dict["prod_area"])
script_url = await self._set_script_files(session, script, config.bps.artifact_path)
butler_repo = data_dict["butler_repo"]
except KeyError as msg:
logger.exception()
Expand Down Expand Up @@ -158,7 +158,7 @@ async def _write_script(
try:
output_coll = resolved_cols["output"]
input_coll = resolved_cols["input"]
script_url = await self._set_script_files(session, script, data_dict["prod_area"])
script_url = await self._set_script_files(session, script, config.bps.artifact_path)
butler_repo = data_dict["butler_repo"]
except KeyError as msg:
raise CMMissingScriptInputError(f"{script.fullname} missing an input: {msg}") from msg
Expand Down Expand Up @@ -238,7 +238,7 @@ async def _write_script(
raise CMMissingScriptInputError(
"Must specify what to collect in ChainCollectScriptHandler, jobs or steps",
)
script_url = await self._set_script_files(session, script, data_dict["prod_area"])
script_url = await self._set_script_files(session, script, config.bps.artifact_path)
butler_repo = data_dict["butler_repo"]
command = f"{config.butler.butler_bin} collection-chain {butler_repo} {output_coll}"
for collect_coll_ in collect_colls:
Expand Down Expand Up @@ -298,7 +298,7 @@ async def _write_script(
try:
output_coll = resolved_cols["output"]
input_coll = resolved_cols["input"]
script_url = await self._set_script_files(session, script, data_dict["prod_area"])
script_url = await self._set_script_files(session, script, config.bps.artifact_path)
butler_repo = data_dict["butler_repo"]
data_query = data_dict.get("data_query")
except KeyError as msg:
Expand Down Expand Up @@ -354,7 +354,7 @@ async def _write_script(
data_dict = await script.data_dict(session)
try:
output_coll = resolved_cols["output"]
script_url = await self._set_script_files(session, script, data_dict["prod_area"])
script_url = await self._set_script_files(session, script, config.bps.artifact_path)
butler_repo = data_dict["butler_repo"]
except KeyError as msg:
raise CMMissingScriptInputError(f"{script.fullname} missing an input: {msg}") from msg
Expand Down Expand Up @@ -410,7 +410,7 @@ async def _write_script(
try:
input_coll = resolved_cols["input"]
output_coll = resolved_cols["output"]
script_url = await self._set_script_files(session, script, data_dict["prod_area"])
script_url = await self._set_script_files(session, script, config.bps.artifact_path)
butler_repo = data_dict["butler_repo"]
except KeyError as msg:
raise CMMissingScriptInputError(f"{script.fullname} missing an input: {msg}") from msg
Expand Down Expand Up @@ -474,7 +474,7 @@ async def _write_script(
resolved_cols = await script.resolve_collections(session)
data_dict = await script.data_dict(session)
try:
script_url = await self._set_script_files(session, script, data_dict["prod_area"])
script_url = await self._set_script_files(session, script, config.bps.artifact_path)
butler_repo = data_dict["butler_repo"]
output_coll = resolved_cols["output"]
except KeyError as msg:
Expand Down Expand Up @@ -534,7 +534,7 @@ async def _write_script(
) -> StatusEnum:
resolved_cols = await script.resolve_collections(session)
data_dict = await script.data_dict(session)
prod_area = os.path.expandvars(data_dict["prod_area"])
prod_area = os.path.expandvars(config.bps.artifact_path)
script_url = await self._set_script_files(session, script, prod_area)
butler_repo = data_dict["butler_repo"]
usage_graph_url = os.path.expandvars(f"{prod_area}/{parent.fullname}/resource_usage.qgraph")
Expand Down Expand Up @@ -591,7 +591,7 @@ async def _write_script(
) -> StatusEnum:
resolved_cols = await script.resolve_collections(session)
data_dict = await script.data_dict(session)
prod_area = os.path.expandvars(data_dict["prod_area"])
prod_area = os.path.expandvars(config.bps.artifact_path)
script_url = await self._set_script_files(session, script, prod_area)
butler_repo = data_dict["butler_repo"]
hips_maps_graph_url = os.path.expandvars(f"{prod_area}/{parent.fullname}/hips_maps.qgraph")
Expand Down Expand Up @@ -693,7 +693,7 @@ async def _write_script(
try:
input_coll = resolved_cols["input"]
output_coll = resolved_cols["output"]
script_url = await self._set_script_files(session, script, data_dict["prod_area"])
script_url = await self._set_script_files(session, script, config.bps.artifact_path)
butler_repo = data_dict["butler_repo"]
except KeyError as msg:
raise CMMissingScriptInputError(f"{script.fullname} missing an input: {msg}") from msg
Expand Down
10 changes: 5 additions & 5 deletions src/lsst/cmservice/parsing/string.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ def parse_element_fullname(fullname: str) -> Fullname:
fullname_r = re.compile(
(
r"^"
r"(?P<campaign>[\w]+){1}(?:\/)*"
r"(?P<step>[\w]+){0,1}(?:\/)*"
r"(?P<group>[\w]+){0,1}(?:\/)*"
r"(?P<job>[\w]+){0,1}(?:\/)*"
r"(?P<script>[\w]+){0,1}"
r"(?P<campaign>[\w-]+){1}(?:\/)*"
r"(?P<step>[\w-]+){0,1}(?:\/)*"
r"(?P<group>[\w-]+){0,1}(?:\/)*"
r"(?P<job>[\w-]+){0,1}(?:\/)*"
r"(?P<script>[\w-]+){0,1}"
r"$"
),
re.MULTILINE,
Expand Down
1 change: 0 additions & 1 deletion tests/fixtures/seeds/empty_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ Campaign:
campaign_validation: "{out}/validate"
data:
butler_repo: "/repo/main"
prod_area: "output/archive"
data_query: "instrument = 'HSC'"
lsst_version: "${WEEKLY}"
- Specification:
Expand Down
1 change: 0 additions & 1 deletion tests/fixtures/seeds/start_trivial.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,5 @@ Campaign:
campaign_source: HSC/raw/RC2
data:
lsst_version: w_2024_38
prod_area: /tmp/trivial
butler_repo: "/repo/main"
data_query: "instrument = 'HSC' AND exposure in (30504, 30502) AND detector in (45, 46, 47, 48)"
2 changes: 0 additions & 2 deletions tests/fixtures/seeds/test_hsc_micro.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
split_min_groups: 2
data:
butler_repo: "/repo/main"
prod_area: "output/archive"
data_query: "instrument = 'HSC' AND exposure in (30504, 30502) AND detector in (45, 46, 47, 48)"
lsst_version: w_2025_01
bps_wms_clustering_file: "${DRP_PIPE_DIR}/bps/clustering/HSC/DRP-RC2-clustering.yaml"
Expand Down Expand Up @@ -65,7 +64,6 @@
campaign_validation: "{out}/validate"
data:
butler_repo: "/repo/main"
prod_area: "output/archive"
data_query: "instrument = 'HSC'"
lsst_version: "${WEEKLY}"
- Specification:
Expand Down
1 change: 0 additions & 1 deletion tests/fixtures/seeds/test_trivial.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
split_method: no_split
data:
butler_repo: "/repo/main"
prod_area: "output/archive"
data_query: "instrument = 'HSC' AND exposure in (30504, 30502) AND detector in (45, 46, 47, 48)"
lsst_version: w_2023_46
- Specification:
Expand Down
1 change: 1 addition & 0 deletions tests/test_parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
("campaign_name/step_name/group0", "group"),
("campaign_name/step_name", "step"),
("campaign_name", "campaign"),
("campaign-name", "campaign"),
],
)
def test_fullname_parsing(fullname: str, expected: str) -> None:
Expand Down