Skip to content

Commit 6744107

Browse files
committed
Reorganize guider ingest.
1 parent 3b13c16 commit 6744107

File tree

1 file changed

+42
-11
lines changed

1 file changed

+42
-11
lines changed

src/ingest.py

Lines changed: 42 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,31 @@ def on_ingest_failure(dataset, exc):
117117
r.lrem(worker_queue, 0, info.path)
118118

119119

120+
def on_guider_ingest_failure(datasets, exc):
121+
"""Callback for ingest failure.
122+
123+
Record statistics; give up on the dataset if it fails 3 times.
124+
125+
Parameters
126+
----------
127+
datasets: `list` [ `lsst.daf.butler.FileDataset` ]
128+
Raw guider datasets that failed ingest.
129+
exc: `Exception`
130+
Exception raised by the ingest failure.
131+
"""
132+
for dataset in datasets:
133+
logger.error("Failed to ingest %s: %s", dataset, exc)
134+
info = Info.from_path(dataset.path.geturl())
135+
logger.debug("%s", info)
136+
with r.pipeline() as pipe:
137+
pipe.hincrby(f"FAIL:{info.bucket}:{info.instrument}", f"{info.obs_day}", 1)
138+
pipe.hset(f"FILE:{info.path}", "ing_fail_exc", str(exc))
139+
pipe.hincrby(f"FILE:{info.path}", "ing_fail_count", 1)
140+
pipe.execute()
141+
if int(r.hget(f"FILE:{info.path}", "ing_fail_count")) >= MAX_FAILURES:
142+
r.lrem(worker_queue, 0, info.path)
143+
144+
120145
def on_metadata_failure(dataset, exc):
121146
"""Callback for metadata parsing failure.
122147
@@ -226,17 +251,6 @@ def main():
226251
success_refs = []
227252
try:
228253
success_refs = ingester.run(resources)
229-
if guiders:
230-
success_refs.extend(
231-
ingest_guider(
232-
butler,
233-
guiders,
234-
transfer="direct",
235-
on_success=on_success,
236-
on_ingest_failure=on_ingest_failure,
237-
on_metadata_failure=on_metadata_failure,
238-
)
239-
)
240254
except RuntimeError:
241255
pass
242256
except Exception:
@@ -255,6 +269,23 @@ def main():
255269
except Exception:
256270
logger.exception("Error while defining visits for %s", success_refs)
257271

272+
# Ingest if we have guiders
273+
if guiders:
274+
logger.info("Ingesting %s", guiders)
275+
try:
276+
ingest_guider(
277+
butler,
278+
guiders,
279+
transfer="direct",
280+
on_success=on_success,
281+
on_ingest_failure=on_guider_ingest_failure,
282+
on_metadata_failure=on_metadata_failure,
283+
)
284+
except RuntimeError:
285+
pass
286+
except Exception:
287+
logger.exception("Error while ingesting %s", guiders)
288+
258289
# Atomically grab the next entry from the bucket queue, blocking until
259290
# one exists.
260291
r.blmove(redis_queue, worker_queue, 0, "RIGHT", "LEFT")

0 commit comments

Comments
 (0)