Commit 9064cd7

[ENH] query feature (#875)

* wip: query features
* wip: search based on metadata
* wip: separate return and filters of features
* wip: restructure from feedback
* run black
* wip: refactor query feature
* fix query building and testing
* test the or functionality
* fix jsonpath string generation
* fix tests for most the schemas
* fix tests
* formatting
* formatting
1 parent 6a998dc commit 9064cd7
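
The commit message points at jsonpath-driven metadata search over extracted features ("search based on metadata", "fix jsonpath string generation"). The view code that builds those jsonpath strings is not part of the hunks below, so the following is only a minimal sketch of the idea: filtering PipelineStudyResult rows by values inside the new result_data JSONB column. It uses the simpler JSONB containment operator (@>) rather than the commit's jsonpath generation, and the JSON keys and values are hypothetical.

```python
from neurostore.database import db            # assumes an active Flask app/db context
from neurostore.models import PipelineStudyResult

# Hypothetical metadata filter: results whose extracted features report an ADHD group.
# The "groups"/"diagnosis" structure is illustrative, not taken from this commit.
matching = (
    db.session.query(PipelineStudyResult)
    .filter(
        PipelineStudyResult.result_data.contains({"groups": [{"diagnosis": "ADHD"}]})
    )
    .all()
)
```

Pushing the predicate into the database this way (whether via @> or a generated jsonpath expression) avoids loading every result row into Python just to filter on its JSON payload.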

18 files changed: +1202, -597 lines

store/neurostore/ingest/extracted_features.py

Lines changed: 9 additions & 17 deletions
@@ -10,8 +10,7 @@
 from neurostore.models import (
     Pipeline,
     PipelineConfig,
-    PipelineRun,
-    PipelineRunResult,
+    PipelineStudyResult,
 )


@@ -26,15 +25,13 @@ def ingest_feature(feature_directory):
         db.session.query(Pipeline)
         .filter(
             Pipeline.name == pipeline_info["name"],
-            Pipeline.version == pipeline_info["version"],
         )
         .first()
     )
     # create a pipeline if it does not exist
     if not pipeline:
         pipeline = Pipeline(
             name=pipeline_info["name"],
-            version=pipeline_info["version"],
             description=pipeline_info.get("description"),
             study_dependent=(
                 True if pipeline_info.get("type", False) == "dependent" else False
@@ -58,29 +55,25 @@ def ingest_feature(feature_directory):
         .filter(
             PipelineConfig.pipeline_id == pipeline.id,
             PipelineConfig.config_hash == config_hash,
+            PipelineConfig.version == pipeline_info["version"],
         )
         .first()
     )
     # create a pipeline config if it does not exist
     if not pipeline_config:
         pipeline_config = PipelineConfig(
             pipeline_id=pipeline.id,
+            version=pipeline_info["version"],
             config=pipeline_info["arguments"],
             config_hash=config_hash,
         )
         db.session.add(pipeline_config)

-    # create a new pipeline run
-    pipeline_run = PipelineRun(
-        pipeline_id=pipeline.id,
-        config_id=pipeline_config.id,
-    )
-
     # get a list of all the paper directories in the feature directory
     paper_dirs = [d for d in Path(feature_directory).iterdir() if d.is_dir()]

     # for each subject directory, read the results.json file and the info.json file
-    pipeline_run_results = []
+    pipeline_study_results = []
     for paper_dir in paper_dirs:
         with open(op.join(paper_dir, "results.json")) as f:
             results = json.load(f)
@@ -91,17 +84,16 @@ def ingest_feature(feature_directory):
         # use the directory name as the base_study_id
         base_study_id = paper_dir.name
         # create a new result record
-        pipeline_run_results.append(
-            PipelineRunResult(
+        pipeline_study_results.append(
+            PipelineStudyResult(
                 base_study_id=base_study_id,
-                data=results,
+                result_data=results,
                 date_executed=parse_date(info["date"]),
                 file_inputs=info["inputs"],
-                run=pipeline_run,
+                config=pipeline_config,
             )
         )

-    db.session.add(pipeline_run)
-    db.session.add_all(pipeline_run_results)
+    db.session.add_all(pipeline_study_results)

     db.session.commit()
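
The refactored ingest_feature now attaches results directly to a PipelineConfig instead of a PipelineRun. Below is a sketch of the on-disk layout it appears to expect, inferred only from the hunks above (the top-level pipeline metadata is read outside these hunks, so how it is stored is a guess), plus a hypothetical call.

```python
# Assumed layout of feature_directory (hypothetical path), one subdirectory per base study:
#
# /data/participant_demographics/
# ├── <pipeline metadata: name, version, description, type, arguments>
# ├── study_abc123/
# │   ├── results.json   -> stored as PipelineStudyResult.result_data
# │   └── info.json      -> {"date": "...", "inputs": {...}}
# └── study_def456/
#     ├── results.json
#     └── info.json

from neurostore.ingest.extracted_features import ingest_feature

# Requires an active application/database session; commits the Pipeline, its
# PipelineConfig, and one PipelineStudyResult per study directory.
ingest_feature("/data/participant_demographics")
```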

store/neurostore/models/__init__.py

Lines changed: 2 additions & 6 deletions
@@ -14,9 +14,7 @@
     AnalysisConditions,
     Pipeline,
     PipelineConfig,
-    PipelineRun,
-    PipelineRunResult,
-    PipelineRunResultVote,
+    PipelineStudyResult,
 )
 from .auth import User, Role

@@ -38,7 +36,5 @@
     "Role",
     "Pipeline",
     "PipelineConfig",
-    "PipelineRun",
-    "PipelineRunResult",
-    "PipelineRunResultVote",
+    "PipelineStudyResult",
 ]

store/neurostore/models/data.py

Lines changed: 91 additions & 38 deletions
@@ -1,18 +1,20 @@
+import re
+
 import sqlalchemy as sa
 from sqlalchemy import exists
 from sqlalchemy.ext.hybrid import hybrid_property
 from sqlalchemy.dialects.postgresql import JSONB
-from sqlalchemy import ForeignKeyConstraint
+from sqlalchemy import ForeignKeyConstraint, func
 from sqlalchemy.ext.associationproxy import association_proxy
-
 from sqlalchemy.ext.mutable import MutableDict
-from sqlalchemy.orm import relationship, backref
-from sqlalchemy.sql import func
+from sqlalchemy.orm import relationship, backref, validates, aliased
 import shortuuid

 from .migration_types import TSVector
 from ..database import db

+SEMVER_REGEX = r"^(0|[1-9]\d*)\.(0|[1-9]\d*)\.(0|[1-9]\d*)(?:-((?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)(?:\.(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*))?(?:\+([0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?$"  # noqa E501
+

 def _check_type(x):
     """check annotation key type"""
@@ -193,6 +195,9 @@ class BaseStudy(BaseMixin, db.Model):
     versions = relationship(
         "Study", backref=backref("base_study"), passive_deletes=True
     )
+    pipeline_study_results = relationship(
+        "PipelineStudyResult", backref=backref("base_study"), passive_deletes=True
+    )

     __table_args__ = (
         db.CheckConstraint(level.in_(["group", "meta"])),
@@ -263,6 +268,71 @@ def update_has_images_and_points(self):
         self.has_images = self.images_exist
         self.has_coordinates = self.points_exist

+    def display_features(self, pipelines=None):
+        """
+        Display pipeline features for the base study.
+        Only loads and returns features if pipelines are explicitly specified.
+
+        Args:
+            pipelines (list, optional): List of pipeline names to display features from.
+                If None or empty, returns empty dict.
+        """
+        if not pipelines:
+            return {}
+
+        # Create aliases for the tables
+        PipelineAlias = aliased(Pipeline)
+        PipelineConfigAlias = aliased(PipelineConfig)
+        PipelineStudyResultAlias = aliased(PipelineStudyResult)
+
+        # Get latest results subquery
+        latest_results = (
+            db.session.query(
+                PipelineStudyResultAlias.base_study_id,
+                PipelineAlias.name.label("pipeline_name"),
+                func.max(PipelineStudyResultAlias.date_executed).label("max_date"),
+            )
+            .join(
+                PipelineConfigAlias,
+                PipelineStudyResultAlias.config_id == PipelineConfigAlias.id,
+            )
+            .join(PipelineAlias, PipelineConfigAlias.pipeline_id == PipelineAlias.id)
+            .filter(PipelineStudyResultAlias.base_study_id == self.id)
+            .filter(PipelineAlias.name.in_(pipelines))
+            .group_by(PipelineStudyResultAlias.base_study_id, PipelineAlias.name)
+            .subquery()
+        )
+
+        # Main query joining with latest results
+        query = (
+            db.session.query(
+                PipelineStudyResultAlias.result_data,
+                PipelineAlias.name.label("pipeline_name"),
+            )
+            .join(
+                PipelineConfigAlias,
+                PipelineStudyResultAlias.config_id == PipelineConfigAlias.id,
+            )
+            .join(PipelineAlias, PipelineConfigAlias.pipeline_id == PipelineAlias.id)
+            .join(
+                latest_results,
+                (
+                    PipelineStudyResultAlias.base_study_id
+                    == latest_results.c.base_study_id
+                )
+                & (PipelineAlias.name == latest_results.c.pipeline_name)
+                & (PipelineStudyResultAlias.date_executed == latest_results.c.max_date),
+            )
+        )
+
+        # Execute query and build response
+        results = query.all()
+        features = {}
+        for result in results:
+            features[result.pipeline_name] = result.result_data
+
+        return features
+

 class Study(BaseMixin, db.Model):
     __tablename__ = "studies"
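
A sketch of how the new BaseStudy.display_features helper might be used; the lookup and the pipeline name are placeholders, and a Flask-SQLAlchemy query property plus an active app context are assumed:

```python
from neurostore.models import BaseStudy

base_study = BaseStudy.query.first()  # any existing base study; the lookup is illustrative

# No pipelines requested: the method short-circuits and returns an empty dict.
assert base_study.display_features() == {}

# Returns {pipeline_name: result_data}, keeping only the most recent
# PipelineStudyResult (by date_executed) for each requested pipeline.
features = base_study.display_features(pipelines=["ParticipantDemographics"])
for pipeline_name, result_data in features.items():
    print(pipeline_name, result_data)
```

Requiring an explicit pipelines list keeps feature payloads out of default study responses, so callers only pay for the JSONB they ask for.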
@@ -544,7 +614,6 @@ class Pipeline(BaseMixin, db.Model):

     name = db.Column(db.String)
     description = db.Column(db.String)
-    version = db.Column(db.String)
     study_dependent = db.Column(db.Boolean, default=False)
     ace_compatible = db.Column(db.Boolean, default=False)
     pubget_compatible = db.Column(db.Boolean, default=False)
@@ -557,55 +626,39 @@ class PipelineConfig(BaseMixin, db.Model):
     pipeline_id = db.Column(
         db.Text, db.ForeignKey("pipelines.id", ondelete="CASCADE"), index=True
     )
+    version = db.Column(db.String)
     config = db.Column(JSONB)
+    executed_at = db.Column(
+        db.DateTime(timezone=True)
+    )  # when the pipeline was executed on the filesystem (not when it was ingested)
     config_hash = db.Column(db.String, index=True)
     pipeline = relationship(
         "Pipeline", backref=backref("configs", passive_deletes=True)
     )

+    @validates("version")
+    def validate_version(self, key, value):
+        if not re.match(SEMVER_REGEX, value):
+            raise ValueError(f"Invalid version format: {value}")
+        return value

-class PipelineRun(BaseMixin, db.Model):
-    __tablename__ = "pipeline_runs"

-    pipeline_id = db.Column(
-        db.Text, db.ForeignKey("pipelines.id", ondelete="CASCADE"), index=True
-    )
+class PipelineStudyResult(BaseMixin, db.Model):
+    __tablename__ = "pipeline_study_results"
+
     config_id = db.Column(
         db.Text, db.ForeignKey("pipeline_configs.id", ondelete="CASCADE"), index=True
     )
-    config = relationship(
-        "PipelineConfig", backref=backref("runs", passive_deletes=True)
-    )
-    run_index = db.Column(db.Integer())
-
-
-class PipelineRunResult(BaseMixin, db.Model):
-    __tablename__ = "pipeline_run_results"
-
-    run_id = db.Column(
-        db.Text, db.ForeignKey("pipeline_runs.id", ondelete="CASCADE"), index=True
-    )
     base_study_id = db.Column(db.Text, db.ForeignKey("base_studies.id"), index=True)
     date_executed = db.Column(db.DateTime(timezone=True))
-    data = db.Column(JSONB)
+    result_data = db.Column(JSONB)
     file_inputs = db.Column(JSONB)
-    run = relationship("PipelineRun", backref=backref("results", passive_deletes=True))
-
-
-class PipelineRunResultVote(BaseMixin, db.Model):
-    __tablename__ = "pipeline_run_result_votes"
-
-    run_result_id = db.Column(
-        db.Text,
-        db.ForeignKey("pipeline_run_results.id", ondelete="CASCADE"),
-        index=True,
+    status = db.Column(
+        db.Enum("SUCCESS", "FAILURE", "ERROR", "UNKNOWN", name="status_enum")
     )
-    user_id = db.Column(db.Text, db.ForeignKey("users.external_id"), index=True)
-    accurate = db.Column(db.Boolean)
-    run_result = relationship(
-        "PipelineRunResult", backref=backref("votes", passive_deletes=True)
+    config = relationship(
+        "PipelineConfig", backref=backref("results", passive_deletes=True)
     )
-    user = relationship("User", backref=backref("votes", passive_deletes=True))


 # from . import event_listeners  # noqa E402
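
The @validates("version") hook on PipelineConfig runs whenever the attribute is assigned, so a malformed version fails fast, before anything is flushed to the database. A minimal sketch with placeholder field values:

```python
from neurostore.models import PipelineConfig

cfg = PipelineConfig(config={"extractor": "demo"}, config_hash="abc123")  # placeholder values

cfg.version = "1.0.0"        # accepted: matches SEMVER_REGEX

try:
    cfg.version = "latest"   # rejected at assignment time by the validator
except ValueError as err:
    print(err)               # Invalid version format: latest
```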

store/neurostore/openapi

store/neurostore/resources/__init__.py

Lines changed: 2 additions & 6 deletions
@@ -18,9 +18,7 @@
 from .pipeline import (
     PipelinesView,
     PipelineConfigsView,
-    PipelineRunsView,
-    PipelineRunResultsView,
-    PipelineRunResultVotesView,
+    PipelineStudyResultsView,
 )

 __all__ = [
@@ -37,7 +35,5 @@
     "UsersView",
     "PipelinesView",
     "PipelineConfigsView",
-    "PipelineRunsView",
-    "PipelineRunResultsView",
-    "PipelineRunResultVotesView",
+    "PipelineStudyResultsView",
 ]
