Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
ded2544
fallback to chunked entry insert in _log_fetch
samuelbray32 Aug 27, 2025
2d57e64
fallback to chunked entry insert in _log_fetch
samuelbray32 Aug 27, 2025
02fb80c
cleanup code
samuelbray32 Aug 27, 2025
f34a079
cleanup and performance fix
samuelbray32 Aug 28, 2025
eab1f33
Apply suggestion from @CBroz1
samuelbray32 Aug 28, 2025
c809db7
fix restr_str in _log_fetch_nwb
samuelbray32 Sep 5, 2025
342bc97
log proper restrictions for projected table
samuelbray32 Sep 8, 2025
346f17a
handle case where no entries and key recording
samuelbray32 Sep 8, 2025
e9d0b34
update tests
samuelbray32 Sep 8, 2025
e9051a5
methods for graph intersection
samuelbray32 Sep 10, 2025
8d2929b
add nwb list restriction to export
samuelbray32 Sep 11, 2025
0363267
clear export cache in fixture to ensure logged in each analysis
samuelbray32 Sep 12, 2025
f6fe1d0
test results of compound restriction logging
samuelbray32 Sep 12, 2025
4dad0c2
spelling and reduce log calls
samuelbray32 Sep 12, 2025
cb03db4
Merge branch 'master' into export_barriers
samuelbray32 Sep 12, 2025
8bf01ff
multiprocessing for linked file scanning
samuelbray32 Sep 12, 2025
4f9dc92
add test for export nwb file intersection
samuelbray32 Sep 12, 2025
78f3af5
make chunking of restriction key entries recursive to prevent error
samuelbray32 Sep 12, 2025
3fef2b0
condense selection table restrictions prior to restriction graph
samuelbray32 Sep 12, 2025
41d6177
allow intersection of un-cascaded self graph
samuelbray32 Sep 15, 2025
3b958e6
efficiency improvements
samuelbray32 Sep 15, 2025
e58f2bf
enforce string restrictions on intersect results
samuelbray32 Sep 16, 2025
e6fca26
utility function for editing existing hdmf dataset type
samuelbray32 Sep 18, 2025
9947282
add parallelization
samuelbray32 Sep 19, 2025
ac8ce65
add parallelization
samuelbray32 Sep 19, 2025
11d7349
fix pandas table id issue in dandi updater
samuelbray32 Sep 23, 2025
8dfe2c6
remove franklab specific note from RawPosition
samuelbray32 Sep 24, 2025
126a028
suggestions from review
samuelbray32 Oct 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/spyglass/utils/dj_merge_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -564,8 +564,9 @@ def fetch_nwb(
[
(
self
& self._merge_restrict_parts(file)
& source_restr
& dj.AndList(
[self._merge_restrict_parts(file), source_restr]
)
).fetch1(self._reserved_pk)
for file in nwb_list
]
Expand Down
55 changes: 39 additions & 16 deletions src/spyglass/utils/mixins/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ def _called_funcs(self):
ret = {i.function for i in inspect_stack()} - ignore
return ret

def _log_fetch(self, restriction=None, *args, **kwargs):
def _log_fetch(self, restriction=None, chunk_size=None, *args, **kwargs):
"""Log fetch for export."""
if (
not self.export_id
Expand Down Expand Up @@ -203,20 +203,47 @@ def _log_fetch(self, restriction=None, *args, **kwargs):
if restr_str is True:
restr_str = "True" # otherwise stored in table as '1'

if isinstance(restr_str, str) and "SELECT" in restr_str:
raise RuntimeError(
"Export cannot handle subquery restrictions. Please submit a "
+ "bug report on GitHub with the code you ran and this"
+ f"restriction:\n\t{restr_str}"
if not (
(
isinstance(restr_str, str)
and (len(restr_str) > 2048)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's still possible we'll bump this 2048 in the future. If so, it would be helpful to have a comment under ExportSelection.Table saying this was hard-coded here

It's also possible to fetch the value with self._export_table.Table.heading.attributes['restriction'].type[8:-1], but I would want to save that as a cached_property like self._export_restr_limit in that case to prevent that kind of table definition fetch on each iteration.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll also add in a raised error in _insert_log if too long a restriction makes it to there.

or "SELECT" in restr_str
)
):
self._insert_log(restr_str)
return

if isinstance(restr_str, str) and len(restr_str) > 2048:
raise RuntimeError(
"Export cannot handle restrictions > 2048.\n\t"
+ "If required, please open an issue on GitHub.\n\t"
+ f"Restriction: {restr_str}"
if "SELECT" in restr_str:
logger.debug(
"Restriction contains subquery. Exporting entry restrictions instead"
)

# handle excessive restrictions caused by long OR list of dicts
logger.debug(
f"Restriction too long ({len(restr_str)} > 2048)."
+ "Attempting to chunk restriction by subsets of entry keys."
)
# get list of entry keys
restricted_table = (
self.restrict(restriction, log_export=False)
if restriction
else self
Comment on lines +279 to +280
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you have neither a passed restriction nor a self.restriction attr, we might save time by setting the restriction to True and returning early

)
restricted_entries = restricted_table.fetch("KEY", log_export=False)
Copy link
Member

@CBroz1 CBroz1 Aug 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to state the logic: If a restriction exceeds our character limit, we will fetch the restricted table and recompile the restriction from chunks of entries. Yeah?

I wanted to confirm that resulting entries were treated as cumulative, so I dug through the export logic -> RestrGraph logic -> _set_restr method ... and yes, _set_restr by default merges passed entries.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. The new entries should be equivalent to those made by accessing the chunks of file entries in a sequential manner.

It loses information about what the restriction in the code was, but the entries specified should be the same

if chunk_size is None:
# estimate appropriate chunk size
chunk_size = max(
int(2048 // (len(restr_str) / len(restricted_entries))), 1
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What kinds of speed gains would we see from increasing the varchar? I would imagine the merging later takes more time than fetching the longer varchar.

Empirical question, likely not worth the deep dive required to answer

)
for i in range(len(restricted_entries) // chunk_size + 1):
chunk_entries = restricted_entries[
i * chunk_size : (i + 1) * chunk_size
]
chunk_restr_str = make_condition(self, chunk_entries, set())
self._insert_log(chunk_restr_str)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's probably a way to make this recursive to avoid ever hitting the varchar limit

def log_fetch(self, restr, ...):
    if len(restr) < self._export_restr_limit:
        self._insert_log(restr)
        return
    all_entries = (self & restr).fetch('KEY', log_export=False)
    for chunk_entries in all_entries[ ... chunk slicing ... ]:
        _ = (self & chunk_entries).fetch("KEY", log_export=True)

This way, if chunking fails for whatever reason, it'll get re-chunked. The embedded case will be smaller than we want, probably slower, but that seems preferable to hitting this error

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reasonable. I'll try something like that. It's also still technically possible to hit the limit if the primary key restriction for a single entry is >2048

return

def _insert_log(self, restr_str):
if isinstance(restr_str, str):
restr_str = bash_escape_sql(restr_str, add_newline=False)

Expand Down Expand Up @@ -292,10 +319,6 @@ def _run_with_log(self, method, *args, log_export=True, **kwargs):
self._run_join(**kwargs)
else:
restr = kwargs.get("restriction")
if isinstance(restr, QueryExpression) and getattr(
restr, "restriction" # if table, try to get restriction
):
restr = restr.restriction
self._log_fetch(restriction=restr)
logger.debug(f"Export: {self._called_funcs()}")

Expand Down Expand Up @@ -323,7 +346,7 @@ def fetch1(self, *args, log_export=True, **kwargs):
super().fetch1, *args, log_export=log_export, **kwargs
)

def restrict(self, restriction):
def restrict(self, restriction, chunk_size=10):
"""Log restrict for export."""
if not self.export_id:
return super().restrict(restriction)
Expand Down
Loading