Skip to content

Commit 0de44f9

Browse files
committed
Handle all files on ingest failure.
1 parent 400b14e commit 0de44f9

1 file changed

Lines changed: 18 additions & 17 deletions

File tree

src/ingest.py

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -96,32 +96,33 @@ def on_success(datasets):
9696
logger.exception("Webhook exception for %s", info_dict)
9797

9898

99-
def on_ingest_failure(dataset, exc):
99+
def on_ingest_failure(exposure_data, exc):
100100
"""Callback for ingest failure.
101101
102102
Record statistics; give up on the dataset if it fails 3 times.
103103
104104
Parameters
105105
----------
106-
dataset: `lsst.obs.base.ingest.RawFileData`
107-
Raw dataset that failed ingest.
106+
exposure_data: `lsst.obs.base.ingest.RawExposureData`
107+
Information about raw datasets that failed ingest.
108108
exc: `Exception`
109109
Exception raised by the ingest failure.
110110
"""
111-
logger.error("Failed to ingest %s: %s", dataset, exc)
112-
info = Info.from_path(dataset.files[0].filename.geturl())
113-
logger.debug("%s", info)
114-
if "Datastore already contains" in str(exc):
115-
# Don't retry these
116-
r.lrem(worker_queue, 0, info.path)
117-
return
118-
with r.pipeline() as pipe:
119-
pipe.hincrby(f"FAIL:{info.bucket}:{info.instrument}", f"{info.obs_day}", 1)
120-
pipe.hset(f"FILE:{info.path}", "ing_fail_exc", str(exc))
121-
pipe.hincrby(f"FILE:{info.path}", "ing_fail_count", 1)
122-
pipe.execute()
123-
if int(r.hget(f"FILE:{info.path}", "ing_fail_count")) >= max_failures:
124-
r.lrem(worker_queue, 0, info.path)
111+
logger.error("Failed to ingest %s: %s", exposure_data, exc)
112+
for f in exposure_data.files:
113+
info = Info.from_path(f.filename.geturl())
114+
logger.debug("%s", info)
115+
if "Datastore already contains" in str(exc):
116+
# Don't retry these
117+
r.lrem(worker_queue, 0, info.path)
118+
return
119+
with r.pipeline() as pipe:
120+
pipe.hincrby(f"FAIL:{info.bucket}:{info.instrument}", f"{info.obs_day}", 1)
121+
pipe.hset(f"FILE:{info.path}", "ing_fail_exc", str(exc))
122+
pipe.hincrby(f"FILE:{info.path}", "ing_fail_count", 1)
123+
pipe.execute()
124+
if int(r.hget(f"FILE:{info.path}", "ing_fail_count")) >= max_failures:
125+
r.lrem(worker_queue, 0, info.path)
125126

126127

127128
def on_guider_ingest_failure(datasets, exc):

0 commit comments

Comments
 (0)