Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/build-manually.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ on:
rubinenvVersion:
description: 'rubin-env version'
required: true
default: '12.0.0'
default: '12.1.0'
obsLsstVersion:
description: 'Science Pipelines release'
required: true
default: 'w_2025_29'
default: 'w_2026_13'
tag:
description: 'embargo-butler tag'
required: true
Expand Down
4 changes: 2 additions & 2 deletions Dockerfile.ingest
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@

# Dockerfile for ingest service.

ARG RUBINENV_VERSION=12.0.0
ARG RUBINENV_VERSION=12.1.0
FROM ghcr.io/lsst-dm/docker-newinstall:9-latest-${RUBINENV_VERSION}
ARG OBS_LSST_VERSION
ENV OBS_LSST_VERSION=${OBS_LSST_VERSION:-w_2025_43}
ENV OBS_LSST_VERSION=${OBS_LSST_VERSION:-w_2026_13}
USER lsst
RUN source loadLSST.bash && mamba install redis-py
RUN source loadLSST.bash && eups distrib install -t "${OBS_LSST_VERSION}" lsst_obs
Expand Down
50 changes: 26 additions & 24 deletions src/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import os
import socket
import time
from collections import defaultdict

import astropy.io.fits
import requests
Expand Down Expand Up @@ -62,7 +61,6 @@
worker_name = socket.gethostname()
worker_queue = f"WORKER:{bucket}:{worker_name}"

success_refs = []
retry_guider_obsids = dict()


Expand Down Expand Up @@ -280,26 +278,44 @@ def main():

logger.info("Initializing Butler from %s", butler_repo)
butler = Butler(butler_repo, writeable=True)

if not is_lfa:
define_visits_config = DefineVisitsTask.ConfigClass()
define_visits_config.groupExposures = "one-to-one"
visit_definer = DefineVisitsTask(config=define_visits_config, butler=butler)

def on_exposure_record(record):
"""Define visits when a new exposure dimension
record is synced (ctrl_oods pattern)."""

try:
visit_definer.run([record], incremental=True)
logger.info("Defined visits for %s", record.dataId)
except Exception:
logger.exception("Error while defining visits for %s", record.dataId)

ingest_config = RawIngestTask.ConfigClass()
ingest_config.transfer = "direct"
batch_ingester = RawIngestTask(
batch_kwargs = dict(
config=ingest_config,
butler=butler,
on_success=on_success,
on_metadata_failure=on_metadata_failure,
)
one_by_one_ingester = RawIngestTask(
if not is_lfa:
batch_kwargs["on_exposure_record"] = on_exposure_record
batch_ingester = RawIngestTask(**batch_kwargs)

one_by_one_kwargs = dict(
config=ingest_config,
butler=butler,
on_success=on_success,
on_ingest_failure=on_ingest_failure,
on_metadata_failure=on_metadata_failure,
)

if not is_lfa:
define_visits_config = DefineVisitsTask.ConfigClass()
define_visits_config.groupExposures = "one-to-one"
visit_definer = DefineVisitsTask(config=define_visits_config, butler=butler)
one_by_one_kwargs["on_exposure_record"] = on_exposure_record
one_by_one_ingester = RawIngestTask(**one_by_one_kwargs)

logger.info("Waiting on %s", worker_queue)
while True:
Expand All @@ -315,34 +331,20 @@ def main():
record_groups(resources)

logger.info("Ingesting %s", resources)
success_refs = []
try:
success_refs = batch_ingester.run(resources)
batch_ingester.run(resources)
except RuntimeError:
# Retry one by one
for resource in resources:
try:
success_refs.extend(one_by_one_ingester.run([resource]))
one_by_one_ingester.run([resource])
except _DuplicateIngestError:
pass
except Exception:
logger.exception("Error while ingesting %s", resource)
except Exception:
logger.exception("Error while ingesting %s", resources)

# Define visits if we ingested anything
if not is_lfa and success_refs:
id_dict = defaultdict(list)
for ref in success_refs:
data_id = ref.dataId
id_dict[data_id["instrument"]].append(data_id)
for ids in id_dict.values():
try:
visit_definer.run(ids, incremental=True)
logger.info("Defined visits for %s", ids)
except Exception:
logger.exception("Error while defining visits for %s", success_refs)

# Ingest if we have guiders
if guiders:
logger.info("Ingesting %s", guiders)
Expand Down
Loading