
Conversation


@edeno edeno commented Apr 11, 2025

Description

Like spikesorting.v0.spikesorting_populator, this wraps pipelines with many tables into a single function for ease of use. Where possible, it uses multiprocessing to process datasets in parallel.

Also deleted some pyscripts that were renamed.

Still need to test this.

Fixes #891
Fixes #529

Checklist:

  • This PR should be accompanied by a release: (yes/no/unsure)
  • If release, I have updated the CITATION.cff
  • This PR makes edits to table definitions: (yes/no)
  • If table edits, I have included an alter snippet for release notes.
  • If this PR makes changes to position, I ran the relevant tests locally.
  • I have updated the CHANGELOG.md with PR number and description.
  • I have added/edited docs/notebooks to reflect the changes

@edeno edeno requested a review from Copilot April 11, 2025 23:55
Copilot AI left a comment


Copilot reviewed 16 out of 16 changed files in this pull request and generated no comments.

Comments suppressed due to low confidence (1)

src/spyglass/linearization/v1/pipeline.py:92

  • Ensure that the custom method 'fetch_nwb_file_name()' returns a list of dicts containing the 'nwb_file_name' key. If the return format differs, update the subsequent code to correctly extract the NWB file name.
pos_entry = (PositionOutput & pos_key).fetch_nwb_file_name()

@edeno edeno requested a review from Copilot April 14, 2025 17:44
Copilot AI left a comment


Copilot reviewed 16 out of 16 changed files in this pull request and generated no comments.

@edeno edeno requested a review from Copilot May 1, 2025 16:05
Copilot AI left a comment


Pull Request Overview

This PR introduces new high-level populator functions to simplify the execution of multiple processing pipelines (position, DLC training/setup, MUA, linearization, decoding, and behavior) while removing some renamed or obsolete pyscripts/notebooks. The changes include updates to function signatures and docstrings for clarity, as well as the removal of unused imports and outdated scripts.

Reviewed Changes

Copilot reviewed 18 out of 18 changed files in this pull request and generated no comments.

File Description
src/spyglass/spikesorting/v1/recording.py Updated the insert_selection method signature with an explicit return type and corrected a minor typo in the comments.
src/spyglass/spikesorting/spikesorting_merge.py Removed an unused import.
src/spyglass/position/v1/pipeline_trodes.py Added a populator function for the Trodes Position V1 pipeline.
src/spyglass/position/v1/pipeline_dlc_training.py Added a new function to run DLC Model Training V1.
src/spyglass/position/v1/pipeline_dlc_setup.py Introduced a new populator function for setting up DLC projects with frame extraction.
src/spyglass/mua/v1/pipeline.py Added a populator function for the Multi-Unit Activity (MUA) V1 pipeline.
src/spyglass/linearization/v1/pipeline.py Added a new populator function for the Position Linearization V1 pipeline.
src/spyglass/decoding/v1/pipeline_waveform_feature_extraction.py Added a pipeline for Waveform Feature Extraction V1.
src/spyglass/decoding/v1/pipeline_sorted.py Added a pipeline for Sorted Spikes Decoding V1.
src/spyglass/decoding/v1/pipeline_clusterless.py Added a pipeline for Clusterless Decoding V1.
src/spyglass/behavior/v1/pipeline.py Added a new populator function for the MoSeq Behavior V1 pipeline.
notebooks/py_scripts/51_MUA_Detection.py, 43_Decoding_SortedSpikes.py, 41_Extracting_Clusterless_Waveform_Features.py Removed deprecated/no-longer-needed notebook scripts.

@edeno edeno added this to the 0.5.6 milestone Aug 6, 2025
@CBroz1 CBroz1 left a comment


As we think about populators, it might be worth drawing inspiration from this pipeline. Specifically, they have ...

An orchestrator would be scope creep on the current PR, but it's worth looking at what they did and maybe tipping the design in favor of something that would be easy to edit with that future in mind

# --- Main Populator Function ---


def populate_spyglass_linearization_v1(

Taking this as an example of a more straightforward populator, I'll leave some general comments here...

I think it would be helpful to see the goal notebook use case as part of this PR, or does the example section below reflect the extent of the potential notebook? How much hand-holding will the eventual notebook do to avoid errors? Will it check for parent keys? Insert paramsets? Or does a populator, in general, require use of existing paramsets/keys otherwise inserted?

if not (PositionOutput & pos_key):
    raise ValueError(f"PositionOutput entry not found: {pos_merge_id}")
# Need nwb_file_name from position source to check track graph and interval
pos_entry = (PositionOutput & pos_key).fetch("nwb_file_name")

Maybe I was trying the wrong key, but this fetch didn't work for me, as nwb_file_name isn't in the table header. I don't think this works with merge_fetch either


final_key = None

try:

I try to keep my try blocks to just one line, so I know exactly what I'm catching with the except. If we'd like to avoid the redundancy of repeated 'excepts', i.e.,

try:
    do_first()
except A:
    print("A")
except B:
    print("B")

try:
    do_second()
except A:
    print("A")
except B:
    print("B")

I think that's a good candidate for a util.

def populator_exceptions(func, *args, description):
    try:
        func(*args)
    except DataJointError as e:
        logger.error(f"DataJoint error for {description}: {e}")
    except Exception as e:
        logger.error(f"General error for {description}: {e}")
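A runnable version of this util, with a stand-in `DataJointError` so the sketch works without DataJoint installed, and a hypothetical failing step (`insert_selection`) for illustration:

```python
import logging

logger = logging.getLogger("populator")

class DataJointError(Exception):
    """Stand-in so the sketch runs without DataJoint installed."""

def populator_exceptions(func, *args, description):
    """Run one pipeline step, logging errors instead of raising."""
    try:
        return func(*args)
    except DataJointError as e:
        logger.error(f"DataJoint error for {description}: {e}")
    except Exception as e:
        logger.error(f"General error for {description}: {e}")

def insert_selection():
    # hypothetical step that hits a duplicate-entry error
    raise DataJointError("duplicate entry")

# The failing step is logged rather than crashing the populator:
populator_exceptions(insert_selection, description="Linearization Selection")
# A succeeding step passes its return value through:
populator_exceptions(lambda x: x + 1, 2, description="demo step")
```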

Comment on lines 145 to 156
if not (LinearizationSelection & selection_key):
    LinearizationSelection.insert1(
        selection_key, skip_duplicates=skip_duplicates
    )
else:
    logger.warning(
        f"Linearization Selection already exists for {pipeline_description}"
    )
    if not skip_duplicates:
        raise dj.errors.DataJointError(
            "Duplicate selection entry exists."
        )

One feature of skip_duplicates is that it will update secondary keys. Not here, but maybe for other populators, only running the insert without a match risks skipping that update

It seems like you could eliminate the raise here by just letting insert do it for you. Do we think the user wants this 'key already exists' warning when they specified skip_duplicates? I'd guess that they don't care

Maybe the goal here is to avoid issues when the user is rerunning the populator after a partial fail? If so, I might separate that out into a flag that doesn't map on to the DJ built-in

Suggested change
if not (LinearizationSelection & selection_key):
    LinearizationSelection.insert1(
        selection_key, skip_duplicates=skip_duplicates
    )
else:
    logger.warning(
        f"Linearization Selection already exists for {pipeline_description}"
    )
    if not skip_duplicates:
        raise dj.errors.DataJointError(
            "Duplicate selection entry exists."
        )
if skip_existing and (LinearizationSelection & selection_key):
    logger.info(
        f"Linearization Selection already exists for {pipeline_description}"
    )
else:
    LinearizationSelection.insert1(
        selection_key, skip_duplicates=skip_duplicates
    )

        f"Linearization Selection already exists for {pipeline_description}"
    )
    if not skip_duplicates:
        raise dj.errors.DataJointError(

If ignoring my other comment, this is still worth specifying

Suggested change
raise dj.errors.DataJointError(
raise dj.errors.DuplicateError(

)

# Ensure selection exists before populating
if not (LinearizationSelection & selection_key):

If this is a repeated pattern, I could see a mixin method for insert_and_verify or insert_if_not_exists
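Such a mixin might be sketched as follows. This is an in-memory stand-in, not DataJoint-backed; a real version would restrict the table (`self & key`) instead of scanning a row list, and the class/method names here are hypothetical:

```python
class InsertIfNotExistsMixin:
    """Sketch of the suggested mixin for tables exposing insert1()."""

    def insert_if_not_exists(self, key: dict) -> bool:
        """Insert key if absent; return True if a new row was inserted."""
        if any(
            all(row.get(k) == v for k, v in key.items()) for row in self.rows
        ):
            return False  # entry already present; skip the insert
        self.insert1(key)
        return True

class FakeSelectionTable(InsertIfNotExistsMixin):
    """Minimal stand-in for a dj.Manual table."""

    def __init__(self):
        self.rows = []

    def insert1(self, key):
        self.rows.append(key)

tbl = FakeSelectionTable()
tbl.insert_if_not_exists({"merge_id": "abc"})  # inserts
tbl.insert_if_not_exists({"merge_id": "abc"})  # no-op on the second call
```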

    logger.warning(
        f"Final linearized position {final_key} already in merge table for {pipeline_description}."
    )
else:

I would invert this if/else so the thing with fewer lines happens first

@CBroz1 CBroz1 left a comment


I took a look at a few other populators to comment on any patterns I didn't see in linearization, but I didn't go through position or spikesorting. Happy to take another look if you'd like

Comment on lines +149 to +153
for merge_id in target_pose_merge_ids:
    if not (PositionOutput & {"merge_id": merge_id}):
        raise ValueError(
            f"PositionOutput merge_id '{merge_id}' not found."
        )

I generally write these errors as a subset check

all_ids = SomeTable().fetch("id")
if not set(my_ids).issubset(all_ids):
    raise dj.errors.IntegrityError(f"Subset of ids missing from table: {my_ids}")

It saves the for-loop of repeated checks, at the cost of a less informative error. Alternatively, for primary key checks on potentially large tables...

restr_tbl = SomeTable & f"id in {tuple(my_ids)}"
if len(restr_tbl) < len(my_ids):
    raise dj.errors.IntegrityError("Some ids missing from table.")


model_selection_key = {**pose_group_key, **model_params_key}

if extract_syllables:

I would extract these sections out into helper functions. To be able to reuse pieces like model_selection_key across subsections, I throw it in a class and set these in class attributes. An ambitious approach to this process might set up a meta-class that knows how to handle a data structure like a dict

my_pipeline = dict(
    description="Some description",
    steps=[
        dict(table=ParamTable, type="params", insert=some_key, validate_func=some_func),
        dict(table=SelectionTbl, type="selection", insert=some_key, dependencies=some_list_of_tbls),
        dict(table=DataTbl, type="populate", insert=some_key),
    ],
)

populator = PipelinePopulator(pipeline=my_pipeline)
populator.run()
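A runnable toy version of this data-driven idea, with in-memory stand-ins instead of DataJoint tables (all names here are hypothetical, not the Spyglass API):

```python
from dataclasses import dataclass
from typing import Callable

@dataclass
class Step:
    """One pipeline stage; fields mirror the dict sketch above."""
    name: str
    insert: dict
    validate_func: Callable = lambda key: True

class PipelinePopulator:
    """In-memory toy; real steps would insert into / populate DataJoint tables."""

    def __init__(self, description: str, steps: list):
        self.description = description
        self.steps = steps
        self.inserted = []

    def run(self):
        for step in self.steps:
            if not step.validate_func(step.insert):
                raise ValueError(
                    f"{self.description}: invalid key at step '{step.name}'"
                )
            # stand-in for table.insert1(...) / table.populate(...)
            self.inserted.append((step.name, step.insert))
        return self.inserted

populator = PipelinePopulator(
    description="Linearization V1",
    steps=[
        Step("params", {"linearization_param_name": "default"}),
        Step("selection", {"pos_merge_id": "abc"}),
    ],
)
populator.run()
```

One appeal of this shape is that adding a pipeline becomes writing a data structure rather than another populate function.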

The more straight-forward single-modality class could have an init that handles the conditional subsections

class MoseqPopulator(SpyglassPopulator):
    def __init__(self, *args, **kwargs):
        self.args = args
        self.config = kwargs

        if self.config.get("first_step"):
            self.first_step()
        if self.config.get("second_step"):
            self.second_step()

Comment on lines +182 to +186
pipeline_description = (
    f"{nwb_file_name} | WFs {waveform_features_group_name} |"
    f" Pos {position_group_name} | Decode Interval {current_decoding_interval} |"
    f" Params {decoding_param_name}"
)

For things that update the user on the status of a process, I tend to set a line length limit ... mostly for my own debugging in tmux. I think some kind of indented formatting might better draw the eye, but totally a matter of personal preference

Suggested change
pipeline_description = (
    f"{nwb_file_name} | WFs {waveform_features_group_name} |"
    f" Pos {position_group_name} | Decode Interval {current_decoding_interval} |"
    f" Params {decoding_param_name}"
)
pipeline_description = (
    f"Clusterless Decoding {nwb_file_name}"
    f"\n\tWFs {waveform_features_group_name}"
    f"\n\tPos {position_group_name}"
    f"\n\tDecode Interval {current_decoding_interval}"
    f"\n\tParams {decoding_param_name}"
)

# --- Input Validation ---
if not (Nwbfile & {"nwb_file_name": nwb_file_name}):
    raise ValueError(f"Nwbfile not found: {nwb_file_name}")
wf_group_key = {

While relatively simple here, I do see other cases where this gets complicated. I might expect the user to pass this info as a dictionary. If that's the case, it'll make internal handling easier, and it's easier to set up the error that says, "your x key wasn't valid for table y. try running 'x & y' to check"

    "decoding_interval": current_decoding_interval,
}

final_key = None  # Reset final key for each interval

For cases where loop iteration requires the reset of variables, it makes sense to me to convert to a helper func with explicit args/returns
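Sketching that conversion (key names here are illustrative, not the actual Spyglass schema), the per-iteration state becomes local to the helper:

```python
def decode_one_interval(interval_name: str, base_key: dict) -> dict:
    """Handle one decoding interval; state like final_key is local here,
    so the calling loop needs no manual resets."""
    selection_key = {**base_key, "decoding_interval": interval_name}
    # ... insert selection and populate would happen here ...
    final_key = selection_key  # stand-in for the populated result key
    return final_key

results = [
    decode_one_interval(name, {"nwb_file_name": "session1_.nwb"})
    for name in ("run1", "run2")
]
```

Explicit args/returns also make each interval independently testable.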

nwb_file_name = (
    SpikeSortingOutput.merge_get_parent(ss_key) & ss_key
).fetch1("nwb_file_name")
except Exception as e:

All exceptions here? I would try to narrow down to the expected case

    raise ValueError(
        f"IntervalList not found: {nwb_file_name}, {interval_list_name}"
    )
elif interval_list_name and not nwb_file_name:

Isn't interval_list_name a primary key of IntervalList? If they only passed the interval name, you can use that to retrieve the nwb_file_name.

# --- Helper Function for Parallel Processing ---


def _process_single_lfp_band(args_tuple: Tuple) -> Optional[Tuple]:

Does NonDaemonPool require a tuple as opposed to just pulling items from kwargs?
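The kwargs alternative can be shown with a small adapter. This sketch uses `multiprocessing.dummy.Pool` (a thread pool with the same `map` API) so it runs standalone; `process_band` is a hypothetical stand-in for `_process_single_lfp_band`:

```python
from multiprocessing.dummy import Pool  # thread-backed Pool, same map API

def process_band(nwb_file_name, band_name):
    # stand-in for the real _process_single_lfp_band body
    return f"{nwb_file_name}:{band_name}"

def call_with_kwargs(kwargs):
    # adapter: Pool.map hands each worker a single object, so a kwargs
    # dict can replace a positional tuple and keep the call site readable
    return process_band(**kwargs)

tasks = [
    {"nwb_file_name": "file1.nwb", "band_name": "theta"},
    {"nwb_file_name": "file1.nwb", "band_name": "gamma"},
]
with Pool(2) as pool:
    results = pool.map(call_with_kwargs, tasks)
```

Note that a process-backed pool would require the worker and adapter to be picklable top-level functions, which may be why a tuple plus `starmap`-style unpacking was chosen.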

Comment on lines +264 to +269
if not (Nwbfile & {"nwb_file_name": nwb_file_name}):
    raise ValueError(f"Nwbfile not found: {nwb_file_name}")
if not (
    LFPElectrodeGroup
    & {
        "nwb_file_name": nwb_file_name,

I find it easier to read the partial keys

Suggested change
if not (Nwbfile & {"nwb_file_name": nwb_file_name}):
raise ValueError(f"Nwbfile not found: {nwb_file_name}")
if not (
LFPElectrodeGroup
& {
"nwb_file_name": nwb_file_name,
nwb_dict = {"nwb_file_name": nwb_file_name}
if not (Nwbfile & nwb_dict):
raise ValueError(f"Nwbfile not found: {nwb_file_name}")
if not (
LFPElectrodeGroup
& {
**nwb_dict,

)
try:
    with NonDaemonPool(processes=max_processes) as pool:
        band_keys = list(

This level of nesting makes the logic tough for me to follow.

@edeno edeno changed the title Add populator functions for ease of use WIP: Add populator functions for ease of use Aug 8, 2025

codecov bot commented Aug 8, 2025

Codecov Report

❌ Patch coverage is 0.10363% with 964 lines in your changes missing coverage. Please review.
✅ Project coverage is 64.88%. Comparing base (dcff212) to head (2b7262e).
⚠️ Report is 1 commits behind head on master.

Files with missing lines Patch % Lines
src/spyglass/spikesorting/v1/pipeline.py 0.00% 235 Missing ⚠️
src/spyglass/position/v1/pipeline_dlc_inference.py 0.00% 208 Missing ⚠️
src/spyglass/lfp/v1/pipeline.py 0.00% 118 Missing ⚠️
src/spyglass/decoding/v1/pipeline_sorted.py 0.00% 75 Missing ⚠️
src/spyglass/decoding/v1/pipeline_clusterless.py 0.00% 74 Missing ⚠️
...ecoding/v1/pipeline_waveform_feature_extraction.py 0.00% 63 Missing ⚠️
src/spyglass/position/v1/pipeline_dlc_training.py 0.00% 52 Missing ⚠️
src/spyglass/position/v1/pipeline_trodes.py 0.00% 45 Missing ⚠️
src/spyglass/linearization/v1/pipeline.py 0.00% 38 Missing ⚠️
src/spyglass/position/v1/pipeline_dlc_setup.py 0.00% 36 Missing ⚠️
... and 2 more

❌ Your project check has failed because the head coverage (64.88%) is below the target coverage (68.00%). You can increase the head coverage or adjust the target coverage.

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #1279      +/-   ##
==========================================
- Coverage   69.81%   64.88%   -4.93%     
==========================================
  Files         104      115      +11     
  Lines       12733    13689     +956     
==========================================
- Hits         8889     8882       -7     
- Misses       3844     4807     +963     

☔ View full report in Codecov by Sentry.




Development

Successfully merging this pull request may close these issues.

  • Add pipeline populators
  • A more pythonic user interface

3 participants