Skip to content

Commit 1446345

Browse files
committed
add feature table uploader functionality basics
1 parent 709997c commit 1446345

5 files changed

Lines changed: 104 additions & 95 deletions

File tree

mmeds/database/documents.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ class MMEDSDoc(men.Document):
3535
testing = men.BooleanField(required=True)
3636
owner = men.StringField(max_length=100, required=True)
3737
email = men.StringField(max_length=100)
38-
path = men.StringField(max_length=300)
3938
access_code = men.StringField(max_length=50)
4039
doc_type = men.EnumField(DocType, required=True)
4140
is_alive = men.BooleanField()
@@ -166,14 +165,14 @@ class DataDoc(MMEDSDoc):
166165
data_type = men.StringField()
167166
studies = men.ListField(men.ReferenceField(MMEDSDoc))
168167
files = men.MapField(field=men.FileField())
169-
latest_version = men.BooleanField()
170168

171169

172170
class FeatureTableDoc(MMEDSDoc):
173171
"""
174172
MongoDB Document for storing feature tables
175173
"""
176174
table_name = men.StringField()
175+
table_type = men.StringField()
177176
studies = men.ListField(men.ReferenceField(MMEDSDoc))
178177
from_analysis = men.ReferenceField(MMEDSDoc)
179178
table = men.FileField()

mmeds/database/feature_table_uploader.py

Lines changed: 37 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from datetime import datetime
1010
from pathlib import Path
1111
from multiprocessing import Process
12-
from mmeds.error import NoResultError, InvalidUploadError
12+
from mmeds.error import NoResultError
1313
from mmeds.logging import Logger
1414
from mmeds.util import (send_email, create_local_copy)
1515

@@ -18,15 +18,15 @@ class FeatureTableUploader(Process):
1818
"""
1919
This class handles the processing and uploading of observation matrix tables
2020
"""
21-
def __init__(self, access_code, owner, data_name, data_type, data_files, public, testing):
21+
def __init__(self, access_code, owner, studies, table_name, table_type, table_file, public, testing):
2222
warnings.simplefilter('ignore')
2323
super().__init__()
24-
Logger.debug('DataUploader created with params')
24+
Logger.debug('FeatureTableUploader created with params')
2525
Logger.debug({
2626
'owner': owner,
27-
'data_name': data_name,
28-
'data_type': data_type,
29-
'data_files': data_files,
27+
'table_name': table_name,
28+
'table_type': table_type,
29+
'table_file': table_file,
3030
'public': public,
3131
'testing': testing
3232
})
@@ -35,24 +35,8 @@ def __init__(self, access_code, owner, data_name, data_type, data_files, public,
3535
self.access_code = access_code
3636
self.owner = owner
3737
self.testing = testing
38-
self.data_name = data_name
38+
self.table_name = table_name
3939
self.public = public
40-
self.datafiles = data_files
41-
42-
if data_type == "MultiplexedPairedEndSingleBarcodes":
43-
self.reads_type = "paired"
44-
self.barcodes_type = "single"
45-
self.demultiplexed = False
46-
elif data_type == "MultiplexedPairedEndDualBarcodes":
47-
self.reads_type = "paired"
48-
self.barcodes_type = "dual"
49-
self.demultiplexed = False
50-
elif data_type == "DemultiplexedPairedEnd":
51-
self.reads_type = None
52-
self.barcodes_type = None
53-
self.demultiplexed = True
54-
else:
55-
raise InvalidUploadError(f"Got unexpected data type '{data_type}'")
5640

5741
# If testing connect to test server
5842
if testing:
@@ -82,34 +66,24 @@ def __init__(self, access_code, owner, data_name, data_type, data_files, public,
8266
host=sec.MONGO_HOST)
8367

8468
# Create the document
85-
self.data = docs.DataDoc(created=datetime.utcnow(),
86-
last_accessed=datetime.utcnow(),
87-
testing=self.testing,
88-
doc_type=docs.DocType.DATA,
89-
access_code=self.access_code,
90-
data_name=self.data_name,
91-
data_type=self.data_type,
92-
files=[],
93-
owner=self.owner,
94-
public=self.public)
95-
96-
self.data.save()
97-
98-
count = 0
99-
new_dir = fig.SEQUENCING_DIR / ('{}_{}_{}'.format(self.owner, self.data_name, count))
100-
101-
while new_dir.is_dir():
102-
count += 1
103-
new_dir = fig.SEQUENCING_DIR / ('{}_{}_{}'.format(self.owner, self.data_name, count))
104-
new_dir.mkdir()
105-
106-
self.path = Path(new_dir) / 'database_dir'
69+
self.doc = docs.FeatureTableDoc(created=datetime.utcnow(),
70+
last_accessed=datetime.utcnow(),
71+
testing=self.testing,
72+
doc_type=docs.DocType.FEATURE_TABLE,
73+
access_code=self.access_code,
74+
table_name=self.table_name,
75+
table_type=self.table_type,
76+
studies=self.studies,
77+
owner=self.owner,
78+
public=self.public,
79+
latest_version=True)
80+
self.doc.save()
10781

10882
def get_info(self):
10983
""" Method to return a dictionary of relevant info for the process log """
11084
info = {
11185
'created': self.created,
112-
'type': 'upload-run',
86+
'type': 'upload-feature-table',
11387
'owner': self.owner,
11488
'pid': self.pid,
11589
'name': self.name,
@@ -121,9 +95,9 @@ def run(self):
12195
"""
12296
Process that handles the upload of a feature table file.
12397
"""
124-
self.data.update(is_alive=True)
125-
self.data.save()
126-
Logger.debug('Handling upload for sequencing run {} for user {}'.format(self.sequencing_run_name, self.owner))
98+
self.doc.update(is_alive=True)
99+
self.doc.save()
100+
Logger.debug('Handling upload for feature table {} for user {}'.format(self.table_name, self.owner))
127101

128102
# If the owner is None set user_id to 0
129103
if self.owner is None:
@@ -142,52 +116,26 @@ def run(self):
142116
self.user_id = int(result[0])
143117
self.email = result[1]
144118

145-
# If the metadata is to be made public overwrite the user_id
146-
if self.public:
147-
self.user_id = 1
148-
self.check_file = fig.DATABASE_DIR / 'last_check.dat'
149-
150119
if not self.path.is_dir():
151120
self.path.mkdir()
152-
self.data.update(path=str(self.path.parent))
153-
self.data.save()
154-
155-
# Create a copy of the Data files
156-
datafile_copies = {key: create_local_copy(Path(filepath).read_bytes(),
157-
f"{Path(filepath).name}", self.path.parent, False)
158-
for key, filepath in self.datafiles.items()
159-
if filepath is not None}
160-
161-
for key, filepath in self.datafiles.items():
162-
filepath = Path(filepath)
163-
with open(filepath, 'rb') as f:
164-
grid_file = men.GridFSProxy()
165-
grid_file.put(f, content_type='text/plain', filename=filepath.name)
166-
self.data.files.append(grid_file)
167-
self.data.save()
121+
self.doc.update(path=str(self.path.parent))
122+
self.doc.save()
168123

169-
# Create sequencing run directory file
170-
with open(self.path.parent / fig.SEQUENCING_DIRECTORY_FILE, "wt") as f:
171-
for key, filepath in self.datafiles.items():
172-
adjusted = key
173-
if key == 'for_reads':
174-
adjusted = 'forward'
175-
elif key == 'rev_reads':
176-
adjusted = 'reverse'
177-
f.write(f"{adjusted}: {Path(filepath).name}\n")
178-
179-
self.mongo_import(**datafile_copies)
124+
self.mongo_import(self.table_file)
180125

181126
# Send the confirmation email
182-
send_email(self.email, self.owner, message='upload-run', run=self.sequencing_run_name,
183-
code=self.access_code, testing=self.testing)
127+
send_email(self.email, self.owner, message='upload-feature_table', study=self.studies[0],
128+
table_name=self.table_name, code=self.access_code, testing=self.testing)
184129
# Update the doc to reflect the successful upload
185-
self.data.update(is_alive=False, exit_code=0)
186-
self.data.save()
130+
self.doc.update(is_alive=False, exit_code=0)
131+
self.doc.save()
187132
return 0
188133

189-
def mongo_import(self, **kwargs):
134+
def mongo_import(self, table_file):
    """
    Store the feature table file on this upload's MongoDB document.

    :table_file: Path to the file containing the feature table
    """
    table_path = Path(table_file)

    # Attach the file to the document's `table` FileField via GridFS,
    # keeping the original file name with the stored file.
    self.doc.table = men.GridFSProxy()
    with open(table_path, "rb") as f:
        self.doc.table.put(f, content_type="text/plain", filename=table_path.name)

    # `path` is no longer a field on MMEDSDoc, so it is not included in the
    # update; only the owner's email is recorded here.
    self.doc.update(email=self.email)
    self.doc.save()

mmeds/spawn.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from mmeds.database.data_uploader import DataUploader
1919
from mmeds.database.metadata_adder import MetaDataAdder
2020
from mmeds.database.study_creator import StudyCreator
21+
from mmeds.database.feature_table_uploader import FeatureTableUploader
2122
from mmeds.error import AnalysisError, MissingUploadError
2223

2324
from mmeds.tools.analysis import Analysis
@@ -331,19 +332,26 @@ def handle_upload(self, process):
331332

332333
# Add new sequencing run
333334
elif 'run' in process[0]:
334-
(ptype, sequencing_run_name, username, reads_type, barcodes_type,
335-
datafiles, public) = process
335+
(ptype, sequencing_run_name, username, reads_type, barcodes_type, datafiles, public) = process
336336

337337
p = DataUploader(new_access_code, username, sequencing_run_name, reads_type,
338338
datafiles, public, self.testing)
339+
340+
# Add new feature table
341+
elif 'feature-table' in process[0]:
342+
(ptype, username, study, table_name, table_type, table_file, public) = process
343+
344+
p = FeatureTableUploader(new_access_code, username, [study], table_name, table_type, table_file,
345+
public, self.testing)
346+
339347
# Add new study
340348
else:
341349
Logger.debug(f"length: {len(process)}")
342350
(ptype, study_doc, subject_metadata, specimen_metadata, subject_type,
343351
username, meta_study, temporary, public) = process
344352
# Start a process to handle loading the data
345-
p = MetaDataUploader(new_access_code, subject_metadata, subject_type, specimen_metadata, username, 'qiime',
346-
study_doc, meta_study, temporary, public, self.testing)
353+
p = MetaDataUploader(new_access_code, subject_metadata, subject_type, specimen_metadata, username,
354+
'qiime', study_doc, meta_study, temporary, public, self.testing)
347355
self.db_lock.acquire()
348356
p.start()
349357
self.add_process(ptype, p.access_code)

mmeds/util.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -804,6 +804,13 @@ def send_email(toaddr, user, message='upload', testing=False, **kwargs):
804804
'the following access code:\n{code}\n\nBest,\nMmeds Team\n\n' +\
805805
'If you have any issues please email: {cemail} with a description of your problem.\n'
806806
subject = 'New Sequencing Run Uploaded'
807+
elif message == 'upload-feature-table':
808+
body = 'Hello {email}, \nthe user {user} uploaded a feature table "{table_name}" for the {study} study ' +
809+
'to the mmeds database server.\nThis table can now be used for downstream analyses.' +\
810+
'In order to gain access to this data without the password to\n{user} you must provide ' +\
811+
'the following access code:\n{code}\n\nBest,\nMmeds Team\n\n' +\
812+
'If you have any issues please email: {cemail} with a description of your problem.\n'
813+
subject = 'New Feature Table Uploaded'
807814
elif message == 'ids_generated':
808815
body = 'Hello {email},\nthe user {user} uploaded {id_type}s for the study {study}. \n' +\
809816
'The aliquots are added and the IDs have been generated.\n\nBest,\nMmeds Team\n\n' +\
@@ -1487,6 +1494,15 @@ def upload_sequencing_run_local(queue, run_name, user, datafiles, reads_type, ba
14871494
return 0
14881495

14891496

1497+
def upload_feature_table_local(queue, user, study, table_name, table_type, table_file, public=False):
    """
    Directly upload a local feature table using the watcher, bypassing the server.

    :queue: Queue the upload request is put on
    :user: User the table is uploaded for
    :study: Study to be associated with the feature table
    :table_name: Name of the feature table
    :table_type: Type of the feature table
    :table_file: Path to the file containing the table
    :public: Forwarded as the last element of the request. Defaults to False,
        the value that was previously hard-coded.

    Returns 0 once the request has been put on the queue.
    """
    # Element order must match the tuple unpacked by the 'feature-table'
    # branch of the watcher's upload handler.
    queue.put(('upload-feature-table', user, study, table_name, table_type, table_file, public))
    Logger.debug("Feature table sent to queue directly")
    return 0
1504+
1505+
14901506
def process_sequencing_runs_local(runs_path, queue):
14911507
""" Uploads sequencing runs included in a dump file """
14921508
for run in runs_path.glob("*"):

scripts/upload_feature_table.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
#!/usr/bin/env python3
2+
3+
import click
4+
import mmeds.util as util
5+
from mmeds.spawn import Watcher
6+
from pathlib import Path
7+
8+
CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
9+
10+
11+
@click.command(context_settings=CONTEXT_SETTINGS)
@click.option('-n', '--table-name', required=True, help='Name of table')
@click.option('-t', '--table-type', required=True, help='Type of table', default="taxonomic")
@click.option('-f', '--table-file', required=True, help='Filepath containing table')
@click.option('-s', '--study', required=True, help='Study to be associated with feature table')
@click.option('-u', '--user', required=True, help='User for upload')
def upload_sequencing_run(table_name, table_type, table_file, study, user):
    """
    Uploads a feature table directly from the command line, bypassing the server
    """
    # NOTE(review): the function name is a leftover from the sequencing run
    # script; it is kept so the command name does not change.
    q = get_queue()

    result = util.upload_feature_table_local(q, user, study, table_name, table_type, table_file)
    # Explicit check rather than `assert`, which is stripped when Python runs with -O
    if result != 0:
        raise click.ClickException(f'Feature table upload failed with exit code {result}')
25+
26+
27+
def get_queue():
    """
    Connect to a Watcher and return its queue.

    The queue is fetched here and handed to the upload helpers in util.py,
    which cannot connect to the Watcher queue themselves because doing so
    would need a recursive import.
    """
    watcher = Watcher()
    watcher.connect()
    return watcher.get_queue()
35+
36+
37+
if __name__ == '__main__':
38+
upload_sequencing_run()

0 commit comments

Comments
 (0)