diff --git a/Dockerfile.ingest b/Dockerfile.ingest index 2f1adfd..5e29b0b 100644 --- a/Dockerfile.ingest +++ b/Dockerfile.ingest @@ -21,14 +21,14 @@ # Dockerfile for ingest service. -ARG RUBINENV_VERSION=10.0.0 +ARG RUBINENV_VERSION=12.0.0 FROM ghcr.io/lsst-dm/docker-newinstall:9-latest-${RUBINENV_VERSION} ARG OBS_LSST_VERSION -ENV OBS_LSST_VERSION=${OBS_LSST_VERSION:-w_2025_29} +ENV OBS_LSST_VERSION=${OBS_LSST_VERSION:-w_2025_43} USER lsst RUN source loadLSST.bash && mamba install redis-py RUN source loadLSST.bash && eups distrib install -t "${OBS_LSST_VERSION}" lsst_obs -COPY src/ingest.py src/info.py src/utils.py src/rucio_interface.py ./ingest/ +COPY src/ingest.py src/info.py src/utils.py ./ingest/ # Environment variables that must be set: # REDIS_HOST REDIS_PASSWORD BUCKET BUTLER_REPO # For Rucio (all must be set if RUCIO_RSE is set): diff --git a/src/ingest.py b/src/ingest.py index 5f50a2d..c09a619 100644 --- a/src/ingest.py +++ b/src/ingest.py @@ -33,14 +33,17 @@ import requests from lsst.daf.butler import Butler from lsst.obs.base import DefineVisitsTask, RawIngestTask +from lsst.obs.lsst import ingest_guider from lsst.resources import ResourcePath from info import Info -from rucio_interface import RucioInterface from utils import setup_logging, setup_redis -MAX_FAILURES: int = 3 -"""Retry ingests until this many failures (`int`).""" +max_failures = int(os.environ.get("MAX_FAILURES", "3")) +"""Retry ingests until this many failures.""" + +max_guider_wait = float(os.environ.get("MAX_GUIDER_WAIT", "5.0")) +"""Wait this long for science images to arrive after guiders.""" max_ingests = int(os.environ.get("MAX_INGESTS", "10")) """Accept up to this many files to ingest at once.""" @@ -59,15 +62,16 @@ worker_name = socket.gethostname() worker_queue = f"WORKER:{bucket}:{worker_name}" -if not is_lfa: - rucio_rse = os.environ.get("RUCIO_RSE", None) - if rucio_rse: - dtn_url = os.environ["RUCIO_DTN"] - if not dtn_url.endswith("/"): - dtn_url += "/" - rucio_interface = RucioInterface(rucio_rse, dtn_url, bucket, os.environ["RUCIO_SCOPE"]) - success_refs = [] +retry_guider_obsids = dict() + + +class _DuplicateIngestError(RuntimeError): + pass + + +class _UndefinedExposureError(RuntimeError): + pass def on_success(datasets): @@ -86,7 +90,6 @@ def on_success(datasets): logger.info("Ingested %s", dataset) info = Info.from_path(dataset.path.geturl()) logger.debug("%s", info) - success_refs.extend(dataset.refs) with r.pipeline() as pipe: pipe.lrem(worker_queue, 0, info.path) pipe.hset(f"FILE:{info.path}", "ingest_time", str(time.time())) @@ -94,37 +97,113 @@ def on_success(datasets): pipe.execute() if not is_lfa: webhook_filenames.setdefault(info.exp_id, []).append(info.filename) + # If we ingested something, guiders should now be able to be ingested. + if info.exp_id in retry_guider_obsids: + del retry_guider_obsids[info.exp_id] if webhook_uri: for exp_id in webhook_filenames: info_dict = {"exp_id": exp_id, "filenames": webhook_filenames[exp_id]} - resp = requests.post(webhook_uri, json=info_dict, timeout=0.5) - logger.info("Webhook response %s: %s", info_dict, resp) + try: + resp = requests.post(webhook_uri, json=info_dict, timeout=0.5) + logger.info("Webhook response %s: %s", info_dict, resp) + except Exception: + # Ignore webhook exceptions + logger.exception("Webhook exception for %s", info_dict) -def on_ingest_failure(dataset, exc): +def on_ingest_failure(exposure_data, exc): """Callback for ingest failure. Record statistics; give up on the dataset if it fails 3 times. Parameters ---------- - dataset: `lsst.obs.base.ingest.RawFileData` - Raw dataset that failed ingest. + exposure_data: `lsst.obs.base.ingest.RawExposureData` + Information about raw datasets that failed ingest. exc: `Exception` Exception raised by the ingest failure. """ - logger.error("Failed to ingest %s: %s", dataset, exc) - info = Info.from_path(dataset.files[0].filename.geturl()) + assert len(exposure_data.files) == 1 + logger.info("Failed to ingest %s: %s", exposure_data, exc) + f = exposure_data.files[0] + info = Info.from_path(f.filename.geturl()) logger.debug("%s", info) + if "Datastore already contains" in str(exc): + logger.info("Already ingested %s", info.path) + # Don't retry these + r.lrem(worker_queue, 0, info.path) + raise _DuplicateIngestError + logger.warning("Marking for retry %s", info.path) + with r.pipeline() as pipe: + pipe.hincrby(f"FAIL:{info.bucket}:{info.instrument}", f"{info.obs_day}", 1) + pipe.hset(f"FILE:{info.path}", "ing_fail_exc", str(exc)) + pipe.hincrby(f"FILE:{info.path}", "ing_fail_count", 1) + pipe.execute() + if int(r.hget(f"FILE:{info.path}", "ing_fail_count")) >= max_failures: + logger.error("Giving up on %s", info.path) + r.lrem(worker_queue, 0, info.path) + + +def on_guider_ingest_failure(datasets, exc): + """Callback for guider ingest failure. + + Record statistics; give up on the dataset if it fails 3 times. + + Parameters + ---------- + datasets: `list` [ `lsst.daf.butler.FileDataset` ] + Raw guider datasets that failed ingest. + exc: `Exception` + Exception raised by the ingest failure. + """ + assert len(datasets) == 1 + dataset = datasets[0] + logger.info("Failed to ingest %s: %s", dataset, exc) + info = Info.from_path(dataset.path.geturl()) + logger.debug("%s", info) + if "Datastore already contains" in str(exc): + logger.info("Already ingested %s", info.path) + # Don't retry these + r.lrem(worker_queue, 0, info.path) + raise _DuplicateIngestError + logger.warning("Marking for retry %s", info.path) with r.pipeline() as pipe: pipe.hincrby(f"FAIL:{info.bucket}:{info.instrument}", f"{info.obs_day}", 1) pipe.hset(f"FILE:{info.path}", "ing_fail_exc", str(exc)) pipe.hincrby(f"FILE:{info.path}", "ing_fail_count", 1) pipe.execute() - if int(r.hget(f"FILE:{info.path}", "ing_fail_count")) >= MAX_FAILURES: + if int(r.hget(f"FILE:{info.path}", "ing_fail_count")) >= max_failures: + logger.error("Giving up on %s", info.path) r.lrem(worker_queue, 0, info.path) +def on_undefined_exposure(resource, obs_id): + """Callback for undefined exposure while ingesting guiders. + + Don't do normal retry processing for these + + Parameters + ---------- + resource: `lsst.resources.ResourcePath` + Raw guider dataset that failed ingest. + obs_id: `str` + Observation id for which no exposure record was found. + """ + # No need to log; ingest_guider does that already + if obs_id not in retry_guider_obsids: + retry_guider_obsids[obs_id] = time.time() + if retry_guider_obsids[obs_id] + max_guider_wait <= time.time(): + info = Info.from_path(resource.geturl()) + with r.pipeline() as pipe: + pipe.hincrby(f"FAIL:{info.bucket}:{info.instrument}", f"{info.obs_day}", 1) + pipe.hset(f"FILE:{info.path}", "ing_fail_exc", "Max wait") + pipe.execute() + logger.error("Giving up on %s", info.path) + r.lrem(worker_queue, 0, info.path) + del retry_guider_obsids[obs_id] + raise _UndefinedExposureError + + def on_metadata_failure(dataset, exc): """Callback for metadata parsing failure. @@ -203,7 +282,13 @@ def main(): butler = Butler(butler_repo, writeable=True) ingest_config = RawIngestTask.ConfigClass() ingest_config.transfer = "direct" - ingester = RawIngestTask( + batch_ingester = RawIngestTask( + config=ingest_config, + butler=butler, + on_success=on_success, + on_metadata_failure=on_metadata_failure, + ) + one_by_one_ingester = RawIngestTask( config=ingest_config, butler=butler, on_success=on_success, @@ -221,7 +306,8 @@ def main(): # Process any entries on the worker queue. if r.llen(worker_queue) > 0: blobs = r.lrange(worker_queue, 0, -1) - resources = [ResourcePath(f"s3://{b.decode()}") for b in blobs] + resources = [ResourcePath(f"s3://{b.decode()}") for b in blobs if b"_guider" not in b] + guiders = [ResourcePath(f"s3://{b.decode()}") for b in blobs if b"_guider" in b] # Ingest if we have resources if resources: @@ -231,9 +317,16 @@ def main(): logger.info("Ingesting %s", resources) success_refs = [] try: - success_refs = ingester.run(resources) + success_refs = batch_ingester.run(resources) except RuntimeError: - pass + # Retry one by one + for resource in resources: + try: + success_refs.extend(one_by_one_ingester.run([resource])) + except _DuplicateIngestError: + pass + except Exception: + logger.exception("Error while ingesting %s", resource) except Exception: logger.exception("Error while ingesting %s", resources) @@ -249,18 +342,53 @@ def main(): logger.info("Defined visits for %s", ids) except Exception: logger.exception("Error while defining visits for %s", success_refs) - if not is_lfa and rucio_rse: - # Register with Rucio if we ingested anything - try: - rucio_interface.register(resources) - except Exception: - logger.exception("Rucio registration failed for %s", resources) - - # Atomically grab the next entry from the bucket queue, blocking until - # one exists. - r.blmove(redis_queue, worker_queue, 0, "RIGHT", "LEFT") - # Be greedy and take as many entries as exist up to max + + # Ingest if we have guiders + if guiders: + logger.info("Ingesting %s", guiders) + try: + ingest_guider( + butler, + guiders, + transfer="direct", + on_success=on_success, + on_metadata_failure=on_metadata_failure, + ) + except RuntimeError: + # Retry one by one + retries = False + for guider in guiders: + try: + ingest_guider( + butler, + [guider], + transfer="direct", + on_success=on_success, + on_ingest_failure=on_guider_ingest_failure, + on_undefined_exposure=on_undefined_exposure, + on_metadata_failure=on_metadata_failure, + ) + except _DuplicateIngestError: + pass + except _UndefinedExposureError: + retries = True + except Exception: + logger.exception("Error while ingesting %s", guider) + if retries: + # Wait for a science image to show up + time.sleep(0.5) + except Exception: + logger.exception("Error while ingesting %s", guiders) + + # If we have any retries, don't wait for new images n = r.llen(worker_queue) + if n == 0: + # Atomically grab the next entry from the bucket queue, blocking + # until one exists. + r.blmove(redis_queue, worker_queue, 0, "RIGHT", "LEFT") + n = 1 + + # Be greedy and take as many entries as exist up to max while n < max_ingests and r.lmove(redis_queue, worker_queue, "RIGHT", "LEFT"): n += 1 diff --git a/src/rucio_interface.py b/src/rucio_interface.py deleted file mode 100644 index a3e6b15..0000000 --- a/src/rucio_interface.py +++ /dev/null @@ -1,171 +0,0 @@ -# This file is part of embargo_butler. -# -# Developed for the LSST Data Management System. -# This product includes software developed by the LSST Project -# (http://www.lsst.org). -# See the COPYRIGHT file at the top-level directory of this distribution -# for details of code ownership. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -import hashlib -import logging -import random -import re -import time -import zlib - -import rucio.common.exception -from lsst.resources import ResourcePath -from rucio.client.didclient import DIDClient -from rucio.client.replicaclient import ReplicaClient - -__all__ = ["RucioInterface"] - -logger = logging.getLogger(__name__) - - -class RucioInterface: - """Register files in Rucio and attach them to datasets. - - Parameters - ---------- - rucio_rse: `str` - Name of the RSE that the files live in. - dtn_url: `str` - Base URL of the data transfer node for the Rucio physical filename. - bucket: `str` - Name of the S3 bucket that the files live in. - scope: `str` - Rucio scope to register the files in. - """ - - def __init__(self, rucio_rse: str, dtn_url: str, bucket: str, scope: str): - self.rucio_rse = rucio_rse - self.dtn_url = dtn_url - self.pfn_base = f"{dtn_url}{bucket}/" - self.scope = scope - - self.replica_client = ReplicaClient() - self.did_client = DIDClient() - - def _make_did(self, res: ResourcePath) -> dict[str, str | int]: - """Make a Rucio data identifier dictionary from a resource. - - Parameters - ---------- - res: `lsst.resources.ResourcePath` - Path to the file. - - Returns - ------- - did: `dict [ str, str|int ]` - Rucio data identifier including physical and logical names, - byte length, adler32 and MD5 checksums, and scope. - """ - with res.open("rb") as f: - contents = f.read() - size = len(contents) - md5 = hashlib.md5(contents).hexdigest() - adler32 = f"{zlib.adler32(contents):08x}" - path = res.path.removeprefix("/") - pfn = self.pfn_base + path - return dict(pfn=pfn, bytes=size, adler32=adler32, md5=md5, name=path, scope=self.scope) - - def _add_files_to_dataset(self, dids: list[dict], dataset_id: str) -> None: - """Attach a list of files specified by Rucio DIDs to a Rucio dataset. - - Ignores already-attached files for idempotency. - - Parameters - ---------- - dids: `list [ dict [ str, str|int ] ]` - List of Rucio data identifiers. - dataset_id: `str` - Logical name of the Rucio dataset. - """ - retries = 0 - max_retries = 2 - while True: - try: - self.did_client.add_files_to_dataset( - scope=self.scope, - name=dataset_id, - files=dids, - rse=self.rucio_rse, - ) - return - except rucio.common.exception.FileAlreadyExists: - # At least one already is in the dataset. - # This shouldn't happen, but if it does, - # we have to retry each individually. - for did in dids: - try: - self.did_client.add_files_to_dataset( - scope=self.scope, - name=dataset_id, - files=[did], - rse=self.rucio_rse, - ) - except rucio.common.exception.FileAlreadyExists: - pass - return - except rucio.common.exception.DatabaseException: - retries += 1 - if retries < max_retries: - time.sleep(random.uniform(0.5, 2)) - continue - else: - raise - - def register(self, resources: list[ResourcePath]) -> None: - """Register a list of files in Rucio. - - Parameters - ---------- - resources: `list [ lsst.resources.ResourcePath ]` - List of resource paths to files. - """ - data = [self._make_did(r) for r in resources] - datasets = dict() - for did in data: - # For raw images, use a dataset per 100 exposures - dataset_id = re.sub( - r"(.+?)/(\d+)/[A-Z]{2}_[A-Z]_\2_(\d{4})\d{2}/.*", - r"Dataset/\1/\2/\3", - did["name"], - ) - datasets.setdefault(dataset_id, []).append(did) - - for dataset_id, dids in datasets.items(): - try: - logger.info("Registering %s in dataset %s, RSE %s", dids, dataset_id, self.rucio_rse) - self._add_files_to_dataset(dids, dataset_id) - except rucio.common.exception.DataIdentifierNotFound: - # No such dataset, so create it - try: - logger.info("Creating Rucio dataset %s", dataset_id) - self.did_client.add_dataset( - scope=self.scope, - name=dataset_id, - statuses={"monotonic": True}, - rse=self.rucio_rse, - ) - except rucio.common.exception.DataIdentifierAlreadyExists: - # If someone else created it in the meantime - pass - # And then retry adding DIDs - self._add_files_to_dataset(dids, dataset_id) - - logger.info("Done with Rucio for %s", resources)