Skip to content

Commit b3788e3

Browse files
authored
wip: add feature tables (#842)
* wip: add feature tables * wip: add functioning tests * fix tests * run black * remove commented lines * use db for ingestion, and remove extra imports * add crud interface for pipeline * update openapi * style with black * pacify flake8
1 parent 71d9f1d commit b3788e3

File tree

13 files changed

+846
-6
lines changed

13 files changed

+846
-6
lines changed
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
"""Ingest extracted features into the database."""
2+
3+
import json
4+
import os.path as op
5+
from pathlib import Path
6+
import hashlib
7+
from dateutil.parser import parse as parse_date
8+
9+
from neurostore.database import db
10+
from neurostore.models import (
11+
Pipeline,
12+
PipelineConfig,
13+
PipelineRun,
14+
PipelineRunResult,
15+
)
16+
17+
18+
def ingest_feature(feature_directory):
    """Ingest one pipeline's extracted features into the database.

    Reads ``pipeline_info.json`` from ``feature_directory``, finds or creates
    the matching ``Pipeline`` (by name and version) and ``PipelineConfig`` (by
    pipeline id and a SHA-256 hash of the arguments), then records a new
    ``PipelineRun`` with one ``PipelineRunResult`` per sub-directory and
    commits the session.

    Every sub-directory is treated as one paper: its name is stored as the
    result's ``base_study_id`` and it must contain ``results.json`` and
    ``info.json`` (the latter with ``"date"`` and ``"inputs"`` keys).

    Args:
        feature_directory: Path (``str`` or ``Path``) of the pipeline output
            directory.

    Raises:
        FileNotFoundError: If ``pipeline_info.json``, or a paper's
            ``results.json`` / ``info.json``, does not exist.
        KeyError: If ``name`` or ``version`` is missing from the pipeline
            info, or ``date`` / ``inputs`` is missing from a paper's info.
    """
    feature_directory = Path(feature_directory)

    # read pipeline_info.json from the base feature directory
    with open(feature_directory / "pipeline_info.json", encoding="utf-8") as f:
        pipeline_info = json.load(f)

    # "arguments" is optional; read it once so every use below agrees
    arguments = pipeline_info.get("arguments", {})
    input_sources = arguments.get("input_sources", [])

    # search if there is an existing pipeline with the same name and version
    pipeline = (
        db.session.query(Pipeline)
        .filter(
            Pipeline.name == pipeline_info["name"],
            Pipeline.version == pipeline_info["version"],
        )
        .first()
    )
    # create a pipeline if it does not exist
    if pipeline is None:
        pipeline = Pipeline(
            name=pipeline_info["name"],
            version=pipeline_info["version"],
            description=pipeline_info.get("description"),
            study_dependent=pipeline_info.get("type") == "dependent",
            ace_compatible="ace" in input_sources,
            pubget_compatible="pubget" in input_sources,
            derived_from=pipeline_info.get("derived_from"),
        )
        db.session.add(pipeline)
        # flush so pipeline.id is populated before it is used as a filter
        # value and as a foreign key below
        db.session.flush()

    # search within the pipeline for an existing pipeline config that matches
    # the "arguments" field in pipeline_info.json, using a hash of the arguments
    # NOTE(review): the hash depends on JSON key order; sort_keys=True would
    # make it canonical but would no longer match hashes already stored.
    config_hash = hashlib.sha256(json.dumps(arguments).encode()).hexdigest()
    pipeline_config = (
        db.session.query(PipelineConfig)
        .filter(
            PipelineConfig.pipeline_id == pipeline.id,
            PipelineConfig.config_hash == config_hash,
        )
        .first()
    )
    # create a pipeline config if it does not exist
    if pipeline_config is None:
        pipeline_config = PipelineConfig(
            pipeline_id=pipeline.id,
            config=arguments,
            config_hash=config_hash,
        )
        db.session.add(pipeline_config)
        # flush so pipeline_config.id is populated before the run references it
        db.session.flush()

    # create a new pipeline run
    pipeline_run = PipelineRun(
        pipeline_id=pipeline.id,
        config_id=pipeline_config.id,
    )

    # get a list of all the paper directories in the feature directory
    paper_dirs = [d for d in feature_directory.iterdir() if d.is_dir()]

    # for each paper directory, read the results.json file and the info.json file
    pipeline_run_results = []
    for paper_dir in paper_dirs:
        with open(paper_dir / "results.json", encoding="utf-8") as f:
            results = json.load(f)

        with open(paper_dir / "info.json", encoding="utf-8") as f:
            info = json.load(f)

        # create a new result record, using the directory name as the
        # base_study_id
        pipeline_run_results.append(
            PipelineRunResult(
                base_study_id=paper_dir.name,
                data=results,
                date_executed=parse_date(info["date"]),
                file_inputs=info["inputs"],
                run=pipeline_run,
            )
        )

    db.session.add(pipeline_run)
    db.session.add_all(pipeline_run_results)

    db.session.commit()

store/neurostore/models/__init__.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,11 @@
1212
AnnotationAnalysis,
1313
PointValue,
1414
AnalysisConditions,
15+
Pipeline,
16+
PipelineConfig,
17+
PipelineRun,
18+
PipelineRunResult,
19+
PipelineRunResultVote,
1520
)
1621
from .auth import User, Role
1722

@@ -31,4 +36,9 @@
3136
"AnalysisConditions",
3237
"User",
3338
"Role",
39+
"Pipeline",
40+
"PipelineConfig",
41+
"PipelineRun",
42+
"PipelineRunResult",
43+
"PipelineRunResultVote",
3444
]

store/neurostore/models/data.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,7 @@ class Study(BaseMixin, db.Model):
279279
level = db.Column(db.String)
280280
metadata_ = db.Column(JSONB)
281281
source = db.Column(db.String, index=True)
282+
base_study_id = db.Column(db.Text, db.ForeignKey("base_studies.id"), index=True)
282283
source_id = db.Column(db.String, index=True)
283284
source_updated_at = db.Column(db.DateTime(timezone=True))
284285
base_study_id = db.Column(db.Text, db.ForeignKey("base_studies.id"), index=True)
@@ -538,6 +539,75 @@ class PointValue(BaseMixin, db.Model):
538539
user = relationship("User", backref=backref("point_values", passive_deletes=True))
539540

540541

542+
class Pipeline(BaseMixin, db.Model):
    """Database record describing a pipeline.

    Stores the pipeline's name, version and description together with
    boolean flags and an optional ``derived_from`` text value.
    """

    __tablename__ = "pipelines"

    # Descriptive fields.
    name = db.Column(db.String)
    description = db.Column(db.String)
    version = db.Column(db.String)

    # Boolean flags; each falls back to False when no value is supplied.
    study_dependent = db.Column(db.Boolean, default=False)
    ace_compatible = db.Column(db.Boolean, default=False)
    pubget_compatible = db.Column(db.Boolean, default=False)

    # Plain text column (no foreign-key constraint is declared on it).
    derived_from = db.Column(db.Text)
552+
553+
554+
class PipelineConfig(BaseMixin, db.Model):
    """A configuration (JSON document plus its hash) belonging to a pipeline."""

    __tablename__ = "pipeline_configs"

    # Owning pipeline; the foreign key is declared with ON DELETE CASCADE.
    pipeline_id = db.Column(
        db.Text,
        db.ForeignKey("pipelines.id", ondelete="CASCADE"),
        index=True,
    )
    # The configuration itself, stored as JSONB.
    config = db.Column(JSONB)
    # Indexed hash string used to look a configuration up.
    config_hash = db.Column(db.String, index=True)

    # Exposes ``Pipeline.configs``; passive_deletes defers removal of child
    # rows to the database-level cascade.
    pipeline = relationship(
        "Pipeline",
        backref=backref("configs", passive_deletes=True),
    )
565+
566+
567+
class PipelineRun(BaseMixin, db.Model):
    """One run of a pipeline, linked to the configuration it used."""

    __tablename__ = "pipeline_runs"

    # Pipeline this run belongs to (ON DELETE CASCADE).
    pipeline_id = db.Column(
        db.Text,
        db.ForeignKey("pipelines.id", ondelete="CASCADE"),
        index=True,
    )
    # Configuration used for this run (ON DELETE CASCADE).
    config_id = db.Column(
        db.Text,
        db.ForeignKey("pipeline_configs.id", ondelete="CASCADE"),
        index=True,
    )

    # Exposes ``PipelineConfig.runs``; passive_deletes defers removal of child
    # rows to the database-level cascade.
    config = relationship(
        "PipelineConfig",
        backref=backref("runs", passive_deletes=True),
    )

    # Integer index of the run.
    # NOTE(review): nothing visible here assigns it — confirm who sets it.
    run_index = db.Column(db.Integer())
580+
581+
582+
class PipelineRunResult(BaseMixin, db.Model):
    """The output of a pipeline run for a single base study."""

    __tablename__ = "pipeline_run_results"

    # Run that produced this result (ON DELETE CASCADE).
    run_id = db.Column(
        db.Text,
        db.ForeignKey("pipeline_runs.id", ondelete="CASCADE"),
        index=True,
    )
    # Base study the result refers to.
    base_study_id = db.Column(
        db.Text,
        db.ForeignKey("base_studies.id"),
        index=True,
    )
    # Timezone-aware timestamp of when the result was produced.
    date_executed = db.Column(db.DateTime(timezone=True))
    # Result payload and the inputs it was computed from, both stored as JSONB.
    data = db.Column(JSONB)
    file_inputs = db.Column(JSONB)

    # Exposes ``PipelineRun.results``; passive_deletes defers removal of child
    # rows to the database-level cascade.
    run = relationship(
        "PipelineRun",
        backref=backref("results", passive_deletes=True),
    )
593+
594+
595+
class PipelineRunResultVote(BaseMixin, db.Model):
    """A user's vote on whether a pipeline run result is accurate."""

    __tablename__ = "pipeline_run_result_votes"

    # Result being voted on (ON DELETE CASCADE).
    run_result_id = db.Column(
        db.Text,
        db.ForeignKey("pipeline_run_results.id", ondelete="CASCADE"),
        index=True,
    )
    # Voting user, referenced by ``users.external_id``.
    user_id = db.Column(
        db.Text,
        db.ForeignKey("users.external_id"),
        index=True,
    )
    # The vote itself.
    accurate = db.Column(db.Boolean)

    # Exposes ``PipelineRunResult.votes``; passive_deletes defers removal of
    # child rows to the database-level cascade.
    run_result = relationship(
        "PipelineRunResult",
        backref=backref("votes", passive_deletes=True),
    )
    # Exposes ``User.votes``.
    # NOTE(review): passive_deletes is set here but the user_id foreign key
    # declares no ON DELETE rule — confirm the intended behavior.
    user = relationship(
        "User",
        backref=backref("votes", passive_deletes=True),
    )
609+
610+
541611
# from . import event_listeners # noqa E402
542612

543613
# del event_listeners

store/neurostore/openapi

store/neurostore/resources/__init__.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,14 @@
1515
UsersView,
1616
)
1717

18+
from .pipeline import (
19+
PipelinesView,
20+
PipelineConfigsView,
21+
PipelineRunsView,
22+
PipelineRunResultsView,
23+
PipelineRunResultVotesView,
24+
)
25+
1826
__all__ = [
1927
"StudysetsView",
2028
"AnnotationsView",
@@ -27,4 +35,9 @@
2735
"PointsView",
2836
"PointValuesView",
2937
"UsersView",
38+
"PipelinesView",
39+
"PipelineConfigsView",
40+
"PipelineRunsView",
41+
"PipelineRunResultsView",
42+
"PipelineRunResultVotesView",
3043
]

store/neurostore/resources/base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -619,7 +619,7 @@ def search(self):
619619
validate_search_query(s)
620620
except errors.SyntaxError as e:
621621
abort(400, description=e.args[0])
622-
tsquery = func.to_tsquery('english', pubmed_to_tsquery(s))
622+
tsquery = func.to_tsquery("english", pubmed_to_tsquery(s))
623623
q = q.filter(m._ts_vector.op("@@")(tsquery))
624624

625625
# Alternatively (or in addition), search on individual fields.

0 commit comments

Comments
 (0)