Skip to content

Commit 340d225

Browse files
authored
Ref/speed up ingestion (#616)
* preload more attributes
* wip: speed up PUT
* add sqltap profiling asgi
* do not update has_coordinates or has_images if irrelevant attribute updated
* make openapi more permissive and style
* remove unused import
* be more selective when updating has_coordinates and has_images
* refactor how records are looked up
* preload analyses
* handle loading of annotations
* preload the correct attributes for annotations
* catch more custom annotation loading
* fix annotation loading attempt #1
* attempt #2
* attempt #3
* reassign q
* remove extraneous command, and load studyset
* style fixed
* comment out unused bits
1 parent 77d3ac4 commit 340d225

File tree

11 files changed

+361
-65
lines changed

11 files changed

+361
-65
lines changed

store/neurostore/core.py

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import os
22
from pathlib import Path
3-
from werkzeug.middleware.profiler import ProfilerMiddleware
43

54
from connexion.middleware import MiddlewarePosition
65
from starlette.middleware.cors import CORSMiddleware
@@ -10,11 +9,45 @@
109
# from connexion.json_schema import default_handlers as json_schema_handlers
1110
from connexion.resolver import MethodResolver
1211
from flask_caching import Cache
13-
import sqltap.wsgi
1412

1513
from .or_json import ORJSONDecoder, ORJSONEncoder
1614
from .database import init_db
1715

16+
# from datetime import datetime
17+
18+
# import sqltap.wsgi
19+
# import sqltap
20+
# import yappi
21+
22+
# class SQLTapMiddleware:
23+
# def __init__(self, app):
24+
# self.app = app
25+
26+
# async def __call__(self, scope, receive, send):
27+
# profiler = sqltap.start()
28+
# await self.app(scope, receive, send)
29+
# statistics = profiler.collect()
30+
# sqltap.report(statistics, "report.txt", report_format="text")
31+
32+
33+
# class LineProfilerMiddleware:
34+
# def __init__(self, app):
35+
# self.app = app
36+
37+
# async def __call__(self, scope, receive, send):
38+
# yappi.start()
39+
# await self.app(scope, receive, send)
40+
# yappi.stop()
41+
# filename = (
42+
# scope["path"].lstrip("/").rstrip("/").replace("/", "-")
43+
# + "-"
44+
# + scope["method"].lower()
45+
# + str(datetime.now())
46+
# + ".prof"
47+
# )
48+
# stats = yappi.get_func_stats()
49+
# stats.save(filename, type="pstat")
50+
1851

1952
connexion_app = connexion.FlaskApp(__name__, specification_dir="openapi/")
2053

@@ -45,6 +78,16 @@
4578
allow_headers=["*"],
4679
)
4780

81+
# add sqltap
82+
# connexion_app.add_middleware(
83+
# SQLTapMiddleware,
84+
# )
85+
86+
# add profiling
87+
# connexion_app.add_middleware(
88+
# LineProfilerMiddleware
89+
# )
90+
4891
connexion_app.add_api(
4992
openapi_file,
5093
base_path="/api",
@@ -68,9 +111,5 @@
68111
},
69112
)
70113

71-
if app.debug:
72-
app.wsgi_app = sqltap.wsgi.SQLTapMiddleware(app.wsgi_app, path="/api/__sqltap__")
73-
app = ProfilerMiddleware(app)
74-
75114
app.json_encoder = ORJSONEncoder
76115
app.json_decoder = ORJSONDecoder

store/neurostore/models/data.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,8 +149,7 @@ class BaseStudy(BaseMixin, db.Model):
149149

150150
user = relationship("User", backref=backref("base_studies"))
151151
# retrieve versions of same study
152-
versions = relationship(
153-
"Study", backref=backref("base_study"))
152+
versions = relationship("Study", backref=backref("base_study"))
154153

155154
def update_has_images_and_points(self):
156155
# Calculate has_images and has_coordinates for the BaseStudy

store/neurostore/models/event_listeners.py

Lines changed: 74 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,21 @@
11
from sqlalchemy.exc import SQLAlchemyError
2+
from sqlalchemy import inspect
3+
from sqlalchemy.orm import joinedload
24
from flask_sqlalchemy.session import Session
35
from sqlalchemy import event
46
from .data import (
57
AnnotationAnalysis,
68
Annotation,
79
Studyset,
10+
StudysetStudy,
811
BaseStudy,
912
Study,
1013
Analysis,
1114
Point,
1215
Image,
1316
_check_type,
1417
)
18+
1519
from ..database import db
1620

1721

@@ -64,6 +68,27 @@ def create_blank_notes(studyset, annotation, initiator):
6468

6569

6670
def add_annotation_analyses_studyset(studyset, studies, collection_adapter):
71+
if not (inspect(studyset).pending or inspect(studyset).transient):
72+
studyset = (
73+
Studyset.query.filter_by(id=studyset.id)
74+
.options(
75+
joinedload(Studyset.studies).options(joinedload(Study.analyses)),
76+
joinedload(Studyset.annotations),
77+
)
78+
.one()
79+
)
80+
all_studies = set(studyset.studies + studies)
81+
existing_studies = [
82+
s for s in all_studies if not (inspect(s).pending or inspect(s).transient)
83+
]
84+
study_query = (
85+
Study.query.filter(Study.id.in_([s.id for s in existing_studies]))
86+
.options(joinedload(Study.analyses))
87+
.all()
88+
)
89+
90+
all_studies.union(set(study_query))
91+
6792
all_analyses = [analysis for study in studies for analysis in study.analyses]
6893
existing_analyses = [
6994
analysis for study in studyset.studies for analysis in study.analyses
@@ -91,6 +116,17 @@ def add_annotation_analyses_studyset(studyset, studies, collection_adapter):
91116

92117

93118
def add_annotation_analyses_study(study, analyses, collection_adapter):
119+
if not (inspect(study).pending or inspect(study).transient):
120+
study = (
121+
Study.query.filter_by(id=study.id)
122+
.options(
123+
joinedload(Study.analyses),
124+
joinedload(Study.studyset_studies)
125+
.joinedload(StudysetStudy.studyset)
126+
.joinedload(Studyset.annotations),
127+
)
128+
.one()
129+
)
94130
new_analyses = set(analyses) - set([a for a in study.analyses])
95131

96132
all_annotations = set(
@@ -150,14 +186,31 @@ def get_nested_attr(obj, nested_attr):
150186

151187
def get_base_study(obj):
152188
base_study = None
189+
153190
if isinstance(obj, (Point, Image)):
154-
base_study = get_nested_attr(obj, "analysis.study.base_study")
155-
if isinstance(obj, Analysis):
156-
base_study = get_nested_attr(obj, "study.base_study")
157-
if isinstance(obj, Study):
158-
base_study = obj.base_study
159-
if isinstance(obj, BaseStudy):
160-
base_study = obj
191+
if obj in session.new or session.deleted:
192+
base_study = get_nested_attr(obj, "analysis.study.base_study")
193+
elif isinstance(obj, Analysis):
194+
relevant_attrs = ("study", "points", "images")
195+
for attr in relevant_attrs:
196+
attr_history = get_nested_attr(inspect(obj), f"attrs.{attr}.history")
197+
if attr_history.added or attr_history.deleted:
198+
base_study = get_nested_attr(obj, "study.base_study")
199+
break
200+
elif isinstance(obj, Study):
201+
relevant_attrs = ("base_study", "analyses")
202+
for attr in relevant_attrs:
203+
attr_history = get_nested_attr(inspect(obj), f"attrs.{attr}.history")
204+
if attr_history.added or attr_history.deleted:
205+
base_study = obj.base_study
206+
break
207+
elif isinstance(obj, BaseStudy):
208+
relevant_attrs = ("versions",)
209+
for attr in relevant_attrs:
210+
attr_history = get_nested_attr(inspect(obj), f"attrs.{attr}.history")
211+
if attr_history.added or attr_history.deleted:
212+
base_study = obj
213+
break
161214

162215
return base_study
163216

@@ -169,4 +222,18 @@ def get_base_study(obj):
169222

170223
# Update has_images and has_coordinates for each unique BaseStudy
171224
for base_study in unique_base_studies:
225+
if (
226+
inspect(base_study).attrs.versions.history.added
227+
and base_study.has_coordinates is True
228+
and base_study.has_images is True
229+
):
230+
continue
231+
232+
if (
233+
inspect(base_study).attrs.versions.history.deleted
234+
and base_study.has_coordinates is False
235+
and base_study.has_images is False
236+
):
237+
continue
238+
172239
base_study.update_has_images_and_points()

store/neurostore/resources/base.py

Lines changed: 87 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,18 @@ def post_nested_record_update(record):
7272
"""
7373
return record
7474

75+
def after_update_or_create(self, record):
76+
"""
77+
Processing of a record after updating or creating (defined in specific classes).
78+
"""
79+
return record
80+
7581
@classmethod
76-
def update_or_create(cls, data, id=None, commit=True):
82+
def load_nested_records(cls, data, record=None):
83+
return data
84+
85+
@classmethod
86+
def update_or_create(cls, data, id=None, user=None, record=None, commit=True):
7787
"""
7888
scenarios:
7989
1. cloning a study
@@ -91,7 +101,7 @@ def update_or_create(cls, data, id=None, commit=True):
91101
# Store all models so we can atomically update in one commit
92102
to_commit = []
93103

94-
current_user = get_current_user()
104+
current_user = user or get_current_user()
95105
if not current_user:
96106
current_user = create_user()
97107

@@ -104,31 +114,35 @@ def update_or_create(cls, data, id=None, commit=True):
104114

105115
# allow compose bot to make changes
106116
compose_bot = current_app.config["COMPOSE_AUTH0_CLIENT_ID"] + "@clients"
107-
if id is None:
117+
if id is None and record is None:
108118
record = cls._model()
109119
record.user = current_user
110-
else:
120+
elif record is None:
111121
record = cls._model.query.filter_by(id=id).first()
112122
if record is None:
113123
abort(422)
114-
elif (
115-
record.user_id != current_user.external_id
116-
and not only_ids
117-
and current_user.external_id != compose_bot
118-
):
119-
abort(403)
120-
elif only_ids:
121-
to_commit.append(record)
122-
123-
if commit:
124-
db.session.add_all(to_commit)
125-
try:
126-
db.session.commit()
127-
except SQLAlchemyError:
128-
db.session.rollback()
129-
abort(400)
130-
131-
return record
124+
125+
data = cls.load_nested_records(data, record)
126+
127+
if (
128+
not sa.inspect(record).pending
129+
and record.user != current_user
130+
and not only_ids
131+
and current_user.external_id != compose_bot
132+
):
133+
abort(403)
134+
elif only_ids:
135+
to_commit.append(record)
136+
137+
if commit:
138+
db.session.add_all(to_commit)
139+
try:
140+
db.session.commit()
141+
except SQLAlchemyError:
142+
db.session.rollback()
143+
abort(400)
144+
145+
return record
132146

133147
# Update all non-nested attributes
134148
for k, v in data.items():
@@ -151,7 +165,13 @@ def update_or_create(cls, data, id=None, commit=True):
151165
}
152166
else:
153167
query_args = {"id": v["id"]}
154-
v = LnCls._model.query.filter_by(**query_args).first()
168+
169+
if v.get("preloaded_data"):
170+
v = v["preloaded_data"]
171+
else:
172+
q = LnCls._model.query.filter_by(**query_args)
173+
v = q.first()
174+
155175
if v is None:
156176
abort(400)
157177

@@ -171,13 +191,40 @@ def update_or_create(cls, data, id=None, commit=True):
171191
ResCls = getattr(viewdata, res_name)
172192
if data.get(field) is not None:
173193
if isinstance(data.get(field), list):
174-
nested = [
175-
ResCls.update_or_create(rec, commit=False)
176-
for rec in data.get(field)
177-
]
194+
nested = []
195+
for rec in data.get(field):
196+
id = None
197+
if isinstance(rec, dict) and rec.get("id"):
198+
id = rec.get("id")
199+
elif isinstance(rec, str):
200+
id = rec
201+
if data.get("preloaded_studies") and id:
202+
nested_record = data["preloaded_studies"].get(id)
203+
else:
204+
nested_record = None
205+
nested.append(
206+
ResCls.update_or_create(
207+
rec,
208+
user=current_user,
209+
record=nested_record,
210+
commit=False,
211+
)
212+
)
178213
to_commit.extend(nested)
179214
else:
180-
nested = ResCls.update_or_create(data.get(field), commit=False)
215+
id = None
216+
rec = data.get(field)
217+
if isinstance(rec, dict) and rec.get("id"):
218+
id = rec.get("id")
219+
elif isinstance(rec, str):
220+
id = rec
221+
if data.get("preloaded_studies") and id:
222+
nested_record = data["preloaded_studies"].get(id)
223+
else:
224+
nested_record = None
225+
nested = ResCls.update_or_create(
226+
rec, user=current_user, record=nested_record, commit=False
227+
)
181228
to_commit.append(nested)
182229

183230
setattr(record, field, nested)
@@ -298,7 +345,15 @@ def get(self, id):
298345
q = self._model.query
299346
if args["nested"] or self._model is Annotation:
300347
q = q.options(nested_load(self))
301-
348+
if self._model is Annotation:
349+
q = q.options(
350+
joinedload(Annotation.annotation_analyses).options(
351+
joinedload(AnnotationAnalysis.analysis),
352+
joinedload(AnnotationAnalysis.studyset_study).options(
353+
joinedload(StudysetStudy.study)
354+
),
355+
)
356+
)
302357
record = q.filter_by(id=id).first_or_404()
303358
if self._model is Studyset and args["nested"]:
304359
snapshot = StudysetSnapshot()
@@ -319,6 +374,7 @@ def put(self, id):
319374
with db.session.no_autoflush:
320375
record = self.__class__.update_or_create(data, id)
321376

377+
record = self.after_update_or_create(record)
322378
# clear relevant caches
323379
clear_cache(self.__class__, record, request.path)
324380

@@ -481,6 +537,8 @@ def post(self):
481537
with db.session.no_autoflush:
482538
record = self.__class__.update_or_create(data)
483539

540+
record = self.after_update_or_create(record)
541+
484542
# clear the cache for this endpoint
485543
clear_cache(self.__class__, record, request.path)
486544

0 commit comments

Comments
 (0)