Skip to content
Open
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
ded2544
fallback to chunked entry insert in _log_fetch
samuelbray32 Aug 27, 2025
2d57e64
fallback to chunked entry insert in _log_fetch
samuelbray32 Aug 27, 2025
02fb80c
cleanup code
samuelbray32 Aug 27, 2025
f34a079
cleanup and performance fix
samuelbray32 Aug 28, 2025
eab1f33
Apply suggestion from @CBroz1
samuelbray32 Aug 28, 2025
c809db7
fix restr_str in _log_fetch_nwb
samuelbray32 Sep 5, 2025
342bc97
log proper restrictions for projected table
samuelbray32 Sep 8, 2025
346f17a
handle case where no entries and key recording
samuelbray32 Sep 8, 2025
e9d0b34
update tests
samuelbray32 Sep 8, 2025
e9051a5
methods for graph intersection
samuelbray32 Sep 10, 2025
8d2929b
add nwb list restriction to export
samuelbray32 Sep 11, 2025
0363267
clear export cache in fixture to ensure logged in each analysis
samuelbray32 Sep 12, 2025
f6fe1d0
test results of compound restriction logging
samuelbray32 Sep 12, 2025
4dad0c2
spelling and reduce log calls
samuelbray32 Sep 12, 2025
cb03db4
Merge branch 'master' into export_barriers
samuelbray32 Sep 12, 2025
8bf01ff
multiprocessing for linked file scanning
samuelbray32 Sep 12, 2025
4f9dc92
add test for export nwb file intersection
samuelbray32 Sep 12, 2025
78f3af5
make chunking of restriction key entries recursive to prevent error
samuelbray32 Sep 12, 2025
3fef2b0
condense selection table restrictions prior to restriction graph
samuelbray32 Sep 12, 2025
41d6177
allow intersection of un-cascaded self graph
samuelbray32 Sep 15, 2025
3b958e6
efficiency improvements
samuelbray32 Sep 15, 2025
e58f2bf
enforce string restrictions on intersect results
samuelbray32 Sep 16, 2025
e6fca26
utility function for editing existing hdmf dataset type
samuelbray32 Sep 18, 2025
9947282
add parallelization
samuelbray32 Sep 19, 2025
ac8ce65
add parallelization
samuelbray32 Sep 19, 2025
11d7349
fix pandas table id issue in dandi updater
samuelbray32 Sep 23, 2025
8dfe2c6
remove franklab specific note from RawPosition
samuelbray32 Sep 24, 2025
126a028
suggestions from review
samuelbray32 Oct 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 162 additions & 23 deletions src/spyglass/common/common_usage.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@
plan future development of Spyglass.
"""

from multiprocessing import Pool, cpu_count
from typing import List, Union

import datajoint as dj
from datajoint.condition import make_condition
from pynwb import NWBHDF5IO
from tqdm import tqdm

from spyglass.common.common_nwbfile import AnalysisNwbfile, Nwbfile
from spyglass.settings import test_mode
Expand All @@ -24,6 +27,9 @@
from spyglass.utils.sql_helper_fn import SQLDumpHelper

schema = dj.schema("common_usage")
INCLUDED_NWB_FILES = (
None # global variable to temporarily hold included nwb files
)


@schema
Expand Down Expand Up @@ -203,7 +209,11 @@ def _externals(self) -> dj.external.ExternalMapping:
return dj.external.ExternalMapping(schema=AnalysisNwbfile)

def _add_externals_to_restr_graph(
self, restr_graph: RestrGraph, key: dict
self,
restr_graph: RestrGraph,
key: dict,
raw_files=None,
analysis_files=None,
) -> RestrGraph:
"""Add external tables to a RestrGraph for a given restriction/key.

Expand All @@ -221,21 +231,32 @@ def _add_externals_to_restr_graph(
A RestrGraph object to add external tables to.
key : dict
Any valid restriction key for ExportSelection.Table
raw_files : list, optional
A list of raw nwb file names to add. Default None, which retrieves
from ExportSelection._list_raw_files.
analysis_files : list, optional
A list of analysis nwb file names to add. Default None, which retrieves
from ExportSelection._list_analysis_files.

Returns
-------
restr_graph : RestrGraph
The updated RestrGraph
"""
if raw_files is None:
raw_files = self._list_raw_files(key)
if analysis_files is None:
analysis_files = self._list_analysis_files(key)

# only add items if found respective file types
if raw_files := self._list_raw_files(key):
if raw_files:
raw_tbl = self._externals["raw"]
raw_name = raw_tbl.full_table_name
raw_restr = "filepath in ('" + "','".join(raw_files) + "')"
restr_graph.graph.add_node(raw_name, ft=raw_tbl, restr=raw_restr)
restr_graph.visited.add(raw_name)

if analysis_files := self._list_analysis_files(key):
if analysis_files:
analysis_tbl = self._externals["analysis"]
analysis_name = analysis_tbl.full_table_name
# to avoid issues with analysis subdir, we use REGEXP
Expand All @@ -251,10 +272,13 @@ def _add_externals_to_restr_graph(
return restr_graph

def get_restr_graph(
self, key: dict, verbose=False, cascade=True
self, key: dict, verbose=False, cascade=True, included_nwb_files=None
) -> RestrGraph:
"""Return a RestrGraph for a restriction/key's tables/restrictions.

Restriction graph limits to entries stemming from the raw nwb_files
listed in included_nwb_files, if provided.

Ignores duplicate entries.

Parameters
Expand All @@ -265,12 +289,25 @@ def get_restr_graph(
Turn on RestrGraph verbosity. Default False.
cascade : bool, optional
Propagate restrictions to upstream tables. Default True.
included_nwb_files : list, optional
A whitelist of nwb files to include in the export. Default None applies
no whitelist restriction.
"""
leaves = unique_dicts(
(self * self.Table & key).fetch(
"table_name", "restriction", as_dict=True
selection_tables = self * self.Table & key
tracked_tables = set(selection_tables.fetch("table_name"))
leaves = []
# Condense to single restriction per table (OR of all restrictions).
# Large performance boost for large exports with many logged entries
for table_name in tracked_tables:
restr_list = (selection_tables & dict(table_name=table_name)).fetch(
"restriction"
)
restriction = make_condition(
dj.FreeTable(dj.conn(), table_name), restr_list, set()
)
leaves.append(
{"table_name": table_name, "restriction": restriction}
)
)

restr_graph = RestrGraph(
seed_table=self,
Expand All @@ -279,7 +316,55 @@ def get_restr_graph(
cascade=False,
include_files=True,
)
restr_graph = self._add_externals_to_restr_graph(restr_graph, key)

if included_nwb_files is None:
restr_graph = self._add_externals_to_restr_graph(restr_graph, key)
if cascade:
restr_graph.cascade()
return restr_graph

# Restrict the graph to only include entries stemming from the
# included nwb files
logger.info("Generating restriction graph of included nwb files")
nwb_restr = make_condition(
Nwbfile(),
[f"nwb_file_name = '{f}'" for f in included_nwb_files],
set(),
)
whitelist_graph = RestrGraph(
seed_table=Nwbfile,
leaves={
"table_name": Nwbfile.full_table_name,
"restriction": nwb_restr,
},
verbose=verbose,
cascade=True,
include_files=True,
direction="down",
)
logger.info("Intersecting with export restriction graph")
restr_graph = restr_graph & whitelist_graph
raw_files_to_add = [
f
for f in ExportSelection()._list_raw_files(key)
if f in included_nwb_files
]
analysis_files_to_add = [
f
for f in ExportSelection()._list_analysis_files(key)
if any(
[
nwb_file_name.split("_.nwb")[0] in f
for nwb_file_name in included_nwb_files
]
)
]
restr_graph = self._add_externals_to_restr_graph(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My REGEX syntax in the _add_externals method is something I included thinking this method would only be run once. It can be slow with a long list. If you didn't encounter that in practice, no worries - but if things seem slow, that could be something to poke at

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At minimum, it's not a rate limiting point in the export I ran, but good to keep in mind

restr_graph,
key,
raw_files=raw_files_to_add,
analysis_files=analysis_files_to_add,
)

if cascade:
restr_graph.cascade()
Expand Down Expand Up @@ -326,6 +411,7 @@ class Export(SpyglassMixin, dj.Computed):
-> ExportSelection
---
paper_id: varchar(32)
included_nwb_file_names = null: mediumblob # list of nwb files included in export
"""

# In order to get a many-to-one relationship btwn Selection and Export,
Expand All @@ -350,18 +436,40 @@ class File(SpyglassMixin, dj.Part):
file_path: varchar(255)
"""

def populate_paper(self, paper_id: Union[str, dict]):
def populate_paper(
self,
paper_id: Union[str, dict],
included_nwb_files=None,
n_processes=1,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add mention of new args in docstring. I think anything with more than one arg needs the Parameters section.

You're adding this new feature, and then disabling it by default?

Suggested change
n_processes=1,
n_processes=-1,

):
"""Populate Export for a given paper_id."""
self.load_shared_schemas()
if isinstance(paper_id, dict):
paper_id = paper_id.get("paper_id")
self.populate(ExportSelection().paper_export_id(paper_id))
global INCLUDED_NWB_FILES
INCLUDED_NWB_FILES = included_nwb_files # store in global variable
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer to avoid the global if we can. Maybe we can assign it as a cache on this table?

self._nwb_whitelist_cache[paper_id] = included_nwb_files 
...
if self._nwb_whitelist_cache.get(paper_id, None): 
    ...

global N_PROCESSES
if n_processes < 1:
n_processes = 1
elif n_processes > cpu_count():
n_processes = cpu_count()
N_PROCESSES = n_processes
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Beyond general preference for avoiding globals, I'm thinking this could be a more specific variable name to indicate what the processes are for

self._n_filelink_processes?

self.populate(
{
**ExportSelection().paper_export_id(paper_id),
}
)

def make(self, key):
"""Populate Export table with the latest export for a given paper."""
logger.info(f"Populating Export for {key}")
paper_key = (ExportSelection & key).fetch("paper_id", as_dict=True)[0]
query = ExportSelection & paper_key

included_nwb_files = INCLUDED_NWB_FILES
# included_nwb_files = INCLUDED_NWB_FILES.copy()
# INCLUDED_NWB_FILES = None # reset global variable

# Null insertion if export_id is not the maximum for the paper
all_export_ids = ExportSelection()._max_export_id(paper_key, True)
max_export_id = max(all_export_ids)
Expand All @@ -384,27 +492,47 @@ def make(self, key):
(self.Table & id_dict).delete_quick()
(self.Table & id_dict).delete_quick()

restr_graph = ExportSelection().get_restr_graph(paper_key)
logger.info(f"Generating export_id {key['export_id']}")
restr_graph = ExportSelection().get_restr_graph(
paper_key, included_nwb_files=included_nwb_files, verbose=True
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the average user doesn't want to monitor this. Maybe yoke it to settings.debug_mode?

Suggested change
paper_key, included_nwb_files=included_nwb_files, verbose=True
paper_key, included_nwb_files=included_nwb_files, verbose=False

)
# Original plus upstream files
file_paths = {
*query.list_file_paths(paper_key, as_dict=False),
*restr_graph.file_paths,
}
if included_nwb_files:
# Limit to derivatives of the included nwb files
file_paths = {
f
for f in file_paths
if any(
[
nwb_file_name.split("_.nwb")[0] in f
for nwb_file_name in included_nwb_files
]
)
}

# Check for linked nwb objects and add them to the export
unlinked_files = set()
for file in file_paths:
if not (links := get_linked_nwbs(file)):
unlinked_files.add(file)
continue
logger.warning(
"Dandi not yet supported for linked nwb objects "
+ f"excluding {file} from export "
+ f" and including {links} instead"
)
unlinked_files.update(links)
if N_PROCESSES == 1:
for file in tqdm(file_paths, desc="Checking linked nwb files"):
unlinked_files.update(get_unlinked_files(file))
else:
with Pool(processes=N_PROCESSES) as pool:
results = list(
tqdm(
pool.map(get_unlinked_files, file_paths),
total=len(file_paths),
desc="Checking linked nwb files",
)
)
for files in results:
unlinked_files.update(files)
file_paths = unlinked_files

restr_graph.enforce_restr_strings() # ensure all restr are strings

table_inserts = [
{**key, **rd, "table_id": i}
for i, rd in enumerate(restr_graph.as_dict)
Expand Down Expand Up @@ -461,3 +589,14 @@ def _make_fileset_ids_unique(self, key):
else:
new_id = make_file_obj_id_unique(file_path)
unique_object_ids.append(new_id)


def get_unlinked_files(file_path):
    """Resolve an nwb file to the set of files to include in an export.

    If ``file_path`` contains no linked nwb objects, it is returned as a
    singleton set. If it does contain links, the file itself is excluded
    (DANDI does not yet support linked nwb objects) and the set of linked
    files is returned instead, with a warning.

    Parameters
    ----------
    file_path : str
        Path of the nwb file to check for linked nwb objects.

    Returns
    -------
    set of str
        Either ``{file_path}`` (no links) or the set of linked file paths.
    """
    if not (links := get_linked_nwbs(file_path)):
        return {file_path}
    # NOTE(review): emitted once per linked file; with many linked files
    # (or under multiprocessing) this may repeat — consider aggregating
    # warnings at the caller. TODO confirm desired verbosity.
    logger.warning(
        "Dandi not yet supported for linked nwb objects: "
        + f"excluding {file_path} from export "  # fix: single space in msg
        + f"and including {links} instead"
    )
    return set(links)
Loading
Loading