2727import os
2828import socket
2929import time
30- from collections import defaultdict
3130
3231import astropy .io .fits
3332import requests
6261worker_name = socket .gethostname ()
6362worker_queue = f"WORKER:{ bucket } :{ worker_name } "
6463
65- success_refs = []
6664retry_guider_obsids = dict ()
6765
6866
@@ -280,26 +278,44 @@ def main():
280278
281279 logger .info ("Initializing Butler from %s" , butler_repo )
282280 butler = Butler (butler_repo , writeable = True )
281+
282+ if not is_lfa :
283+ define_visits_config = DefineVisitsTask .ConfigClass ()
284+ define_visits_config .groupExposures = "one-to-one"
285+ visit_definer = DefineVisitsTask (config = define_visits_config , butler = butler )
286+
287+ def on_exposure_record (record ):
288+ """Define visits when a new exposure dimension
289+ record is synced (ctrl_oods pattern)."""
290+
291+ try :
292+ visit_definer .run ([record ], incremental = True )
293+ logger .info ("Defined visits for %s" , record .dataId )
294+ except Exception :
295+ logger .exception ("Error while defining visits for %s" , record .dataId )
296+
283297 ingest_config = RawIngestTask .ConfigClass ()
284298 ingest_config .transfer = "direct"
285- batch_ingester = RawIngestTask (
299+ batch_kwargs = dict (
286300 config = ingest_config ,
287301 butler = butler ,
288302 on_success = on_success ,
289303 on_metadata_failure = on_metadata_failure ,
290304 )
291- one_by_one_ingester = RawIngestTask (
305+ if not is_lfa :
306+ batch_kwargs ["on_exposure_record" ] = on_exposure_record
307+ batch_ingester = RawIngestTask (** batch_kwargs )
308+
309+ one_by_one_kwargs = dict (
292310 config = ingest_config ,
293311 butler = butler ,
294312 on_success = on_success ,
295313 on_ingest_failure = on_ingest_failure ,
296314 on_metadata_failure = on_metadata_failure ,
297315 )
298-
299316 if not is_lfa :
300- define_visits_config = DefineVisitsTask .ConfigClass ()
301- define_visits_config .groupExposures = "one-to-one"
302- visit_definer = DefineVisitsTask (config = define_visits_config , butler = butler )
317+ one_by_one_kwargs ["on_exposure_record" ] = on_exposure_record
318+ one_by_one_ingester = RawIngestTask (** one_by_one_kwargs )
303319
304320 logger .info ("Waiting on %s" , worker_queue )
305321 while True :
@@ -315,34 +331,20 @@ def main():
315331 record_groups (resources )
316332
317333 logger .info ("Ingesting %s" , resources )
318- success_refs = []
319334 try :
320- success_refs = batch_ingester .run (resources )
335+ batch_ingester .run (resources )
321336 except RuntimeError :
322337 # Retry one by one
323338 for resource in resources :
324339 try :
325- success_refs . extend ( one_by_one_ingester .run ([resource ]) )
340+ one_by_one_ingester .run ([resource ])
326341 except _DuplicateIngestError :
327342 pass
328343 except Exception :
329344 logger .exception ("Error while ingesting %s" , resource )
330345 except Exception :
331346 logger .exception ("Error while ingesting %s" , resources )
332347
333- # Define visits if we ingested anything
334- if not is_lfa and success_refs :
335- id_dict = defaultdict (list )
336- for ref in success_refs :
337- data_id = ref .dataId
338- id_dict [data_id ["instrument" ]].append (data_id )
339- for ids in id_dict .values ():
340- try :
341- visit_definer .run (ids , incremental = True )
342- logger .info ("Defined visits for %s" , ids )
343- except Exception :
344- logger .exception ("Error while defining visits for %s" , success_refs )
345-
346348 # Ingest if we have guiders
347349 if guiders :
348350 logger .info ("Ingesting %s" , guiders )
0 commit comments