|
| 1 | +""" |
| 2 | +Script to detect and fix missing channels in OpenEphys XML settings files. |
| 3 | +
|
| 4 | +This script corrects XML files where channels are missing from the |
| 5 | +CHANNELS, ELECTRODE_XPOS, and ELECTRODE_YPOS tags. It fills in missing |
| 6 | +channels by inferring their values based on existing patterns in the file. |
| 7 | +
|
| 8 | +Typical usage example: |
| 9 | + python fix_openephys_xml_missing_channels.py --file_path settings.xml --overwrite --verbose |
| 10 | +""" |
| 11 | +import argparse |
| 12 | +import warnings |
| 13 | +from pathlib import Path |
| 14 | +from typing import Union |
| 15 | + |
| 16 | +import numpy as np |
| 17 | + |
| 18 | +from probeinterface.utils import import_safely |
| 19 | + |
| 20 | + |
| 21 | +def fix_openephys_xml_file( |
| 22 | + file_path: Union[str, Path], |
| 23 | + overwrite: bool = True, |
| 24 | + verbose: bool = False |
| 25 | +): |
| 26 | + """ |
| 27 | + Fix missing channels in an OpenEphys XML settings file. |
| 28 | +
|
| 29 | + This function parses the XML file, detects missing channels in the |
| 30 | + CHANNELS, ELECTRODE_XPOS, and ELECTRODE_YPOS tags, and fills them in |
| 31 | + by inferring values from existing data patterns. |
| 32 | +
|
| 33 | + Parameters |
| 34 | + ---------- |
| 35 | + file_path : Union[str, Path] |
| 36 | + Path to the XML file to be fixed. |
| 37 | + overwrite : bool, optional |
| 38 | + If True, overwrite the original file. If False, save as .fixed.xml. |
| 39 | + Default is True. |
| 40 | + verbose : bool, optional |
| 41 | + If True, print detailed information about the process. |
| 42 | + Default is False. |
| 43 | +
|
| 44 | + Raises |
| 45 | + ------ |
| 46 | + FileNotFoundError |
| 47 | + If the specified file does not exist. |
| 48 | + ValueError |
| 49 | + If unable to infer fill values for missing channels. |
| 50 | + """ |
| 51 | + file_path = Path(file_path) |
| 52 | + if not file_path.exists(): |
| 53 | + raise FileNotFoundError(f"File not found: {file_path}") |
| 54 | + |
| 55 | + # Parse the XML file |
| 56 | + ET = import_safely("xml.etree.ElementTree") |
| 57 | + tree = ET.parse(str(file_path)) |
| 58 | + root = tree.getroot() |
| 59 | + |
| 60 | + # Find all relevant elements |
| 61 | + channels_elements = root.findall(".//CHANNELS") |
| 62 | + xpos_elements = root.findall(".//ELECTRODE_XPOS") |
| 63 | + ypos_elements = root.findall(".//ELECTRODE_YPOS") |
| 64 | + |
| 65 | + for channels, xpos, ypos in zip(channels_elements, xpos_elements, ypos_elements): |
| 66 | + channel_names = np.array(list(channels.attrib.keys())) |
| 67 | + channel_ids = np.array([int(ch[2:]) for ch in channel_names]) |
| 68 | + sorted_channel_ids = sorted(channel_ids) |
| 69 | + all_channel_ids = set(range(sorted_channel_ids[0], sorted_channel_ids[-1] + 1)) |
| 70 | + missing_channels = sorted(all_channel_ids - set(sorted_channel_ids)) |
| 71 | + |
| 72 | + if not missing_channels: |
| 73 | + if verbose: |
| 74 | + print("No missing channels detected.") |
| 75 | + continue |
| 76 | + |
| 77 | + warnings.warn(f"Missing channels detected in XML: {missing_channels}") |
| 78 | + |
| 79 | + # Detect repeating pattern for <ELECTRODE_XPOS> |
| 80 | + xpos_values = [int(value) for value in xpos.attrib.values()] |
| 81 | + pattern_length = next( |
| 82 | + (i for i in range(1, len(xpos_values) // 2) if xpos_values[:i] == xpos_values[i:2 * i]), |
| 83 | + len(xpos_values) |
| 84 | + ) |
| 85 | + xpos_pattern = xpos_values[:pattern_length] |
| 86 | + |
| 87 | + # Detect step for <ELECTRODE_YPOS> |
| 88 | + ypos_values = [int(value) for value in ypos.attrib.values()] |
| 89 | + unique_steps = np.unique(np.diff(sorted(set(ypos_values)))) |
| 90 | + if len(unique_steps) != 1: |
| 91 | + raise ValueError("Unable to determine unique step size for ELECTRODE_YPOS.") |
| 92 | + ypos_step = unique_steps[0] |
| 93 | + |
| 94 | + # Fill in missing channels |
| 95 | + for channel_id in missing_channels: |
| 96 | + # Find the closest channel before or after |
| 97 | + prev_channels = [ch for ch in channel_ids if ch < channel_id] |
| 98 | + next_channels = [ch for ch in channel_ids if ch > channel_id] |
| 99 | + |
| 100 | + if prev_channels: |
| 101 | + nearest_channel_id = max(prev_channels) |
| 102 | + elif next_channels: |
| 103 | + nearest_channel_id = min(next_channels) |
| 104 | + else: |
| 105 | + raise ValueError(f"Cannot find reference channel for missing channel {channel_id}") |
| 106 | + |
| 107 | + channel_fill_value = channels.attrib[f"CH{nearest_channel_id}"] |
| 108 | + channels.set(f"CH{channel_id}", channel_fill_value) |
| 109 | + |
| 110 | + xpos_fill_value = xpos_pattern[channel_id % pattern_length] |
| 111 | + xpos.set(f"CH{channel_id}", str(xpos_fill_value)) |
| 112 | + |
| 113 | + ypos_fill_value = (channel_id // 2) * ypos_step |
| 114 | + ypos.set(f"CH{channel_id}", str(ypos_fill_value)) |
| 115 | + |
| 116 | + if not overwrite: |
| 117 | + file_path = file_path.with_suffix(".fixed.xml") |
| 118 | + |
| 119 | + # Save the updated XML |
| 120 | + tree.write(file_path) |
| 121 | + if verbose: |
| 122 | + print(f"Fixed XML file saved to: {file_path}") |
| 123 | + |
| 124 | + |
| 125 | +def main(): |
| 126 | + """ |
| 127 | + Command-line interface for fixing OpenEphys XML files. |
| 128 | +
|
| 129 | + Parses command-line arguments and calls fix_openephys_xml_file. |
| 130 | + """ |
| 131 | + parser = argparse.ArgumentParser(description="Fix missing channels in OpenEphys XML settings files.") |
| 132 | + parser.add_argument("--file_path", type=str, required=True, help="Path to the XML file to fix.") |
| 133 | + parser.add_argument("--overwrite", action="store_true", help="Overwrite the original file.") |
| 134 | + parser.add_argument("--verbose", action="store_true", help="Print detailed information.") |
| 135 | + args = parser.parse_args() |
| 136 | + |
| 137 | + fix_openephys_xml_file( |
| 138 | + file_path=args.file_path, |
| 139 | + overwrite=args.overwrite, |
| 140 | + verbose=args.verbose |
| 141 | + ) |
| 142 | + |
| 143 | + |
| 144 | +if __name__ == "__main__": |
| 145 | + main() |
0 commit comments