@@ -130,6 +130,55 @@ def load_mp_analysis(db: Session, obj: Dict[str, Any], **kwargs) -> LoadObjectRe
130
130
return pipeline
131
131
132
132
133
+ def load_mt_annotation (db : Session , obj : Dict [str , Any ], ** kwargs ) -> LoadObjectReturn :
134
+ # Ingest the MetatranscriptomeAnnotation record
135
+ pipeline = cast (models .MetatranscriptomeAnnotation , load_mt_annotation_base (db , obj , ** kwargs ))
136
+
137
+ annotations : Collection = kwargs ["annotations" ]
138
+
139
+ # Query gene function annotations from mongo and build the appropriate objects
140
+ query = annotations .find (
141
+ {
142
+ "metagenome_annotation_id" : pipeline .id ,
143
+ "gene_function_id" : {
144
+ "$regex" : ko_regex ,
145
+ },
146
+ },
147
+ no_cursor_timeout = True ,
148
+ projection = {
149
+ "_id" : False ,
150
+ "metatranscriptome_annotation_id" : True ,
151
+ "count" : True ,
152
+ "gene_function_id" : True ,
153
+ },
154
+ )
155
+ if kwargs .get ("function_limit" ):
156
+ query = query .limit (kwargs ["function_limit" ])
157
+
158
+ gene_functions : Set [str ] = set ()
159
+ gene_function_aggregations : List [models .MetaTGeneFunctionAggregation ] = []
160
+ for annotation in query :
161
+ function_id = annotation ["gene_function_id" ]
162
+ gene_functions .add (function_id )
163
+ gene_function_aggregations .append (
164
+ models .MetaTGeneFunctionAggregation (
165
+ metatranscriptome_annotation_id = pipeline .id ,
166
+ gene_function_id = function_id ,
167
+ count = annotation ["count" ],
168
+ )
169
+ )
170
+ # Save both newly encountered gene functions and the gene function aggregations
171
+ if gene_function_aggregations :
172
+ db .execute (
173
+ insert (models .GeneFunction )
174
+ .on_conflict_do_nothing ()
175
+ .values ([(gf ,) for gf in gene_functions ])
176
+ )
177
+ db .bulk_save_objects (gene_function_aggregations )
178
+
179
+ return pipeline
180
+
181
+
133
182
# This is a loader for a generic workflow type that doesn't need any
134
183
# additional processing.
135
184
def generate_pipeline_loader (schema , model ) -> LoadObject :
@@ -161,6 +210,12 @@ def loader(db: Session, obj: Dict[str, Any], **kwargs: Any) -> LoadObjectReturn:
161
210
load_metatranscriptome = generate_pipeline_loader (
162
211
schemas .MetatranscriptomeBase , models .Metatranscriptome
163
212
)
213
+ load_mt_assembly = generate_pipeline_loader (
214
+ schemas .MetatranscriptomeAssemblyBase , models .MetatranscriptomeAssembly
215
+ )
216
+ load_mt_annotation_base = generate_pipeline_loader (
217
+ schemas .MetatranscriptomeAnnotationBase , models .MetatranscriptomeAnnotation
218
+ )
164
219
165
220
166
221
# This is a generic function for load workflow execution objects. Some workflow types require
0 commit comments