@@ -101,153 +101,6 @@ def trilinear_interpolate_scalar(
101101 return float (c )
102102
103103
104- def trace_streamline (
105- start_pt : tuple [float , float , float ],
106- vector_field : np .ndarray ,
107- fa_volume : np .ndarray | None = None ,
108- fa_threshold : float = 0.1 ,
109- step_length : float = 0.5 ,
110- max_steps : int | None = 1000 ,
111- angle_threshold : float = 60.0 ,
112- eps : float = 1e-10 ,
113- direction : int = 1 ,
114- ) -> list [tuple [float , float , float ]]:
115- """
116- Trace one streamline from `start_pt` (z,y,x) in the continuous vector_field.
117- - Interpolate & normalize each sub‐step
118- - Move forward by `step_length` voxels each step using RK4
119- - `direction` = +1 (default) or -1 to reverse integration direction
120- """
121- Z , Y , X = vector_field .shape [1 :]
122- coords : list [tuple [float , float , float ]] = [
123- (float (start_pt [0 ]), float (start_pt [1 ]), float (start_pt [2 ]))
124- ]
125- current_pt = np .array (start_pt , dtype = np .float64 )
126- prev_dir : np .ndarray | None = None # previous unit vector
127-
128- def interp_unit (pt : np .ndarray ) -> np .ndarray | None :
129- """Return a normalized direction vector at fractional pt, or None if invalid."""
130- vec = trilinear_interpolate_vector (vector_field , (pt [0 ], pt [1 ], pt [2 ]))
131- if np .isnan (vec ).any ():
132- return None
133- norm = np .linalg .norm (vec )
134- if norm < eps :
135- return None
136- return np .array ([vec [2 ], vec [1 ], vec [0 ]]) / norm * direction # flip to (z,y,x)
137-
138- step_count = 0
139- while max_steps is None or step_count < max_steps :
140- step_count += 1
141-
142- if fa_volume is not None :
143- fa_value = trilinear_interpolate_scalar (fa_volume , tuple (current_pt ))
144- if fa_value < fa_threshold :
145- break
146-
147- k1 = interp_unit (current_pt )
148- if k1 is None :
149- break
150- if prev_dir is not None :
151- angle = np .degrees (np .arccos (np .clip (np .dot (prev_dir , k1 ), - 1.0 , 1.0 )))
152- if angle > angle_threshold :
153- break
154-
155- mid1 = current_pt + 0.5 * step_length * k1
156- k2 = interp_unit (mid1 )
157- if k2 is None :
158- break
159-
160- mid2 = current_pt + 0.5 * step_length * k2
161- k3 = interp_unit (mid2 )
162- if k3 is None :
163- break
164-
165- end_pt = current_pt + step_length * k3
166- k4 = interp_unit (end_pt )
167- if k4 is None :
168- break
169-
170- angle4 = np .degrees (np .arccos (np .clip (np .dot (k1 , k4 ), - 1.0 , 1.0 )))
171- if angle4 > angle_threshold :
172- break
173-
174- increment = (step_length / 6.0 ) * (k1 + 2 * k2 + 2 * k3 + k4 )
175- next_pt = current_pt + increment
176- next_dir = k1
177-
178- zn , yn , xn = next_pt
179- if not (0 <= zn < Z and 0 <= yn < Y and 0 <= xn < X ):
180- break
181-
182- coords .append ((float (zn ), float (yn ), float (xn )))
183- current_pt = next_pt
184- prev_dir = next_dir
185-
186- return coords
187-
188-
189- def generate_streamlines_from_vector_field (
190- vector_field : np .ndarray ,
191- seed_points : np .ndarray ,
192- fa_volume : np .ndarray | None = None ,
193- fa_threshold : float = 0.1 ,
194- step_length : float = 0.5 ,
195- max_steps : int | None = None ,
196- angle_threshold : float = 60.0 ,
197- min_length_pts : int = 10 ,
198- bidirectional : bool = True ,
199- ) -> list [list [tuple [float , float , float ]]]:
200- """
201- Given a 3D vector_field (shape = (3, Z, Y, X)) and a set of integer‐seed voxels,
202- returns a list of streamlines (each streamline = a list of float (z,y,x) points),
203- filtered so that only those longer than `min_length_pts` are kept.
204- Optionally trace in both directions from each seed point.
205- """
206- all_streamlines : list [list [tuple [float , float , float ]]] = []
207- total_seeds = len (seed_points )
208-
209- with alive_bar (total_seeds , title = "Tracing Streamlines" ) as bar :
210- for zi , yi , xi in seed_points :
211- start = (float (zi ), float (yi ), float (xi ))
212-
213- # Forward tracing
214- forward_pts = trace_streamline (
215- start_pt = start ,
216- vector_field = vector_field ,
217- fa_volume = fa_volume ,
218- fa_threshold = fa_threshold ,
219- step_length = step_length ,
220- max_steps = max_steps ,
221- angle_threshold = angle_threshold ,
222- direction = 1 ,
223- )
224-
225- # Backward tracing if enabled
226- if bidirectional :
227- backward_pts = trace_streamline (
228- start_pt = start ,
229- vector_field = vector_field ,
230- fa_volume = fa_volume ,
231- fa_threshold = fa_threshold ,
232- step_length = step_length ,
233- max_steps = max_steps ,
234- angle_threshold = angle_threshold ,
235- direction = - 1 ,
236- )
237- # Remove duplicate seed point and reverse
238- backward_pts = backward_pts [::- 1 ][:- 1 ] if len (backward_pts ) > 1 else []
239- full_streamline = backward_pts + forward_pts
240- else :
241- full_streamline = forward_pts
242-
243- if len (full_streamline ) >= min_length_pts :
244- all_streamlines .append (full_streamline )
245-
246- bar ()
247-
248- return all_streamlines
249-
250-
251104def save_trk_dipy_from_vox_zyx (
252105 streamlines_zyx : list [list [tuple [float , float , float ]]],
253106 out_path : str | Path ,
@@ -295,8 +148,6 @@ def save_trk_dipy_from_vox_zyx(
295148
296149
297150# ---------- tracing ----------
298-
299-
300151def trace_streamline (
301152 start_pt : tuple [float , float , float ],
302153 vector_field : np .ndarray ,
0 commit comments