Skip to content

Commit 33b0207

Browse files
committed
Don't drop successfully ingested datasets when a batch partially fails.
1 parent 29c73da commit 33b0207

File tree

1 file changed

+12
-5
lines changed

1 file changed

+12
-5
lines changed

src/ingest.py

Lines changed: 12 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -66,6 +66,8 @@
6666
dtn_url += "/"
6767
rucio_interface = RucioInterface(rucio_rse, dtn_url, bucket, os.environ["RUCIO_SCOPE"])
6868

69+
success_refs = []
70+
6971

7072
def on_success(datasets):
7173
"""Callback for successful ingest.
@@ -77,11 +79,14 @@ def on_success(datasets):
7779
datasets: `list` [`lsst.daf.butler.FileDataset`]
7880
The successfully-ingested datasets.
7981
"""
82+
global success_refs
83+
8084
webhook_filenames = dict()
8185
for dataset in datasets:
8286
logger.info("Ingested %s", dataset)
8387
info = Info.from_path(dataset.path.geturl())
8488
logger.debug("%s", info)
89+
success_refs.extend(dataset.refs)
8590
with r.pipeline() as pipe:
8691
pipe.lrem(worker_queue, 0, info.path)
8792
pipe.hset(f"FILE:{info.path}", "ingest_time", str(time.time()))
@@ -194,6 +199,8 @@ def record_groups(resources: list[ResourcePath]) -> None:
194199

195200
def main():
196201
"""Ingest FITS files from a Redis queue."""
202+
global success_refs
203+
197204
logger.info("Initializing Butler from %s", butler_repo)
198205
butler = Butler(butler_repo, writeable=True)
199206
ingest_config = RawIngestTask.ConfigClass()
@@ -225,26 +232,26 @@ def main():
225232
record_groups(resources)
226233

227234
logger.info("Ingesting %s", resources)
228-
refs = None
235+
success_refs = []
229236
try:
230-
refs = ingester.run(resources)
237+
success_refs = ingester.run(resources)
231238
except RuntimeError:
232239
pass
233240
except Exception:
234241
logger.exception("Error while ingesting %s")
235242

236243
# Define visits if we ingested anything
237-
if not is_lfa and refs:
244+
if not is_lfa and success_refs:
238245
id_dict = defaultdict(list)
239-
for ref in refs:
246+
for ref in success_refs:
240247
data_id = ref.dataId
241248
id_dict[data_id["instrument"]].append(data_id)
242249
for ids in id_dict.values():
243250
try:
244251
visit_definer.run(ids, incremental=True)
245252
logger.info("Defined visits for %s", ids)
246253
except Exception:
247-
logger.exception("Error while defining visits for %s", refs)
254+
logger.exception("Error while defining visits for %s", success_refs)
248255
if not is_lfa and rucio_rse:
249256
# Register with Rucio if we ingested anything
250257
try:

0 commit comments

Comments
 (0)