-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathautomatic_pipeline_ltp.py
More file actions
executable file
·88 lines (76 loc) · 3.63 KB
/
automatic_pipeline_ltp.py
File metadata and controls
executable file
·88 lines (76 loc) · 3.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
#!/usr/global/shared/runvenv workshop
from __future__ import print_function
import json
import sys
import os
import random
from event_creation.submission.tasks import IndexAggregatorTask
# from log import logger
script_dir = os.path.dirname(os.path.realpath(__file__))
sys.path.append(script_dir)
##########
#
# Event creation management
#
##########
def automatic_event_creator(check_index=True):
# Load list of supported active experiments
with open('/data/eeg/scalp/ltp/ACTIVE_EXPERIMENTS.txt', 'r') as f:
experiments = [s.strip() for s in f.readlines()]
inputs = []
for exp in experiments:
exp = exp.strip()
if not exp:
continue
try:
# Get dictionary of new/recently modified sessions
with open('/data/eeg/scalp/ltp/%s/recently_modified.json' % exp, 'r') as f:
new_sess = json.load(f)
# Get index for ltp database to identify which sessions have been proccessed
with open('/protocols/ltp.json', 'r') as f:
db_index = json.load(f)
if db_index != {}:
db_index = db_index['protocols']['ltp']['subjects']
print(f'Loaded index for LTP database with {len(db_index)} subjects')
# print(f"db_index keys: {db_index.keys()} db_keys values: {db_index.values()}")
except IOError:
print('Unable to load necessary session information for experiment: ', exp)
print('Skipping...')
continue
# Create an input structure for each new session that has not been processed
print(f'{"Checking index for" if check_index else "Processing"} {exp} sessions: {new_sess}')
for subject in new_sess:
for session in new_sess[subject]:
# Only process sessions whose annotations have been corrected
session_dir = f'/data/eeg/scalp/ltp/{exp}/{subject}/session_{session}'
if os.path.exists(os.path.join(session_dir, 'AUTOMATED_ANNOT')):
print(f'Skipping {exp} {subject} session {session}: uncorrected automatic annotations')
continue
if os.path.exists(os.path.join(session_dir, 'CORRECTED_ANNOT')):
print(f'Running {exp} {subject} session {session}: corrected automatic annotations')
if check_index:
if (db_index == {}) or (subject not in db_index) or (exp not in db_index[subject]['experiments']) or (str(session) not in db_index[subject]['experiments'][exp]['sessions']):
inputs.append(f'{exp}:{subject}:{session}')
print('Session to run: ', exp, subject, session)
else:
inputs.append(f'{exp}:{subject}:{session}')
print('Session to run: ', exp, subject, session)
# Submit a job to the cluster for each session that needs to be processed
n_jobs = len(inputs)
if n_jobs > 0:
# Minimize consistent problems by shuffling the list each time
random.shuffle(inputs)
outdir = os.path.join(os.environ['HOME'], 'logs', 'stdouterr')
# Use sbatch to launch automatic_run.py,
# which in turn will call ./submit
os.system(f'sbatch --mem-per-cpu=60G -t 23:00:00 ' +
f'-o {outdir}/slurm-%A_%a.out -e {outdir}/slurm-%A_%a.err ' +
f'-a 0-{n_jobs-1}%16 ' +
f'{script_dir}/automatic_run.py {script_dir} ' +
' '.join(inputs))
if __name__=='__main__':
try:
automatic_event_creator(check_index=True)
except:
pass
IndexAggregatorTask().run(protocols='ltp')