1+ import os
2+ import sys
3+ from glob import glob
4+ import numpy as np
5+ import argparse
6+ import tempfile
7+ try :
8+ current_dir = os .path .dirname (os .path .abspath (__file__ ))
9+ parent_dir = os .path .dirname (current_dir )
10+ sys .path .append (parent_dir )
11+ from src .file_specs import FileSpecifics
12+ import src .ImagePreprocessFilters as IPrep
13+ import src .ImageParser as IP
14+ except ImportError :
15+ print ("Error: Could not import modules from the 'src' directory." )
16+ print ("Please ensure your directory structure is correct and that the 'src' directory exists." )
17+ sys .exit (1 )
18+
19+
20+ def preprocess_image (img , thresholds , percentiles ):
21+ filtered_img = np .empty (img .shape )
22+ for ch in range (img .shape [2 ]):
23+ img_ch = img [:, :, ch ]
24+
25+ # Thresholding
26+ th = thresholds [ch ]
27+ if th is not None :
28+ img_ch = np .where (img_ch >= th , img_ch , 0 )
29+
30+ # Percentile filtering
31+ perc = percentiles [ch ]
32+ if perc is not None :
33+ img_ch = img_ch [..., np .newaxis ]
34+ img_ch = IPrep .percentile_filter (img_ch , window_size = 3 , percentile = perc , transf_bool = True )
35+ img_ch = img_ch .squeeze ()
36+
37+ filtered_img [:, :, ch ] = img_ch
38+ return filtered_img
39+
40+
41+ folder_path = 'data_test/all_ch/METABRIC22_sample/'
42+ # folder_path = 'data_test/all_ch/stacks_with_names/'
43+ path_for_results = 'data_test/results_percentile/'
44+
45+ def run_test_case_metabric (data_path , results_path ):
46+ # normalization outliers
47+ up_limit = 99
48+ down_limit = 1
49+ binary_masks = False
50+
51+ # Load files
52+ files = glob (os .path .join (data_path , '*.tiff' ))
53+ num_images = len (files )
54+ assert num_images > 0 , "No images found in the specified folder."
55+ assert num_images == 2 , "Expected exactly 2 images for testing."
56+
57+ # Parse image channels
58+ specs = FileSpecifics (files [0 ], multitiff = True )
59+ channel_names = specs .channel_names
60+ assert specs .channel_names , "Channel names should not be empty."
61+ num_channels = len (channel_names )
62+ assert num_channels > 0 , "No channels found in the specified image."
63+
64+ # Calculate thresholds and percentiles
65+ thresholds = [0.1 for _ in range (num_channels )]
66+ percentiles = [0.5 for _ in range (num_channels )]
67+
68+ images_original = list (map (IP .parse_image_pages , files ))
69+ assert len (images_original ) == num_images , "Mismatch in number of images parsed."
70+ assert all (img .shape [2 ] == num_channels for img in images_original ), "All images must have the same number of channels."
71+
72+ # Preprocessing
73+ imgs_out = map (lambda p : IPrep .remove_outliers (p , up_limit , down_limit ), images_original )
74+ imgs_norm = map (IPrep .normalize_channel_cv2_minmax , imgs_out )
75+ filtered_images = map (lambda i : preprocess_image (i , thresholds , percentiles ), imgs_norm )
76+ imgs_filtered = list (filtered_images )
77+
78+ assert len (imgs_filtered ) == num_images , "Mismatch in number of filtered images."
79+ assert all (img .shape [2 ] == num_channels for img in imgs_filtered ), "All images must have the same number of channels."
80+
81+ # Save images
82+ names_save = [os .path .join (results_path , os .path .basename (sub )) for sub in files ]
83+ images_final = map (lambda p , f : IPrep .save_images (p , f , ch_last = True ), imgs_filtered , names_save )
84+
85+ assert len (list (images_final )) == num_images , "Mismatch in number of images saved."
86+ assert all (os .path .exists (name ) for name in names_save ), "Not all images were saved successfully."
87+
88+
89+
90+ # # Apply binary masks if needed
91+ # if binary_masks:
92+ # imgs_filtered = [np.where(a > 0, 1, 0) for a in imgs_filtered]
93+
94+ def run_test_case_with_channel_names (data_path , results_path ):
95+ # normalization outliers
96+ up_limit = 99
97+ down_limit = 1
98+ binary_masks = False
99+
100+ # Load files
101+ files = glob (os .path .join (data_path , '*.tiff' ))
102+ num_images = len (files )
103+ assert num_images > 0 , "No images found in the specified folder."
104+
105+ # Parse image channels
106+ specs = FileSpecifics (files [0 ], multitiff = True )
107+ assert specs .channel_names , "Channel names should not be empty."
108+ channel_names = specs .channel_names
109+ num_channels = len (channel_names )
110+ assert num_channels > 0 , "No channels found in the specified image."
111+ assert all (isinstance (name , str ) for name in channel_names ), "Channel names should be strings."
112+
113+ # Calculate thresholds and percentiles
114+ thresholds = [0.1 for _ in range (num_channels )]
115+ percentiles = [0.5 for _ in range (num_channels )]
116+
117+ images_original = list (map (IP .parse_image_pages , files ))
118+ assert len (images_original ) == num_images , "Mismatch in number of images parsed."
119+
120+ # Preprocessing
121+ imgs_out = map (lambda p : IPrep .remove_outliers (p , up_limit , down_limit ), images_original )
122+ imgs_norm = map (IPrep .normalize_channel_cv2_minmax , imgs_out )
123+ filtered_images = map (lambda i : preprocess_image (i , thresholds , percentiles ), imgs_norm )
124+ imgs_filtered = list (filtered_images )
125+
126+ assert len (imgs_filtered ) == num_images , "Mismatch in number of filtered images."
127+ assert all (img .shape [2 ] == num_channels for img in imgs_filtered ), "All images must have the same number of channels."
128+
129+ # Save images
130+ # TODO FIX THIS
131+ # names_save = [os.path.join(results_path, os.path.basename(sub)) for sub in files]
132+ # names_save = [os.path.join(results_path, os.path.basename(sub).replace('.ome.tiff', '.tiff')) for sub in files]
133+ #
134+ # print(names_save)
135+ # images_final = map(
136+ # lambda p, f: IPrep.save_img_ch_names_pages(p, f, ch_last=True, channel_names=channel_names),
137+ # imgs_filtered, names_save)
138+ # assert all(os.path.exists(name) for name in names_save), "Not all images with channel names were saved successfully."
139+ #
140+ # # Check if channel names are correctly saved
141+ # specs = FileSpecifics(names_save[0], multitiff=True)
142+ # channel_names = specs.channel_names
143+ # num_channels = len(channel_names)
144+ # assert num_channels > 0, "No channels found in the specified image."
145+ # assert channel_names[0].istype(str), "Channel names should be strings."
146+ #
147+
148+ if __name__ == "__main__" :
149+
150+ with tempfile .TemporaryDirectory () as temp_results_dir :
151+ print (f"Using temporary directory for results: { temp_results_dir } " )
152+ try :
153+ metabric_data_path = 'data_test/all_ch/METABRIC22_sample/'
154+ run_test_case_metabric (metabric_data_path , temp_results_dir )
155+ print ("\n Pass test case for METABRIC data!" )
156+
157+ except AssertionError as e :
158+ print (f"\n AssertionError: { e } " )
159+ sys .exit (1 )
160+
161+ with tempfile .TemporaryDirectory () as temp_results_dir :
162+ print (f"Using temporary directory for results: { temp_results_dir } " )
163+ try :
164+ stacks_data_path = 'data_test/all_ch/stacks_with_names/'
165+ run_test_case_with_channel_names (stacks_data_path , temp_results_dir )
166+ print ("\n Pass test case 2 for stacks with channel names!" )
167+ print ("\n All tests passed successfully! 🎉" )
168+
169+ except AssertionError as e :
170+ print (f"\n AssertionError: { e } " )
171+ print ("Test failed. ❌" )
172+ sys .exit (1 )
173+ # del the entire folder
174+ # del path_for_results = 'data_test/results_percentile/'
0 commit comments