|
| 1 | +import logging |
| 2 | +from time import sleep |
| 3 | + |
| 4 | +import numpy as np |
| 5 | +from PIL import Image, ImageTk |
| 6 | +import tkinter as tk |
| 7 | +from tkinter import messagebox, ttk |
| 8 | +from typing import List |
| 9 | + |
| 10 | +from ulc_mm_package.hardware.camera import AVTCamera |
| 11 | +from ulc_mm_package.hardware.hardware_constants import MIN_PRESSURE_DIFF |
| 12 | +from ulc_mm_package.hardware.pneumatic_module import PneumaticModule, PressureLeak |
| 13 | +from ulc_mm_package.hardware.motorcontroller import DRV8825Nema |
| 14 | +from ulc_mm_package.hardware.led_driver_tps54201ddct import LED_TPS5420TDDCT |
| 15 | +from ulc_mm_package.hardware.scope_routines import CellFinder, NoCellsFound |
| 16 | +from ulc_mm_package.scope_constants import CAMERA_SELECTION |
| 17 | + |
| 18 | +PNEUMATIC_PULL_TIME_S = 7 |
| 19 | +LED_BRIGHTNESS_PERC = 0.15 |
| 20 | +MIN_ACCEPTABLE_MOTOR_POS = 100 |
| 21 | +MAX_ACCEPTABLE_MOTOR_POS = 800 |
| 22 | + |
| 23 | +# Set up logging |
| 24 | +logger = logging.getLogger() |
| 25 | +if not logger.hasHandlers(): |
| 26 | + logger.setLevel(logging.DEBUG) |
| 27 | + ch = logging.StreamHandler() |
| 28 | + ch.setLevel(logging.DEBUG) |
| 29 | + formatter = logging.Formatter( |
| 30 | + "%(asctime)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S" |
| 31 | + ) |
| 32 | + ch.setFormatter(formatter) |
| 33 | + logger.addHandler(ch) |
| 34 | + |
| 35 | + |
| 36 | +def init_hardware(): |
| 37 | + logger.info("Initializing required hardware...") |
| 38 | + |
| 39 | + camera = AVTCamera() |
| 40 | + pm = PneumaticModule() |
| 41 | + motor = DRV8825Nema() |
| 42 | + motor.homeToLimitSwitches() |
| 43 | + led = LED_TPS5420TDDCT() |
| 44 | + led.turnOn() |
| 45 | + led.setDutyCycle(0) |
| 46 | + |
| 47 | + return camera, pm, motor, led |
| 48 | + |
| 49 | + |
| 50 | +def perform_sweep( |
| 51 | + camera, |
| 52 | + motor, |
| 53 | + led, |
| 54 | + sweep_range, |
| 55 | + cell_finder: CellFinder, |
| 56 | + progress_callback, |
| 57 | + image_callback, |
| 58 | + motor_label_callback, |
| 59 | +) -> List[np.ndarray]: |
| 60 | + logger.info("Starting sweep...") |
| 61 | + cell_finder.reset() |
| 62 | + led.turnOn() |
| 63 | + led.setDutyCycle(LED_BRIGHTNESS_PERC) |
| 64 | + total_steps = len(sweep_range) |
| 65 | + images = [] |
| 66 | + |
| 67 | + logger.info(f"Moving motor...") |
| 68 | + for step, motor_pos in enumerate(sweep_range, start=1): |
| 69 | + motor.move_abs(motor_pos) |
| 70 | + img, timestamp = next(camera.yieldImages()) |
| 71 | + images.append(img) |
| 72 | + cell_finder.add_image(motor_pos, img) |
| 73 | + progress_callback(step, total_steps) |
| 74 | + image_callback(img) |
| 75 | + motor_label_callback(motor_pos) |
| 76 | + |
| 77 | + led.turnOff() |
| 78 | + |
| 79 | + return images, list(sweep_range) |
| 80 | + |
| 81 | + |
| 82 | +def determine_sweep_range(motor): |
| 83 | + current_position = motor.pos |
| 84 | + max_position = motor.max_pos |
| 85 | + min_position = 0 |
| 86 | + |
| 87 | + if current_position > max_position / 2: |
| 88 | + logger.info("Motor at top range. Sweeping from top to bottom.") |
| 89 | + return range(max_position, min_position - 1, -10) |
| 90 | + else: |
| 91 | + logger.info("Motor not at top range. Sweeping from bottom to top.") |
| 92 | + return range(min_position, max_position + 1, 10) |
| 93 | + |
| 94 | + |
| 95 | +def pressure_check(pm: PneumaticModule) -> bool: |
| 96 | + """Ensure the lid is closed by checking the pressure.""" |
| 97 | + initial_pressure, _ = pm.getPressure() |
| 98 | + pm.setDutyCycle(pm.getMinDutyCycle()) |
| 99 | + sleep(0.5) |
| 100 | + post_pull_pressure, _ = pm.getPressureMaxReadAttempts(max_attempts=10) |
| 101 | + pm.setDutyCycle(pm.getMaxDutyCycle()) |
| 102 | + |
| 103 | + pressure_diff = initial_pressure - post_pull_pressure |
| 104 | + logger.info(f"Measure pressure difference is: {pressure_diff}hPa.") |
| 105 | + return pressure_diff >= MIN_PRESSURE_DIFF |
| 106 | + |
| 107 | + |
| 108 | +def main(): |
| 109 | + # Initialize hardware |
| 110 | + camera, pm, motor, led = init_hardware() |
| 111 | + cell_finder = CellFinder() |
| 112 | + |
| 113 | + # Retrieve camera dimensions |
| 114 | + img_width, img_height = ( |
| 115 | + CAMERA_SELECTION.img_dims().width, |
| 116 | + CAMERA_SELECTION.img_dims().height, |
| 117 | + ) |
| 118 | + |
| 119 | + # Set up GUI |
| 120 | + root = tk.Tk() |
| 121 | + root.title("Coarse Focus Adjustment Utility") |
| 122 | + root.geometry("800x600") |
| 123 | + root.grid_rowconfigure(1, weight=1) |
| 124 | + root.grid_columnconfigure(0, weight=1) |
| 125 | + |
| 126 | + # Components |
| 127 | + status_label = tk.Label(root, text="", font=("Helvetica", 16)) |
| 128 | + status_label.grid(row=0, column=0, pady=10, sticky="n") |
| 129 | + status_label.config(text="Please load a flow cell (with blood) and close the lid.") |
| 130 | + |
| 131 | + image_canvas = tk.Label(root, bg="black") |
| 132 | + image_canvas.grid(row=1, column=0, pady=10, sticky="n") |
| 133 | + |
| 134 | + progress = ttk.Progressbar(root, orient="horizontal", mode="determinate") |
| 135 | + progress.grid(row=2, column=0, pady=10, sticky="ew") |
| 136 | + |
| 137 | + motor_label = tk.Label(root, text="Motor Position: 0", font=("Helvetica", 14)) |
| 138 | + motor_label.grid(row=3, column=0, pady=10, sticky="n") |
| 139 | + |
| 140 | + button_frame = tk.Frame(root) |
| 141 | + button_frame.grid(row=5, column=0, pady=20, sticky="se") |
| 142 | + |
| 143 | + slider_frame = tk.Frame(root) |
| 144 | + slider_frame.grid(row=4, column=0, pady=10, sticky="n") |
| 145 | + |
| 146 | + slider = None # Global reference to the slider |
| 147 | + first_sweep_done = False # Track if the first sweep has been completed |
| 148 | + |
| 149 | + def update_progress(current, total): |
| 150 | + progress["maximum"] = total |
| 151 | + progress["value"] = current |
| 152 | + root.update_idletasks() |
| 153 | + |
| 154 | + def update_image(img): |
| 155 | + img = Image.fromarray(img) |
| 156 | + # Maintain aspect ratio while resizing to fit within the GUI window dimensions |
| 157 | + canvas_width = 800 |
| 158 | + canvas_height = 600 |
| 159 | + scale = min(canvas_width / img_width, canvas_height / img_height) |
| 160 | + new_width = int(img_width * scale) |
| 161 | + new_height = int(img_height * scale) |
| 162 | + img = img.resize((new_width, new_height), Image.ANTIALIAS) |
| 163 | + img_tk = ImageTk.PhotoImage(img) |
| 164 | + image_canvas.config(image=img_tk) |
| 165 | + image_canvas.image = img_tk |
| 166 | + |
| 167 | + def update_motor_label(position): |
| 168 | + motor_label.config(text=f"Motor Position: {position}") |
| 169 | + |
| 170 | + def display_images(images: List[np.ndarray], motor_positions: List[int]): |
| 171 | + nonlocal slider # Reference the global slider |
| 172 | + if slider: |
| 173 | + slider.destroy() # Destroy the existing slider if present |
| 174 | + |
| 175 | + def on_slider_move(value): |
| 176 | + index = int(float(value)) |
| 177 | + update_image(images[index]) |
| 178 | + motor_label.config(text=f"Motor Position: {motor_positions[index]}") |
| 179 | + |
| 180 | + slider = tk.Scale( |
| 181 | + slider_frame, |
| 182 | + from_=0, |
| 183 | + to=len(images) - 1, |
| 184 | + orient="horizontal", |
| 185 | + command=on_slider_move, |
| 186 | + length=500, |
| 187 | + ) |
| 188 | + slider.pack() |
| 189 | + for motor_pos in motor_positions: |
| 190 | + slider.set(motor_pos) # Set the tick marks to motor positions |
| 191 | + update_image(images[0]) # Display the first image initially |
| 192 | + motor_label.config(text=f"Motor Position: {motor_positions[0]}") |
| 193 | + |
| 194 | + def start_sweep(): |
| 195 | + nonlocal first_sweep_done |
| 196 | + status_label.config(text="Sweeping in progress...") |
| 197 | + root.update() |
| 198 | + |
| 199 | + if not first_sweep_done: |
| 200 | + status_label.config(text="Checking that a flow cell is loaded...") |
| 201 | + root.update() |
| 202 | + if not pressure_check(pm): |
| 203 | + messagebox.showinfo( |
| 204 | + "Error", |
| 205 | + "Please ensure the CAP module is on and a flow cell is loaded.", |
| 206 | + ) |
| 207 | + status_label.config( |
| 208 | + text="Please load a flow cell (with blood) and close the lid." |
| 209 | + ) |
| 210 | + root.update() |
| 211 | + return |
| 212 | + status_label.config(text="Pulling RBCs into the field of view...") |
| 213 | + root.update() |
| 214 | + pm.setDutyCycle(pm.getMinDutyCycle()) |
| 215 | + sleep(PNEUMATIC_PULL_TIME_S) # Allow cells to enter |
| 216 | + pm.setDutyCycle(pm.getMaxDutyCycle()) |
| 217 | + first_sweep_done = True |
| 218 | + |
| 219 | + progress["value"] = 0 |
| 220 | + status_label.config(text="Sweeping in progress...") |
| 221 | + root.update() |
| 222 | + sweep_range = determine_sweep_range(motor) |
| 223 | + images, motor_positions = perform_sweep( |
| 224 | + camera, |
| 225 | + motor, |
| 226 | + led, |
| 227 | + sweep_range, |
| 228 | + cell_finder, |
| 229 | + update_progress, |
| 230 | + update_image, |
| 231 | + update_motor_label, |
| 232 | + ) |
| 233 | + |
| 234 | + try: |
| 235 | + result = cell_finder.get_cells_found_position() |
| 236 | + except NoCellsFound: |
| 237 | + result = None |
| 238 | + |
| 239 | + if result is None: |
| 240 | + messagebox.showinfo( |
| 241 | + "Manual Adjustment Required", |
| 242 | + "No cells found. Use the slider to manually review images.", |
| 243 | + ) |
| 244 | + display_images(images, motor_positions) |
| 245 | + elif result >= MAX_ACCEPTABLE_MOTOR_POS: |
| 246 | + messagebox.showinfo( |
| 247 | + "Adjustment Required", |
| 248 | + "Cells found too high. Please pull up on the stage.", |
| 249 | + ) |
| 250 | + elif result <= MIN_ACCEPTABLE_MOTOR_POS: |
| 251 | + messagebox.showinfo( |
| 252 | + "Adjustment Required", |
| 253 | + "Cells found too low. Please push down on the stage.", |
| 254 | + ) |
| 255 | + elif MIN_ACCEPTABLE_MOTOR_POS < result < MAX_ACCEPTABLE_MOTOR_POS: |
| 256 | + res = result |
| 257 | + output_string = ( |
| 258 | + f"Stage adjustment successful!\nCells found at motor position {res}." |
| 259 | + ) |
| 260 | + messagebox.showinfo(f"Success", output_string) |
| 261 | + else: |
| 262 | + messagebox.showerror("Error", "An unexpected error occurred.") |
| 263 | + |
| 264 | + status_label.config(text="Sweep completed.") |
| 265 | + |
| 266 | + def quit_application(): |
| 267 | + camera.deactivateCamera() |
| 268 | + led.turnOff() |
| 269 | + root.destroy() |
| 270 | + |
| 271 | + # Buttons |
| 272 | + tk.Button( |
| 273 | + button_frame, text="Start Sweep", font=("Helvetica", 16), command=start_sweep |
| 274 | + ).pack(side=tk.RIGHT, padx=10) |
| 275 | + tk.Button( |
| 276 | + button_frame, text="Quit", font=("Helvetica", 16), command=quit_application |
| 277 | + ).pack(side=tk.RIGHT, padx=10) |
| 278 | + |
| 279 | + # Run the application |
| 280 | + root.mainloop() |
| 281 | + |
| 282 | + |
| 283 | +if __name__ == "__main__": |
| 284 | + main() |
0 commit comments