@@ -27,9 +27,11 @@ def gen_database_url(branch, database):
2727class Runner :
2828 """Runner for executing and uploading a meta-analysis workflow."""
2929
30+ _TARGET_SPACE = "mni152_2mm"
31+
3032 _ENTITY_SNAPSHOT_KEYS = {
31- "studyset" : ("studyset_snapshot" , "cached_studyset " , "studyset " ),
32- "annotation" : ("annotation_snapshot" , "cached_annotation " , "annotation " ),
33+ "studyset" : ("studyset_snapshot" , "studyset " , "cached_studyset " ),
34+ "annotation" : ("annotation_snapshot" , "annotation " , "cached_annotation " ),
3335 }
3436 _ENTITY_STORE_PATHS = {
3537 "studyset" : "studysets" ,
@@ -243,10 +245,6 @@ def _get_project_document(self, meta_analysis):
243245 )
244246 return None
245247
246- def _get_entity_snapshot (self , entity_name , documents ):
247- snapshot , _ = self ._get_entity_snapshot_record (entity_name , documents )
248- return snapshot
249-
250248 def _get_entity_snapshot_record (self , entity_name , documents ):
251249 is_expected_snapshot = (
252250 self ._is_studyset_snapshot
@@ -352,6 +350,23 @@ def _download_entity_from_store(self, entity_name, entity_id, documents):
352350 except requests .exceptions .HTTPError :
353351 raise direct_error
354352
353+ def _collect_entity_records (self , documents ):
354+ records = {}
355+ for entity_name in self ._ENTITY_STORE_PATHS :
356+ snapshot , snapshot_id = self ._get_entity_snapshot_record (entity_name , documents )
357+ records [entity_name ] = {
358+ "snapshot" : snapshot ,
359+ "snapshot_id" : snapshot_id ,
360+ "neurostore_id" : self ._get_neurostore_id (entity_name , documents ),
361+ }
362+ return records
363+
364+ def _apply_entity_records (self , records ):
365+ self .existing_studyset_snapshot = records ["studyset" ]["snapshot" ]
366+ self .existing_studyset_snapshot_id = records ["studyset" ]["snapshot_id" ]
367+ self .existing_annotation_snapshot = records ["annotation" ]["snapshot" ]
368+ self .existing_annotation_snapshot_id = records ["annotation" ]["snapshot_id" ]
369+
355370 @staticmethod
356371 def _snapshot_md5 (payload ):
357372 serialized_payload = json .dumps (
@@ -375,36 +390,42 @@ def download_bundle(self):
375390 # id=self.meta_analysis_id, nested=True
376391 # ).to_dict() # does not currently return run_key
377392
378- result_documents = self ._get_result_documents (meta_analysis )
379- documents = [meta_analysis , * result_documents ]
380-
381- self .existing_studyset_snapshot , self .existing_studyset_snapshot_id = (
382- self ._get_entity_snapshot_record ("studyset" , documents )
383- )
384- self .existing_annotation_snapshot , self .existing_annotation_snapshot_id = (
385- self ._get_entity_snapshot_record ("annotation" , documents )
386- )
387-
393+ documents = [meta_analysis ]
394+ entity_records = self ._collect_entity_records (documents )
395+ self ._apply_entity_records (entity_records )
388396 neurostore_documents = list (documents )
389- studyset_id = self ._get_neurostore_id ("studyset" , neurostore_documents )
390- annotation_id = self ._get_neurostore_id ("annotation" , neurostore_documents )
391-
392- if studyset_id is None or annotation_id is None :
397+ should_fetch_result_documents = any (
398+ record ["snapshot" ] is None or record ["neurostore_id" ] is None
399+ for record in entity_records .values ()
400+ )
401+ if should_fetch_result_documents :
402+ result_documents = self ._get_result_documents (meta_analysis )
403+ if result_documents :
404+ documents .extend (result_documents )
405+ neurostore_documents = list (documents )
406+ entity_records = self ._collect_entity_records (documents )
407+ self ._apply_entity_records (entity_records )
408+
409+ if any (record ["neurostore_id" ] is None for record in entity_records .values ()):
393410 project_document = self ._get_project_document (meta_analysis )
394411 neurostore_documents .append (project_document )
395- studyset_id = self ._get_neurostore_id ( "studyset" , neurostore_documents )
396- annotation_id = self ._get_neurostore_id ( "annotation" , neurostore_documents )
412+ entity_records = self ._collect_entity_records ( neurostore_documents )
413+ self ._apply_entity_records ( entity_records )
397414
398- if studyset_id is not None and annotation_id is not None :
415+ if all ( record [ "neurostore_id" ] is not None for record in entity_records . values ()) :
399416 try :
400417 self .cached_studyset = self ._download_entity_from_store (
401- "studyset" , studyset_id , neurostore_documents
418+ "studyset" ,
419+ entity_records ["studyset" ]["neurostore_id" ],
420+ neurostore_documents ,
402421 )
403422 self .cached_annotation = self ._download_entity_from_store (
404- "annotation" , annotation_id , neurostore_documents
423+ "annotation" ,
424+ entity_records ["annotation" ]["neurostore_id" ],
425+ neurostore_documents ,
405426 )
406427 self .cached = False
407- except requests .exceptions .HTTPError :
428+ except requests .exceptions .RequestException :
408429 if (
409430 self .existing_studyset_snapshot is None
410431 or self .existing_annotation_snapshot is None
@@ -516,7 +537,7 @@ def apply_filter(self, studyset, annotation):
516537 # Load the JSON data into a dictionary
517538 reference_studyset_dict = json .loads (json_data )
518539
519- reference_studyset = Studyset (reference_studyset_dict )
540+ reference_studyset = Studyset (reference_studyset_dict , target = self . _TARGET_SPACE )
520541
521542 del reference_studyset_dict
522543 # get study ids from studyset
@@ -540,7 +561,7 @@ def apply_filter(self, studyset, annotation):
540561 return first_studyset , second_studyset
541562
542563 def process_bundle (self , n_cores = None ):
543- studyset = Studyset (self .cached_studyset )
564+ studyset = Studyset (self .cached_studyset , target = self . _TARGET_SPACE )
544565 annotation = Annotation (self .cached_annotation , studyset )
545566 first_studyset , second_studyset = self .apply_filter (studyset , annotation )
546567 estimator , corrector = self .load_specification (n_cores = n_cores )
@@ -550,26 +571,29 @@ def process_bundle(self, n_cores=None):
550571 self .corrector = corrector
551572
552573 def create_result_object (self ):
553- # take a snapshot of the studyset and annotation (before running the workflow)
554574 headers = {"Compose-Upload-Key" : self .nsc_key }
555575 data = {"meta_analysis_id" : self .meta_analysis_id }
556- if self ._should_link_existing_snapshot (
557- self .cached_studyset ,
558- self .existing_studyset_snapshot ,
559- self .existing_studyset_snapshot_id ,
560- ):
561- data ["cached_studyset" ] = self .existing_studyset_snapshot_id
562- else :
563- data ["studyset_snapshot" ] = self .cached_studyset
564-
565- if self ._should_link_existing_snapshot (
566- self .cached_annotation ,
567- self .existing_annotation_snapshot ,
568- self .existing_annotation_snapshot_id ,
569- ):
570- data ["cached_annotation" ] = self .existing_annotation_snapshot_id
571- else :
572- data ["annotation_snapshot" ] = self .cached_annotation
576+ entity_payloads = {
577+ "studyset" : (
578+ self .cached_studyset ,
579+ self .existing_studyset_snapshot ,
580+ self .existing_studyset_snapshot_id ,
581+ ),
582+ "annotation" : (
583+ self .cached_annotation ,
584+ self .existing_annotation_snapshot ,
585+ self .existing_annotation_snapshot_id ,
586+ ),
587+ }
588+ for entity_name , (live_payload , existing_payload , existing_id ) in entity_payloads .items ():
589+ if self ._should_link_existing_snapshot (
590+ live_payload ,
591+ existing_payload ,
592+ existing_id ,
593+ ):
594+ data [f"cached_{ entity_name } " ] = existing_id
595+ else :
596+ data [f"{ entity_name } _snapshot" ] = live_payload
573597
574598 resp = requests .post (
575599 f"{ self .compose_url } /meta-analysis-results" ,
0 commit comments