Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Dockerfile.ingest
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@

# Dockerfile for ingest service.

ARG RUBINENV_VERSION=10.0.0
ARG RUBINENV_VERSION=12.0.0
FROM ghcr.io/lsst-dm/docker-newinstall:9-latest-${RUBINENV_VERSION}
ARG OBS_LSST_VERSION
ENV OBS_LSST_VERSION=${OBS_LSST_VERSION:-w_2025_29}
ENV OBS_LSST_VERSION=${OBS_LSST_VERSION:-w_2025_43}
USER lsst
RUN source loadLSST.bash && mamba install redis-py
RUN source loadLSST.bash && eups distrib install -t "${OBS_LSST_VERSION}" lsst_obs
COPY src/ingest.py src/info.py src/utils.py src/rucio_interface.py ./ingest/
COPY src/ingest.py src/info.py src/utils.py ./ingest/
# Environment variables that must be set:
# REDIS_HOST REDIS_PASSWORD BUCKET BUTLER_REPO
# For Rucio (all must be set if RUCIO_RSE is set):
Expand Down
198 changes: 163 additions & 35 deletions src/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,17 @@
import requests
from lsst.daf.butler import Butler
from lsst.obs.base import DefineVisitsTask, RawIngestTask
from lsst.obs.lsst import ingest_guider
from lsst.resources import ResourcePath

from info import Info
from rucio_interface import RucioInterface
from utils import setup_logging, setup_redis

MAX_FAILURES: int = 3
"""Retry ingests until this many failures (`int`)."""
max_failures = int(os.environ.get("MAX_FAILURES", "3"))
"""Retry ingests until this many failures."""

max_guider_wait = float(os.environ.get("MAX_GUIDER_WAIT", "5.0"))
"""Wait this long for science images to arrive after guiders."""

max_ingests = int(os.environ.get("MAX_INGESTS", "10"))
"""Accept up to this many files to ingest at once."""
Expand All @@ -59,15 +62,16 @@
worker_name = socket.gethostname()
worker_queue = f"WORKER:{bucket}:{worker_name}"

if not is_lfa:
rucio_rse = os.environ.get("RUCIO_RSE", None)
if rucio_rse:
dtn_url = os.environ["RUCIO_DTN"]
if not dtn_url.endswith("/"):
dtn_url += "/"
rucio_interface = RucioInterface(rucio_rse, dtn_url, bucket, os.environ["RUCIO_SCOPE"])

success_refs = []
retry_guider_obsids = dict()


class _DuplicateIngestError(RuntimeError):
    """Raised by ingest-failure callbacks when the datastore already
    contains the dataset, so the caller skips retrying that file."""

    pass


class _UndefinedExposureError(RuntimeError):
    """Raised when a guider file cannot be ingested because its exposure
    record is not yet defined, signaling the caller to wait and retry."""

    pass


def on_success(datasets):
Expand All @@ -86,45 +90,120 @@ def on_success(datasets):
logger.info("Ingested %s", dataset)
info = Info.from_path(dataset.path.geturl())
logger.debug("%s", info)
success_refs.extend(dataset.refs)
with r.pipeline() as pipe:
pipe.lrem(worker_queue, 0, info.path)
pipe.hset(f"FILE:{info.path}", "ingest_time", str(time.time()))
pipe.hincrby(f"INGEST:{info.bucket}:{info.instrument}", f"{info.obs_day}", 1)
pipe.execute()
if not is_lfa:
webhook_filenames.setdefault(info.exp_id, []).append(info.filename)
# If we ingested something, guiders should now be able to be ingested.
if info.exp_id in retry_guider_obsids:
del retry_guider_obsids[info.exp_id]
if webhook_uri:
for exp_id in webhook_filenames:
info_dict = {"exp_id": exp_id, "filenames": webhook_filenames[exp_id]}
resp = requests.post(webhook_uri, json=info_dict, timeout=0.5)
logger.info("Webhook response %s: %s", info_dict, resp)
try:
resp = requests.post(webhook_uri, json=info_dict, timeout=0.5)
logger.info("Webhook response %s: %s", info_dict, resp)
except Exception:
# Ignore webhook exceptions
logger.exception("Webhook exception for %s", info_dict)


def on_ingest_failure(exposure_data, exc):
    """Callback for ingest failure.

    Record statistics; give up on the dataset if it fails ``max_failures``
    times.  A dataset the datastore already contains is dropped from the
    worker queue immediately and reported via `_DuplicateIngestError`.

    Parameters
    ----------
    exposure_data: `lsst.obs.base.ingest.RawExposureData`
        Information about raw datasets that failed ingest.
    exc: `Exception`
        Exception raised by the ingest failure.

    Raises
    ------
    _DuplicateIngestError
        If the datastore already contains the dataset.
    """
    # This callback is used on the one-by-one retry path, so each call
    # should cover exactly one file.
    assert len(exposure_data.files) == 1
    logger.info("Failed to ingest %s: %s", exposure_data, exc)
    f = exposure_data.files[0]
    info = Info.from_path(f.filename.geturl())
    logger.debug("%s", info)
    if "Datastore already contains" in str(exc):
        logger.info("Already ingested %s", info.path)
        # Don't retry these
        r.lrem(worker_queue, 0, info.path)
        raise _DuplicateIngestError
    logger.warning("Marking for retry %s", info.path)
    with r.pipeline() as pipe:
        pipe.hincrby(f"FAIL:{info.bucket}:{info.instrument}", f"{info.obs_day}", 1)
        pipe.hset(f"FILE:{info.path}", "ing_fail_exc", str(exc))
        pipe.hincrby(f"FILE:{info.path}", "ing_fail_count", 1)
        pipe.execute()
    if int(r.hget(f"FILE:{info.path}", "ing_fail_count")) >= max_failures:
        logger.error("Giving up on %s", info.path)
        r.lrem(worker_queue, 0, info.path)


def on_guider_ingest_failure(datasets, exc):
    """Callback for guider ingest failure.

    Record statistics; give up on the dataset if it fails ``max_failures``
    times.  A dataset the datastore already contains is dropped from the
    worker queue immediately and reported via `_DuplicateIngestError`.

    Parameters
    ----------
    datasets: `list` [ `lsst.daf.butler.FileDataset` ]
        Raw guider datasets that failed ingest.
    exc: `Exception`
        Exception raised by the ingest failure.

    Raises
    ------
    _DuplicateIngestError
        If the datastore already contains the dataset.
    """
    # This callback is used on the one-by-one retry path, so each call
    # should cover exactly one dataset.
    assert len(datasets) == 1
    dataset = datasets[0]
    logger.info("Failed to ingest %s: %s", dataset, exc)
    info = Info.from_path(dataset.path.geturl())
    logger.debug("%s", info)
    if "Datastore already contains" in str(exc):
        logger.info("Already ingested %s", info.path)
        # Don't retry these
        r.lrem(worker_queue, 0, info.path)
        raise _DuplicateIngestError
    logger.warning("Marking for retry %s", info.path)
    with r.pipeline() as pipe:
        pipe.hincrby(f"FAIL:{info.bucket}:{info.instrument}", f"{info.obs_day}", 1)
        pipe.hset(f"FILE:{info.path}", "ing_fail_exc", str(exc))
        pipe.hincrby(f"FILE:{info.path}", "ing_fail_count", 1)
        pipe.execute()
    # NOTE: use the module-level max_failures (from MAX_FAILURES env var);
    # the old MAX_FAILURES constant was removed in this change.
    if int(r.hget(f"FILE:{info.path}", "ing_fail_count")) >= max_failures:
        logger.error("Giving up on %s", info.path)
        r.lrem(worker_queue, 0, info.path)


def on_undefined_exposure(resource, obs_id):
    """Callback for undefined exposure while ingesting guiders.

    Don't do normal retry processing for these.  Record when the
    observation id was first seen; keep retrying until ``max_guider_wait``
    seconds have elapsed, then drop the file from the worker queue.

    Parameters
    ----------
    resource: `lsst.resources.ResourcePath`
        Raw guider dataset that failed ingest.
    obs_id: `str`
        Observation id for which no exposure record was found.

    Raises
    ------
    _UndefinedExposureError
        Always raised, so the caller can wait for the science image that
        defines the exposure and then retry.
    """
    # No need to log; ingest_guider does that already
    # Remember when this observation id was first reported as undefined.
    if obs_id not in retry_guider_obsids:
        retry_guider_obsids[obs_id] = time.time()
    # If we've waited longer than max_guider_wait since first seeing this
    # obs_id, record the failure and stop retrying this file.
    if retry_guider_obsids[obs_id] + max_guider_wait <= time.time():
        info = Info.from_path(resource.geturl())
        with r.pipeline() as pipe:
            pipe.hincrby(f"FAIL:{info.bucket}:{info.instrument}", f"{info.obs_day}", 1)
            pipe.hset(f"FILE:{info.path}", "ing_fail_exc", "Max wait")
            pipe.execute()
        logger.error("Giving up on %s", info.path)
        r.lrem(worker_queue, 0, info.path)
        # Forget the obs_id so a later arrival starts a fresh wait window.
        del retry_guider_obsids[obs_id]
    raise _UndefinedExposureError


def on_metadata_failure(dataset, exc):
"""Callback for metadata parsing failure.

Expand Down Expand Up @@ -203,7 +282,13 @@ def main():
butler = Butler(butler_repo, writeable=True)
ingest_config = RawIngestTask.ConfigClass()
ingest_config.transfer = "direct"
ingester = RawIngestTask(
batch_ingester = RawIngestTask(
config=ingest_config,
butler=butler,
on_success=on_success,
on_metadata_failure=on_metadata_failure,
)
one_by_one_ingester = RawIngestTask(
config=ingest_config,
butler=butler,
on_success=on_success,
Expand All @@ -221,7 +306,8 @@ def main():
# Process any entries on the worker queue.
if r.llen(worker_queue) > 0:
blobs = r.lrange(worker_queue, 0, -1)
resources = [ResourcePath(f"s3://{b.decode()}") for b in blobs]
resources = [ResourcePath(f"s3://{b.decode()}") for b in blobs if b"_guider" not in b]
guiders = [ResourcePath(f"s3://{b.decode()}") for b in blobs if b"_guider" in b]

# Ingest if we have resources
if resources:
Expand All @@ -231,9 +317,16 @@ def main():
logger.info("Ingesting %s", resources)
success_refs = []
try:
success_refs = ingester.run(resources)
success_refs = batch_ingester.run(resources)
except RuntimeError:
pass
# Retry one by one
for resource in resources:
try:
success_refs.extend(one_by_one_ingester.run([resource]))
except _DuplicateIngestError:
pass
except Exception:
logger.exception("Error while ingesting %s", resource)
except Exception:
logger.exception("Error while ingesting %s", resources)

Expand All @@ -249,18 +342,53 @@ def main():
logger.info("Defined visits for %s", ids)
except Exception:
logger.exception("Error while defining visits for %s", success_refs)
if not is_lfa and rucio_rse:
# Register with Rucio if we ingested anything
try:
rucio_interface.register(resources)
except Exception:
logger.exception("Rucio registration failed for %s", resources)

# Atomically grab the next entry from the bucket queue, blocking until
# one exists.
r.blmove(redis_queue, worker_queue, 0, "RIGHT", "LEFT")
# Be greedy and take as many entries as exist up to max

# Ingest if we have guiders
if guiders:
logger.info("Ingesting %s", guiders)
try:
ingest_guider(
butler,
guiders,
transfer="direct",
on_success=on_success,
on_metadata_failure=on_metadata_failure,
)
except RuntimeError:
# Retry one by one
retries = False
for guider in guiders:
try:
ingest_guider(
butler,
[guider],
transfer="direct",
on_success=on_success,
on_ingest_failure=on_guider_ingest_failure,
on_undefined_exposure=on_undefined_exposure,
on_metadata_failure=on_metadata_failure,
)
except _DuplicateIngestError:
pass
except _UndefinedExposureError:
retries = True
except Exception:
logger.exception("Error while ingesting %s", guider)
if retries:
# Wait for a science image to show up
time.sleep(0.5)
except Exception:
logger.exception("Error while ingesting %s", guiders)

# If we have any retries, don't wait for new images
n = r.llen(worker_queue)
if n == 0:
# Atomically grab the next entry from the bucket queue, blocking
# until one exists.
r.blmove(redis_queue, worker_queue, 0, "RIGHT", "LEFT")
n = 1

# Be greedy and take as many entries as exist up to max
while n < max_ingests and r.lmove(redis_queue, worker_queue, "RIGHT", "LEFT"):
n += 1

Expand Down
Loading