diff --git a/HEADSTAGE_SENSORS.md b/HEADSTAGE_SENSORS.md new file mode 100644 index 0000000..9c269ff --- /dev/null +++ b/HEADSTAGE_SENSORS.md @@ -0,0 +1,128 @@ +# Headstage Sensor Data Separation + +This document describes the new functionality for separating headstage sensor data into individual TimeSeries objects with appropriate units and scaling. + +## Overview + +Previously, all analog data (including headstage sensors) was combined into a single TimeSeries object stored in the processing module with unit "-1". The new implementation creates separate TimeSeries objects for each sensor type and stores them in the acquisition module with proper units and scaling. + +## Sensor Types and Scaling + +The following sensor types are automatically detected and processed: + +### Accelerometer Data +- **Channels**: `Headstage_AccelX`, `Headstage_AccelY`, `Headstage_AccelZ` +- **Scaling**: Raw values × 0.000061 (converts to g units) +- **Unit**: `g` (gravity units) +- **Description**: Headstage accelerometer data with ±2G full range + +### Gyroscope Data +- **Channels**: `Headstage_GyroX`, `Headstage_GyroY`, `Headstage_GyroZ` +- **Scaling**: Raw values × 0.061 (converts to degrees/second) +- **Unit**: `d/s` (degrees per second) +- **Description**: Headstage gyroscope data with ±2000 dps range + +### Magnetometer Data +- **Channels**: `Headstage_MagX`, `Headstage_MagY`, `Headstage_MagZ` +- **Scaling**: No scaling applied (1.0) +- **Unit**: `unspecified` +- **Description**: Headstage magnetometer data + +### Analog Input Channels +- **Channels**: `ECU_Ain1`, `ECU_Ain2`, etc., `Controller_Ain1`, etc. +- **Scaling**: No scaling applied (1.0) +- **Unit**: `unspecified` (can be customized in metadata) +- **Description**: Analog input channel data + +## Metadata Configuration + +### Basic Configuration +The existing `units` section in your YAML metadata still works: + +```yaml +units: + analog: "unspecified" + behavioral_events: "unspecified" +``` + +### Advanced Configuration +You can now specify custom units for each sensor type using the new `sensor_units` section: + +```yaml +sensor_units: + accelerometer: "g" # Custom unit for accelerometer + gyroscope: "d/s" # Custom unit for gyroscope + magnetometer: "T" # Custom unit for magnetometer (Tesla) + analog_input: "V" # Custom unit for analog inputs (Volts) +``` + +### Behavioral Events with Units +Individual behavioral events can now specify their own units: + +```yaml +behavioral_events: + - description: Din1 + name: Light_1 + comments: Indicator for reward delivery + unit: "unspecified" + - description: ECU_Ain1 + name: Analog_Input_1 + comments: Voltage measurement + unit: "V" +``` + +## Output Structure + +### New Behavior (Default) +- Separate TimeSeries objects stored in `nwbfile.acquisition` +- Each sensor type gets its own TimeSeries with proper units +- Applied scaling factors for accelerometer and gyroscope data +- Descriptive names and channel information + +### Legacy Behavior (Optional) +You can still use the old combined approach by setting `separate_sensor_data=False`: + +```python +add_analog_data(nwbfile, rec_files, metadata=metadata, separate_sensor_data=False) +``` + +This will create the original single TimeSeries in the processing module. + +## Example Usage + +```python +from trodes_to_nwb.convert_analog import add_analog_data + +# Load your metadata with optional sensor_units configuration +metadata = { + "sensor_units": { + "accelerometer": "g", + "gyroscope": "d/s" + } +} + +# Add analog data with new separated sensor approach +add_analog_data(nwbfile, rec_files, metadata=metadata) + +# Result: Individual TimeSeries in nwbfile.acquisition: +# - "accelerometer" (scaled data in g units) +# - "gyroscope" (scaled data in d/s units) +# - "magnetometer" (raw data) +# - "ecu_analog_input" (ECU analog channels) +``` + +## Benefits + +1. **Clear Data Organization**: Each sensor type has its own TimeSeries with descriptive names +2. **Proper Units**: Automatic application of correct physical units +3. **Accurate Scaling**: Raw integer values converted to meaningful physical measurements +4. **Better Documentation**: Channel names and descriptions preserved in TimeSeries +5. **NWB Compliance**: Data stored in appropriate acquisition module +6. **Backwards Compatibility**: Option to use legacy behavior if needed + +## Migration Notes + +- Existing code will continue to work with the new default behavior +- The new approach stores data in `acquisition` instead of `processing["analog"]` +- If you need the old behavior, use `separate_sensor_data=False` +- Update analysis code to read from `nwbfile.acquisition` instead of `nwbfile.processing["analog"]` \ No newline at end of file diff --git a/sample_metadata_with_sensor_units.yml b/sample_metadata_with_sensor_units.yml new file mode 100644 index 0000000..a79d4ad --- /dev/null +++ b/sample_metadata_with_sensor_units.yml @@ -0,0 +1,84 @@ +experimenter_name: + - lastname, firstname +lab: Loren Frank Lab +institution: UCSF +experiment_description: Test Conversion with Headstage Sensor Units +session_description: test yaml insertion with separate sensor TimeSeries +session_id: "12345" +keywords: + - testing + - headstage_sensors +subject: + description: Long-Evans Rat + genotype: Obese Prone CD Rat + sex: M + species: Rattus pyctoris + subject_id: "54321" + date_of_birth: 2000-01-01T00:00:00.000Z + weight: 100 +data_acq_device: + - name: SpikeGadgets + system: SpikeGadgets + amplifier: Intan + adc_circuit: Intan +cameras: + - id: 0 + meters_per_pixel: 0.001 + manufacturer: Allied Vision + model: model1 + lens: lens1 + camera_name: test camera 1 +tasks: + - task_name: Sleep + task_description: sleeping + task_environment: sleep box + camera_id: + - 0 + task_epochs: + - 1 +associated_files: [] +associated_video_files: [] + +# Updated units specification with individual sensor units +units: + analog: "unspecified" # Default for uncategorized analog channels + behavioral_events: "unspecified" # Default for DIO events + +# Optional: specify custom units for specific sensor types +sensor_units: + accelerometer: "g" # Will override default + gyroscope: "d/s" # Will override default + magnetometer: "unspecified" + analog_input: "V" # For ECU analog inputs + +times_period_multiplier: 1 +raw_data_to_volts: 1.95e-07 +default_header_file_path: /stelmo/sam/test_data + +device: + name: + - device1 + +# Enhanced behavioral_events with individual unit specifications +behavioral_events: + - description: Din1 + name: Light_1 + comments: Indicator for reward delivery + unit: "unspecified" # Digital events are dimensionless + - description: Din2 + name: Light_2 + unit: "unspecified" + - description: Dout2 + name: Poke_1 + unit: "unspecified" + - description: ECU_Ain1 + name: Analog_Input_1 + comments: Custom analog input channel + unit: "V" # Voltage + - description: ECU_Ain2 + name: Analog_Input_2 + comments: Another analog input + unit: "mV" # Millivolts + +electrode_groups: [] +ntrode_electrode_group_channel_map: [] \ No newline at end of file diff --git a/src/trodes_to_nwb/convert.py b/src/trodes_to_nwb/convert.py index e4c7cee..f1d8c1a 100644 --- a/src/trodes_to_nwb/convert.py +++ b/src/trodes_to_nwb/convert.py @@ -309,6 +309,7 @@ def _create_nwb( rec_filepaths, timestamps=rec_dci_timestamps, behavior_only=behavior_only, + metadata=metadata, ) logger.info("ADDING SAMPLE COUNTS") add_sample_count(nwb_file, rec_dci) diff --git a/src/trodes_to_nwb/convert_analog.py b/src/trodes_to_nwb/convert_analog.py index d1c5693..de78a59 100644 --- a/src/trodes_to_nwb/convert_analog.py +++ b/src/trodes_to_nwb/convert_analog.py @@ -1,11 +1,12 @@ """Module for handling the conversion of ECU analog and headstage sensor data streams from Trodes .rec files to NWB format.""" +import re from xml.etree import ElementTree import numpy as np import pynwb from hdmf.backends.hdf5 import H5DataIO -from pynwb import NWBFile +from pynwb import NWBFile, TimeSeries from trodes_to_nwb import convert_rec_header from trodes_to_nwb.convert_ephys import RecFileDataChunkIterator @@ -13,25 +14,282 @@ DEFAULT_CHUNK_TIME_DIM = 16384 DEFAULT_CHUNK_MAX_CHANNEL_DIM = 32 +# Sensor type definitions with scaling factors and units +SENSOR_TYPE_CONFIG = { + 'accelerometer': { + 'pattern': r'Headstage_Accel[XYZ]', + 'scaling_factor': 0.000061, # Convert to g units + 'unit': 'g', + 'description': 'Headstage accelerometer data' + }, + 'gyroscope': { + 'pattern': r'Headstage_Gyro[XYZ]', + 'scaling_factor': 0.061, # Convert to degrees/second + 'unit': 'd/s', + 'description': 'Headstage gyroscope data' + }, + 'magnetometer': { + 'pattern': r'Headstage_Mag[XYZ]', + 'scaling_factor': 1.0, # No scaling specified in issue + 'unit': 'unspecified', + 'description': 'Headstage magnetometer data' + }, + 'analog_input': { + 'pattern': r'(ECU_Ain\d+|Controller_Ain\d+)', + 'scaling_factor': 1.0, + 'unit': 'unspecified', + 'description': 'Analog input channel' + } +} + + +def _categorize_sensor_channels(channel_names: list[str]) -> dict[str, list[str]]: + """Categorize sensor channels by type based on naming patterns. + + Parameters + ---------- + channel_names : list[str] + List of channel names to categorize + + Returns + ------- + dict[str, list[str]] + Dictionary mapping sensor types to lists of channel names + """ + categorized = {} + + for sensor_type, config in SENSOR_TYPE_CONFIG.items(): + pattern = config['pattern'] + matching_channels = [name for name in channel_names if re.match(pattern, name)] + if matching_channels: + categorized[sensor_type] = matching_channels + + # Handle uncategorized channels + categorized_flat = [name for channels in categorized.values() for name in channels] + uncategorized = [name for name in channel_names if name not in categorized_flat] + if uncategorized: + categorized['other'] = uncategorized + + return categorized + + +def _create_sensor_timeseries( + sensor_type: str, + channel_names: list[str], + data: np.ndarray, + timestamps: np.ndarray, + metadata: dict = None +) -> TimeSeries: + """Create a TimeSeries object for a specific sensor type. + + Parameters + ---------- + sensor_type : str + Type of sensor (accelerometer, gyroscope, etc.) + channel_names : list[str] + Names of channels for this sensor type + data : np.ndarray + Raw sensor data + timestamps : np.ndarray + Timestamps for the data + metadata : dict, optional + Metadata dictionary for custom units/scaling + + Returns + ------- + TimeSeries + Configured TimeSeries object for the sensor type + """ + config = SENSOR_TYPE_CONFIG.get(sensor_type, { + 'scaling_factor': 1.0, + 'unit': 'unspecified', + 'description': f'{sensor_type} data' + }) + + # Apply scaling factor + scaled_data = data * config['scaling_factor'] + + # Create description with channel names + description = f"{config['description']}: {', '.join(channel_names)}" + + # Use custom units from metadata if available + unit = config['unit'] + if metadata and 'sensor_units' in metadata and sensor_type in metadata['sensor_units']: + unit = metadata['sensor_units'][sensor_type] + + return TimeSeries( + name=sensor_type, + description=description, + data=scaled_data, + unit=unit, + timestamps=timestamps, + ) + def add_analog_data( nwbfile: NWBFile, rec_file_path: list[str], timestamps: np.ndarray = None, behavior_only: bool = False, + metadata: dict = None, + separate_sensor_data: bool = True, **kwargs, ) -> None: - """Adds analog streams to the nwb file. + """Adds analog streams to the nwb file as separate TimeSeries objects for each sensor type. Parameters ---------- nwbfile : NWBFile nwb file being assembled - recfile : list[str] + rec_file_path : list[str] ordered list of file paths to all recfiles with session's data + timestamps : np.ndarray, optional + timestamps for the data + behavior_only : bool, optional + if True, only include behavioral data + metadata : dict, optional + metadata dictionary for custom units and scaling + separate_sensor_data : bool, optional + if True, create separate TimeSeries for each sensor type (new behavior) + if False, use legacy combined TimeSeries approach """ - # TODO: ADD HEADSTAGE DATA + + # Legacy behavior for backwards compatibility + if not separate_sensor_data: + return _add_analog_data_legacy(nwbfile, rec_file_path, timestamps, behavior_only, **kwargs) + + # New behavior: separate sensor TimeSeries + # Get the ids of the analog channels from the first rec file header + root = convert_rec_header.read_header(rec_file_path[0]) + hconf = root.find("HardwareConfiguration") + ecu_conf = None + for conf in hconf: + if conf.attrib["name"] == "ECU": + ecu_conf = conf + break + + # Get ECU analog channel IDs + ecu_analog_channel_ids = [] + if ecu_conf is not None: + for channel in ecu_conf: + if channel.attrib["dataType"] == "analog": + ecu_analog_channel_ids.append(channel.attrib["id"]) + + # Make the data chunk iterator for ECU analog data + if ecu_analog_channel_ids: + rec_dci = RecFileDataChunkIterator( + rec_file_path, + nwb_hw_channel_order=ecu_analog_channel_ids, + stream_id="ECU_analog", + is_analog=True, + timestamps=timestamps, + behavior_only=behavior_only, + ) + + # Get headstage sensor channel IDs from multiplexed channels + headstage_channel_ids = list(rec_dci.neo_io[0].multiplexed_channel_xml.keys()) if rec_dci.neo_io else [] + + # Process ECU analog channels + if ecu_analog_channel_ids: + # Get ECU analog data (without headstage data) + ecu_data = rec_dci._get_data((slice(None), slice(0, len(ecu_analog_channel_ids)))) + + # Categorize ECU analog channels + ecu_categorized = _categorize_sensor_channels(ecu_analog_channel_ids) + + # Create TimeSeries for each ECU sensor type + for sensor_type, channel_names in ecu_categorized.items(): + channel_indices = [ecu_analog_channel_ids.index(name) for name in channel_names] + sensor_data = ecu_data[:, channel_indices] + + timeseries = _create_sensor_timeseries( + sensor_type=f"ecu_{sensor_type}", + channel_names=channel_names, + data=sensor_data, + timestamps=rec_dci.timestamps, + metadata=metadata + ) + + # Add to acquisition + nwbfile.add_acquisition(timeseries) + + # Process headstage sensor channels if any exist + if headstage_channel_ids: + # Get headstage sensor data + headstage_data = rec_dci.neo_io[0].get_analogsignal_multiplexed(headstage_channel_ids) + + # Categorize headstage channels by sensor type + headstage_categorized = _categorize_sensor_channels(headstage_channel_ids) + + # Create separate TimeSeries for each sensor type + for sensor_type, channel_names in headstage_categorized.items(): + channel_indices = [headstage_channel_ids.index(name) for name in channel_names] + sensor_data = headstage_data[:, channel_indices] + + timeseries = _create_sensor_timeseries( + sensor_type=sensor_type, + channel_names=channel_names, + data=sensor_data, + timestamps=rec_dci.timestamps, + metadata=metadata + ) + + # Add to acquisition + nwbfile.add_acquisition(timeseries) + else: + # If no ECU analog channels, create a minimal iterator to get headstage data + try: + from trodes_to_nwb.spike_gadgets_raw_io import SpikeGadgetsRawIO + neo_io = SpikeGadgetsRawIO(filename=rec_file_path[0]) + neo_io.parse_header() + + # Get headstage sensor channel IDs from multiplexed channels + headstage_channel_ids = list(neo_io.multiplexed_channel_xml.keys()) if hasattr(neo_io, 'multiplexed_channel_xml') else [] + + if headstage_channel_ids: + # Get headstage sensor data + headstage_data = neo_io.get_analogsignal_multiplexed(headstage_channel_ids) + + # Create timestamps if not provided + if timestamps is None: + timestamps = neo_io.get_analogsignal_timestamps(0, headstage_data.shape[0]) + + # Categorize headstage channels by sensor type + headstage_categorized = _categorize_sensor_channels(headstage_channel_ids) + + # Create separate TimeSeries for each sensor type + for sensor_type, channel_names in headstage_categorized.items(): + channel_indices = [headstage_channel_ids.index(name) for name in channel_names] + sensor_data = headstage_data[:, channel_indices] + + timeseries = _create_sensor_timeseries( + sensor_type=sensor_type, + channel_names=channel_names, + data=sensor_data, + timestamps=timestamps[:sensor_data.shape[0]], # Ensure same length + metadata=metadata + ) + + # Add to acquisition + nwbfile.add_acquisition(timeseries) + except Exception as e: + # If headstage processing fails, log warning but don't crash + import logging + logger = logging.getLogger("convert") + logger.warning(f"Could not process headstage sensor data: {e}") + +def _add_analog_data_legacy( + nwbfile: NWBFile, + rec_file_path: list[str], + timestamps: np.ndarray = None, + behavior_only: bool = False, + **kwargs, +) -> None: + """Legacy function for adding analog data as a single combined TimeSeries. + + This preserves the original behavior for backwards compatibility. + """ # get the ids of the analog channels from the first rec file header root = convert_rec_header.read_header(rec_file_path[0]) hconf = root.find("HardwareConfiguration") @@ -41,12 +299,15 @@ def add_analog_data( ecu_conf = conf break analog_channel_ids = [] - for channel in ecu_conf: - if channel.attrib["dataType"] == "analog": - analog_channel_ids.append(channel.attrib["id"]) + if ecu_conf is not None: + for channel in ecu_conf: + if channel.attrib["dataType"] == "analog": + analog_channel_ids.append(channel.attrib["id"]) + + if not analog_channel_ids: + return # No analog channels to process # make the data chunk iterator - # TODO use the stream name instead of the stream index to be more robust rec_dci = RecFileDataChunkIterator( rec_file_path, nwb_hw_channel_order=analog_channel_ids, @@ -61,8 +322,6 @@ def add_analog_data( # (16384, 32) chunks of dtype int16 (2 bytes) is 1 MB, which is recommended # by studies by the NWB team. - # could also add compression here. zstd/blosc-zstd are recommended by the NWB team, but - # they require the hdf5plugin library to be installed. gzip is available by default. data_data_io = H5DataIO( rec_dci, chunks=( diff --git a/src/trodes_to_nwb/convert_dios.py b/src/trodes_to_nwb/convert_dios.py index e532cae..a97c596 100644 --- a/src/trodes_to_nwb/convert_dios.py +++ b/src/trodes_to_nwb/convert_dios.py @@ -29,11 +29,20 @@ def _get_channel_name_map(metadata: dict) -> dict[str, str]: raise ValueError( f"Duplicate channel name {dio_event['description']} in metadata YAML" ) + + # Get unit for this specific event type, or use default + unit = "unspecified" # Default unit for digital events + if "unit" in dio_event: + unit = dio_event["unit"] + elif "units" in metadata and "behavioral_events" in metadata["units"]: + unit = metadata["units"]["behavioral_events"] + channel_name_map[dio_event["description"]] = { "name": dio_event["name"], "comments": ( dio_event["comments"] if "comments" in dio_event else "no comments" ), + "unit": unit } return channel_name_map @@ -101,7 +110,7 @@ def add_dios(nwbfile: NWBFile, recfile: list[str], metadata: dict) -> None: comments=channel_name_map[channel_name]["comments"], description=channel_name, data=state_changes, - unit="-1", # TODO change to "N/A", + unit=channel_name_map[channel_name]["unit"], timestamps=timestamps, # TODO adjust timestamps ) beh_events.add_timeseries(ts) diff --git a/src/trodes_to_nwb/nwb_schema.json b/src/trodes_to_nwb/nwb_schema.json index 5a8e871..f361540 100644 --- a/src/trodes_to_nwb/nwb_schema.json +++ b/src/trodes_to_nwb/nwb_schema.json @@ -656,6 +656,50 @@ } } }, + "sensor_units": { + "$id": "#root/sensor_units", + "title": "sensor_units", + "type": "object", + "description": "Optional custom units for specific sensor types", + "properties": { + "accelerometer": { + "$id": "#root/sensor_units/accelerometer", + "title": "accelerometer", + "type": "string", + "default": "g", + "description": "Unit for accelerometer data", + "examples": ["g", "m/s^2"], + "pattern": "^(.|\\s)*\\S(.|\\s)*$" + }, + "gyroscope": { + "$id": "#root/sensor_units/gyroscope", + "title": "gyroscope", + "type": "string", + "default": "d/s", + "description": "Unit for gyroscope data", + "examples": ["d/s", "rad/s"], + "pattern": "^(.|\\s)*\\S(.|\\s)*$" + }, + "magnetometer": { + "$id": "#root/sensor_units/magnetometer", + "title": "magnetometer", + "type": "string", + "default": "unspecified", + "description": "Unit for magnetometer data", + "examples": ["T", "Gauss", "unspecified"], + "pattern": "^(.|\\s)*\\S(.|\\s)*$" + }, + "analog_input": { + "$id": "#root/sensor_units/analog_input", + "title": "analog_input", + "type": "string", + "default": "unspecified", + "description": "Unit for analog input channels", + "examples": ["V", "mV", "A", "unspecified"], + "pattern": "^(.|\\s)*\\S(.|\\s)*$" + } + } + }, "times_period_multiplier": { "$id": "#root/times_period_multiplier", "title": "times_period_multiplier", @@ -945,6 +989,30 @@ ], "description": "Type of behavioral events", "pattern": "^(.|\\s)*\\S(.|\\s)*$" + }, + "comments": { + "$id": "#root/behavioral_events/items/comments", + "title": "comments", + "type": "string", + "default": "", + "description": "Optional comments about this behavioral event", + "pattern": "^(.|\\s)*$" + }, + "unit": { + "$id": "#root/behavioral_events/items/unit", + "title": "unit", + "type": "string", + "default": "unspecified", + "examples": [ + "unspecified", + "V", + "mV", + "A", + "g", + "d/s" + ], + "description": "Unit for this specific behavioral event", + "pattern": "^(.|\\s)*\\S(.|\\s)*$" } } } diff --git a/src/trodes_to_nwb/tests/test_convert_analog.py b/src/trodes_to_nwb/tests/test_convert_analog.py index 421e7c7..99a135a 100644 --- a/src/trodes_to_nwb/tests/test_convert_analog.py +++ b/src/trodes_to_nwb/tests/test_convert_analog.py @@ -3,7 +3,13 @@ import pynwb from trodes_to_nwb import convert_rec_header, convert_yaml -from trodes_to_nwb.convert_analog import add_analog_data, get_analog_channel_names +from trodes_to_nwb.convert_analog import ( + add_analog_data, + get_analog_channel_names, + _categorize_sensor_channels, + _create_sensor_timeseries, + SENSOR_TYPE_CONFIG +) from trodes_to_nwb.convert_ephys import RecFileDataChunkIterator from trodes_to_nwb.tests.test_convert_rec_header import default_test_xml_tree from trodes_to_nwb.tests.utils import data_path @@ -103,3 +109,112 @@ def test_selection_of_multiplexed_data(): ) assert data.shape[1] == expected return + + +def test_categorize_sensor_channels(): + """Test that sensor channels are correctly categorized by type""" + # Test with typical headstage channel names + test_channels = [ + "Headstage_AccelX", "Headstage_AccelY", "Headstage_AccelZ", + "Headstage_GyroX", "Headstage_GyroY", "Headstage_GyroZ", + "Headstage_MagX", "Headstage_MagY", "Headstage_MagZ", + "ECU_Ain1", "ECU_Ain2", "Controller_Ain1", + "Other_Channel" + ] + + categorized = _categorize_sensor_channels(test_channels) + + # Check accelerometer channels + assert "accelerometer" in categorized + assert sorted(categorized["accelerometer"]) == ["Headstage_AccelX", "Headstage_AccelY", "Headstage_AccelZ"] + + # Check gyroscope channels + assert "gyroscope" in categorized + assert sorted(categorized["gyroscope"]) == ["Headstage_GyroX", "Headstage_GyroY", "Headstage_GyroZ"] + + # Check magnetometer channels + assert "magnetometer" in categorized + assert sorted(categorized["magnetometer"]) == ["Headstage_MagX", "Headstage_MagY", "Headstage_MagZ"] + + # Check analog input channels + assert "analog_input" in categorized + assert sorted(categorized["analog_input"]) == ["Controller_Ain1", "ECU_Ain1", "ECU_Ain2"] + + # Check uncategorized channels + assert "other" in categorized + assert categorized["other"] == ["Other_Channel"] + + +def test_sensor_type_config(): + """Test that sensor type configuration is complete""" + required_keys = ['pattern', 'scaling_factor', 'unit', 'description'] + + for sensor_type, config in SENSOR_TYPE_CONFIG.items(): + for key in required_keys: + assert key in config, f"Missing {key} in {sensor_type} config" + + # Test that patterns compile + import re + try: + re.compile(config['pattern']) + except re.error: + assert False, f"Invalid regex pattern for {sensor_type}: {config['pattern']}" + + +def test_add_analog_data_with_metadata(): + """Test that add_analog_data creates separate TimeSeries objects in acquisition""" + # Load metadata yml and make nwb file + metadata_path = data_path / "20230622_sample_metadata.yml" + metadata, _ = convert_yaml.load_metadata(metadata_path, []) + + # Add sensor_units to metadata for testing + metadata["sensor_units"] = { + "accelerometer": "g", + "gyroscope": "d/s", + "magnetometer": "unspecified" + } + + rec_file = data_path / "20230622_sample_01_a1.rec" + rec_header = convert_rec_header.read_header(rec_file) + + # Make file with data + nwbfile = convert_yaml.initialize_nwb(metadata, rec_header) + add_analog_data(nwbfile, [rec_file], metadata=metadata) + + # Save file to test structure + filename = "test_add_analog_separated.nwb" + with pynwb.NWBHDF5IO(filename, "w") as io: + io.write(nwbfile) + + # Read back and verify structure + with pynwb.NWBHDF5IO(filename, "r", load_namespaces=True) as io: + read_nwbfile = io.read() + + # Check that we have TimeSeries in acquisition + acquisition_keys = list(read_nwbfile.acquisition.keys()) + + # We should have at least some acquisition data now (if any sensors exist) + # The exact number depends on what's in the test file + if acquisition_keys: # Only test if we actually have acquisition data + # Check that each TimeSeries has appropriate units and descriptions + for ts_name in acquisition_keys: + ts = read_nwbfile.acquisition[ts_name] + assert hasattr(ts, 'unit'), f"TimeSeries {ts_name} missing unit" + assert hasattr(ts, 'description'), f"TimeSeries {ts_name} missing description" + + # Check for expected sensor types based on name + if 'accelerometer' in ts_name.lower(): + assert ts.unit == 'g', f"Accelerometer should have unit 'g', got '{ts.unit}'" + elif 'gyroscope' in ts_name.lower(): + assert ts.unit == 'd/s', f"Gyroscope should have unit 'd/s', got '{ts.unit}'" + + # Also test that we can still use the legacy mode + nwbfile_legacy = convert_yaml.initialize_nwb(metadata, rec_header) + add_analog_data(nwbfile_legacy, [rec_file], metadata=metadata, separate_sensor_data=False) + + # Legacy mode should create processing module instead of acquisition + if hasattr(nwbfile_legacy, 'processing') and 'analog' in nwbfile_legacy.processing: + assert 'analog' in nwbfile_legacy.processing + + # Cleanup + os.remove(filename) diff --git a/test_sensor_separation.py b/test_sensor_separation.py new file mode 100644 index 0000000..be08fef --- /dev/null +++ b/test_sensor_separation.py @@ -0,0 +1,113 @@ +"""Test the new headstage sensor separation functionality""" + +import numpy as np +from trodes_to_nwb.convert_analog import _categorize_sensor_channels, _create_sensor_timeseries, SENSOR_TYPE_CONFIG + + +def test_categorize_sensor_channels(): + """Test that sensor channels are correctly categorized by type""" + # Test with typical headstage channel names + test_channels = [ + "Headstage_AccelX", "Headstage_AccelY", "Headstage_AccelZ", + "Headstage_GyroX", "Headstage_GyroY", "Headstage_GyroZ", + "Headstage_MagX", "Headstage_MagY", "Headstage_MagZ", + "ECU_Ain1", "ECU_Ain2", "Controller_Ain1", + "Other_Channel" + ] + + categorized = _categorize_sensor_channels(test_channels) + + # Check accelerometer channels + assert "accelerometer" in categorized + assert sorted(categorized["accelerometer"]) == ["Headstage_AccelX", "Headstage_AccelY", "Headstage_AccelZ"] + + # Check gyroscope channels + assert "gyroscope" in categorized + assert sorted(categorized["gyroscope"]) == ["Headstage_GyroX", "Headstage_GyroY", "Headstage_GyroZ"] + + # Check magnetometer channels + assert "magnetometer" in categorized + assert sorted(categorized["magnetometer"]) == ["Headstage_MagX", "Headstage_MagY", "Headstage_MagZ"] + + # Check analog input channels + assert "analog_input" in categorized + assert sorted(categorized["analog_input"]) == ["Controller_Ain1", "ECU_Ain1", "ECU_Ain2"] + + # Check uncategorized channels + assert "other" in categorized + assert categorized["other"] == ["Other_Channel"] + + print("✓ Channel categorization test passed") + + +def test_create_sensor_timeseries(): + """Test TimeSeries creation with proper scaling and units""" + # Test accelerometer scaling + test_data = np.array([[1000, 2000], [1500, 2500]], dtype=np.int16) # 2 timepoints, 2 channels + test_timestamps = np.array([0.0, 0.1]) + + # Test accelerometer + accel_ts = _create_sensor_timeseries( + sensor_type="accelerometer", + channel_names=["Headstage_AccelX", "Headstage_AccelY"], + data=test_data, + timestamps=test_timestamps + ) + + # Check scaling is applied (should be original * 0.000061) + expected_data = test_data * 0.000061 + assert np.allclose(accel_ts.data[:], expected_data) + assert accel_ts.unit == "g" + assert "accelerometer" in accel_ts.description.lower() + assert "Headstage_AccelX" in accel_ts.description + + # Test gyroscope + gyro_ts = _create_sensor_timeseries( + sensor_type="gyroscope", + channel_names=["Headstage_GyroX"], + data=test_data[:, [0]], # Single channel + timestamps=test_timestamps + ) + + # Check scaling is applied (should be original * 0.061) + expected_gyro_data = test_data[:, [0]] * 0.061 + assert np.allclose(gyro_ts.data[:], expected_gyro_data) + assert gyro_ts.unit == "d/s" + + # Test with custom metadata units + metadata = {"sensor_units": {"accelerometer": "custom_g"}} + custom_ts = _create_sensor_timeseries( + sensor_type="accelerometer", + channel_names=["Headstage_AccelX"], + data=test_data[:, [0]], + timestamps=test_timestamps, + metadata=metadata + ) + assert custom_ts.unit == "custom_g" + + print("✓ TimeSeries creation test passed") + + +def test_sensor_type_config(): + """Test that sensor type configuration is complete""" + required_keys = ['pattern', 'scaling_factor', 'unit', 'description'] + + for sensor_type, config in SENSOR_TYPE_CONFIG.items(): + for key in required_keys: + assert key in config, f"Missing {key} in {sensor_type} config" + + # Test that patterns compile + import re + try: + re.compile(config['pattern']) + except re.error: + assert False, f"Invalid regex pattern for {sensor_type}: {config['pattern']}" + + print("✓ Sensor type configuration test passed") + + +if __name__ == "__main__": + test_categorize_sensor_channels() + test_create_sensor_timeseries() + test_sensor_type_config() + print("\n✓ All tests passed!") \ No newline at end of file