77from itertools import product
88
99from nipype import Workflow , Node , MapNode
10- from nipype .interfaces .utility import IdentityInterface , Function , Split
10+ from nipype .algorithms .modelgen import SpecifyModel
11+ from nipype .interfaces .utility import IdentityInterface , Function , Split , Merge
1112from nipype .interfaces .io import SelectFiles , DataSink
12- from nipype .interfaces .fsl import (
13- IsotropicSmooth , Level1Design , FEATModel ,
14- L2Model , Merge , FLAMEO , FILMGLS , MultipleRegressDesign ,
13+ from nipype .interfaces .fsl .maths import ImageMaths , ImageStats , MultiImageMaths
14+ from nipype .interfaces .fsl .preprocess import SUSAN
15+ from nipype .interfaces .fsl .model import (
16+ Level1Design , FEATModel , L2Model , FLAMEO , FILMGLS , MultipleRegressDesign ,
1517 FSLCommand , Cluster
1618 )
17- from nipype .algorithms .modelgen import SpecifyModel
18- from nipype .interfaces .fsl .maths import MultiImageMaths
19+ from nipype .interfaces .fsl .utils import Merge as FSLMerge
1920
2021from narps_open .utils .configuration import Configuration
2122from narps_open .pipelines import Pipeline
@@ -41,8 +42,150 @@ def __init__(self):
4142 ]
4243
4344 def get_preprocessing (self ):
44- """ No preprocessing has been done by team O21U """
45- return None
45+ """ Create the preprocessing workflow """
46+
47+ # Initiate the workflow
48+ preprocessing = Workflow (
49+ base_dir = self .directories .working_dir ,
50+ name = 'preprocessing'
51+ )
52+
53+ # IdentityInterface Node - Iterate on subject and runs
54+ information_source = Node (IdentityInterface (
55+ fields = ['subject_id' , 'run_id' ]),
56+ name = 'information_source' )
57+ information_source .iterables = [
58+ ('subject_id' , self .subject_list ),
59+ ('run_id' , self .run_list )
60+ ]
61+
62+ # SelectFiles - Get necessary files
63+ templates = {
64+ 'func' : join ('derivatives' , 'fmriprep' , 'sub-{subject_id}' , 'func' ,
65+ 'sub-{subject_id}_task-MGT_run-{run_id}_bold_space-MNI152NLin2009cAsym_preproc.nii.gz' ),
66+ 'mask' : join ('derivatives' , 'fmriprep' , 'sub-{subject_id}' , 'func' ,
67+ 'sub-{subject_id}_task-MGT_run-{run_id}_bold_space-MNI152NLin2009cAsym_brainmask.nii.gz' )
68+ }
69+ select_files = Node (SelectFiles (templates ), name = 'select_files' )
70+ select_files .inputs .base_directory = self .directories .dataset_dir
71+ preprocessing .connect (information_source , 'subject_id' , select_files , 'subject_id' )
72+ preprocessing .connect (information_source , 'run_id' , select_files , 'run_id' )
73+
74+ # ImageMaths - Convert func to float representation
75+ func_to_float = Node (ImageMaths (), name = 'func_to_float' )
76+ func_to_float .inputs .out_data_type = 'float'
77+ func_to_float .inputs .op_string = ''
78+ func_to_float .inputs .suffix = '_dtype'
79+ preprocessing .connect (select_files , 'func' , func_to_float , 'in_file' )
80+
81+ # ImageMaths - Mask the functional image
82+ mask_func = Node (ImageMaths (), name = 'mask_func' )
83+ mask_func .inputs .suffix = '_thresh'
84+ mask_func .inputs .op_string = '-mas'
85+ preprocessing .connect (func_to_float , 'out_file' , mask_func , 'in_file' )
86+ preprocessing .connect (select_files , 'mask' , mask_func , 'in_file2' )
87+
88+ # ImageMaths - Compute the mean image of each time point
89+ mean_func = Node (ImageMaths (), name = 'mean_func' )
90+ mean_func .inputs .suffix = '_mean'
91+ mean_func .inputs .op_string = '-Tmean'
92+ preprocessing .connect (mask_func , 'out_file' , mean_func , 'in_file' )
93+
94+ # ImageStats - Compute the median value of each time point
95+ # (only because it's needed by SUSAN)
96+ median_value = Node (ImageStats (), name = 'median_value' )
97+ median_value .op_string = '-k %s -p 50'
98+ preprocessing .connect (select_files , 'func' , median_value , 'in_file' )
99+ preprocessing .connect (select_files , 'mask' , median_value , 'mask_file' )
100+
101+ # Merge - Merge the median values with the mean functional images into a coupled list
102+ merge_median = Node (Merge (2 , axis = 'hstack' ), name = 'merge_median' )
103+ preprocessing .connect (mean_func , 'out_file' , merge_median , 'in1' )
104+ preprocessing .connect (median_value , 'out_stat' , merge_median , 'in2' )
105+
106+ # SUSAN - Smoothing funk
107+ smooth_func = Node (SUSAN (), name = 'smooth_func' )
108+ smooth_func .inputs .fwhm = 4.9996179300001655 # see Chen et. al. 2022
109+
110+ # Define a function to get the brightness threshold for SUSAN
111+ get_brightness_threshold = lambda median : 0.75 * median
112+
113+ # Define a function to get the usans for SUSAN
114+ get_usans = lambda value : [tuple ([val [0 ], 0.75 * val [1 ]])]
115+
116+ preprocessing .connect (mask_func , 'out_file' , smooth_func , 'in_file' )
117+ preprocessing .connect (
118+ median_value , ('out_stat' , get_brightness_threshold ),
119+ smooth_func , 'brightness_threshold' )
120+ preprocessing .connect (merge_median , ('out' , getusans ), smooth_func , 'usans' )
121+
122+ # TODO : Mask the smoothed data ?
123+ """
124+ maskfunc3 = pe.MapNode(
125+ interface=fsl.ImageMaths(op_string='-mas'),
126+ name='maskfunc3',
127+ iterfield=['in_file','in_file2'])
128+ wf.connect(smooth, 'smoothed_file', maskfunc3, 'in_file')
129+ wf.connect(dilatemask, 'out_file', maskfunc3, 'in_file2')
130+ """
131+
132+ # Define a function to get the scaling factor for intensity normalization
133+ get_intensity_normalization_scale = lambda median : '-mul %.10f' % (10000. / median )
134+
135+ # ImageMaths - Scale each time point so that its median value is 10000
136+ normalize_intensity = Node (ImageMaths (), name = 'normalize_intensity' )
137+ normalize_intensity .inputs .suffix = '_intnorm'
138+ preprocessing .connect (smooth_func , 'out_file' , normalize_intensity , 'in_file' )
139+ preprocessing .connect (
140+ median_value , ('out_stat' , get_intensity_normalization_scale ),
141+ normalize_intensity , 'op_string' )
142+
143+ # TODO : temporal filtering ???
144+
145+ # DataSink Node - store the wanted results in the wanted repository
146+ data_sink = Node (DataSink (), name = 'data_sink' )
147+ data_sink .inputs .base_directory = self .directories .output_dir
148+ preprocessing .connect (
149+ normalize_intensity , 'out_file' , data_sink , 'preprocessing.@normalized_file' )
150+
151+ # Remove large files, if requested
152+ if Configuration ()['pipelines' ]['remove_unused_data' ]:
153+
154+ # Merge Node - Merge func file names to be removed after datasink node is performed
155+ merge_removable_files = Node (Merge (4 ), name = 'merge_removable_files' )
156+ merge_removable_files .inputs .ravel_inputs = True
157+ preprocessing .connect (func_to_float , 'out_file' , merge_removable_files , 'in1' )
158+ preprocessing .connect (mask_func , 'out_file' , merge_removable_files , 'in2' )
159+ preprocessing .connect (mean_func , 'out_file' , merge_removable_files , 'in3' )
160+ preprocessing .connect (smooth_func , 'out_file' , merge_removable_files , 'in4' )
161+
162+ # Function Nodes remove_files - Remove sizeable func files once they aren't needed
163+ remove_dirs = MapNode (Function (
164+ function = remove_parent_directory ,
165+ input_names = ['_' , 'file_name' ],
166+ output_names = []
167+ ), name = 'remove_dirs' , iterfield = 'file_name' )
168+ preprocessing .connect (merge_removable_files , 'out' , remove_dirs , 'file_name' )
169+ preprocessing .connect (data_sink , 'out_file' , remove_dirs , '_' )
170+
171+ return preprocessing
172+
173+ def get_preprocessing_outputs (self ):
174+ """ Return the names of the files the preprocessing is supposed to generate. """
175+
176+ # Outputs from preprocessing (intensity normalized files)
177+ parameters = {
178+ 'subject_id' : self .subject_list ,
179+ 'run_id' : self .run_list ,
180+ }
181+ parameter_sets = product (* parameters .values ())
182+ template = join (
183+ self .directories .output_dir , 'preprocessing' ,
184+ '_subject_id_{subject_id}' , '_run_id_{run_id}' ,
185+ 'wc2sub-{subject_id}_T1w.nii' )
186+
187+ return [template .format (** dict (zip (parameters .keys (), parameter_values )))\
188+ for parameter_values in parameter_sets ]
46189
47190 def get_subject_information (event_file ):
48191 """
@@ -121,7 +264,7 @@ def get_run_level_analysis(self):
121264 Returns:
122265 - run_level : nipype.WorkFlow
123266 """
124- # Create run level analysis workflow and connect its nodes
267+ # Create run level analysis workflow
125268 run_level = Workflow (
126269 base_dir = self .directories .working_dir ,
127270 name = 'run_level_analysis'
0 commit comments