
Commit 411239c

[FIX] logic for ingestion (#909)
* fix logic for ingestion
* change name
* make logic more robust
1 parent d30ac2e commit 411239c

File tree

1 file changed: +62 −27 lines

store/neurostore/ingest/extracted_features.py

Lines changed: 62 additions & 27 deletions
@@ -10,10 +10,11 @@
     Pipeline,
     PipelineConfig,
     PipelineStudyResult,
+    BaseStudy,
 )


-def ingest_feature(feature_directory):
+def ingest_feature(feature_directory, overwrite=False):
     """Ingest demographics data into the database."""
     # read pipeline_info.json from the base feature directory
     with open(op.join(feature_directory, "pipeline_info.json")) as f:
@@ -23,21 +24,19 @@ def ingest_feature(feature_directory):
     pipeline = (
         db.session.query(Pipeline)
         .filter(
-            Pipeline.name == pipeline_info["name"],
+            Pipeline.name == pipeline_info["extractor"],
         )
         .first()
     )
     # create a pipeline if it does not exist
     if not pipeline:
         pipeline = Pipeline(
-            name=pipeline_info["name"],
+            name=pipeline_info["extractor"],
             description=pipeline_info.get("description"),
-            study_dependent=(
-                True if pipeline_info.get("type", False) == "dependent" else False
-            ),
-            ace_compatible="ace" in pipeline_info.get("input_sources", []),
-            pubget_compatible="pubget" in pipeline_info.get("input_sources", []),
-            derived_from=pipeline_info.get("derived_from", None),
+            study_dependent=None,
+            ace_compatible=True,
+            pubget_compatible=True,
+            derived_from=pipeline_info.get("input_pipelines", None),
         )
         db.session.add(pipeline)

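The lookup-then-create sequence in this hunk is the standard SQLAlchemy get-or-create pattern, now keyed on pipeline_info["extractor"] rather than pipeline_info["name"]. A minimal sketch of the pattern in isolation, reusing the Pipeline model and session from this file; the helper name is ours, not part of the commit:

def get_or_create_pipeline(session, name, **kwargs):
    """Hypothetical helper; mirrors the get-or-create logic in the hunk above."""
    # look the pipeline up by name first
    pipeline = session.query(Pipeline).filter(Pipeline.name == name).first()
    if pipeline is None:
        # not found: create it and stage it for the next commit()
        pipeline = Pipeline(name=name, **kwargs)
        session.add(pipeline)
    return pipeline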
@@ -56,10 +55,8 @@ def ingest_feature(feature_directory):
     if not pipeline_config:
         # Build config_args from pipeline_info
         config_args = {
-            "extractor": pipeline_info.get("extractor"),
             "extractor_kwargs": pipeline_info.get("extractor_kwargs", {}),
             "transform_kwargs": pipeline_info.get("transform_kwargs", {}),
-            "input_pipelines": pipeline_info.get("input_pipelines", {}),
         }

         pipeline_config = PipelineConfig(
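Across this hunk and the previous two, the keys read from pipeline_info.json are extractor, description, input_pipelines, extractor_kwargs, and transform_kwargs. For orientation, a hypothetical file of that shape as it would look after json.load; only the key names come from this diff, every value below is invented:

# hypothetical pipeline_info.json contents, post-json.load
pipeline_info = {
    "extractor": "ExampleExtractor",     # becomes Pipeline.name
    "description": "Example feature-extraction pipeline",
    "input_pipelines": [],               # stored as Pipeline.derived_from
    "extractor_kwargs": {},              # kept in PipelineConfig's config_args
    "transform_kwargs": {},              # kept in PipelineConfig's config_args
}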
@@ -76,27 +73,65 @@ def ingest_feature(feature_directory):

     # for each subject directory, read the results.json file and the info.json file
     pipeline_study_results = []
-    for paper_dir in paper_dirs:
-        with open(op.join(paper_dir, "results.json")) as f:
-            results = json.load(f)
-
-        with open(op.join(paper_dir, "info.json")) as f:
-            info = json.load(f)

+    for paper_dir in paper_dirs:
         # use the directory name as the base_study_id
         base_study_id = paper_dir.name
-        # create a new result record
-        pipeline_study_results.append(
-            PipelineStudyResult(
-                base_study_id=base_study_id,
-                result_data=results,
-                date_executed=parse_date(info["date"]),
-                file_inputs=info["inputs"],
-                config=pipeline_config,
-                status="SUCCESS",
+
+        if BaseStudy.query.filter_by(id=base_study_id).first() is None:
+            print(
+                f"Skipping {paper_dir} as it does not correspond to a valid base_study_id"
             )
+            continue
+        try:
+            with open(op.join(paper_dir, "results.json")) as f:
+                results = json.load(f)
+        except FileNotFoundError:
+            print(f"Skipping {paper_dir} as it does not contain results.json")
+            continue
+        except json.JSONDecodeError:
+            print(f"Skipping {paper_dir} as it contains invalid JSON in results.json")
+            continue
+        try:
+            with open(op.join(paper_dir, "info.json")) as f:
+                info = json.load(f)
+        except FileNotFoundError:
+            print(f"Skipping {paper_dir} as it does not contain info.json")
+            continue
+        except json.JSONDecodeError:
+            print(f"Skipping {paper_dir} as it contains invalid JSON in info.json")
+            continue
+
+        # check for existing result
+        existing_result = (
+            db.session.query(PipelineStudyResult)
+            .filter(
+                PipelineStudyResult.base_study_id == base_study_id,
+                PipelineStudyResult.config_id == pipeline_config.id,
+            )
+            .first()
         )

-    db.session.add_all(pipeline_study_results)
+        if existing_result and overwrite:
+            # update existing record
+            existing_result.result_data = results
+            existing_result.date_executed = parse_date(info["date"])
+            existing_result.file_inputs = info["inputs"]
+            existing_result.status = "SUCCESS"
+        elif not existing_result:
+            # create a new result record
+            pipeline_study_results.append(
+                PipelineStudyResult(
+                    base_study_id=base_study_id,
+                    result_data=results,
+                    date_executed=parse_date(info["date"]),
+                    file_inputs=info["inputs"],
+                    config=pipeline_config,
+                    status="SUCCESS",
+                )
+            )
+
+    if pipeline_study_results:
+        db.session.add_all(pipeline_study_results)

     db.session.commit()
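Net effect: re-running ingestion is now safe by default, since an existing (base_study_id, config) result is left untouched unless overwrite=True is passed. A minimal invocation sketch, assuming a Flask application context and the module path implied by the file location; neither appears in this commit:

# "app" is an assumed Flask application object; the directory path is hypothetical
from neurostore.ingest.extracted_features import ingest_feature

with app.app_context():
    ingest_feature("/data/features/demographics")                   # default: keep existing results
    ingest_feature("/data/features/demographics", overwrite=True)   # refresh existing results in place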
