Skip to content
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions bids2table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"parse_bids_entities",
"validate_bids_entities",
"set_bids_schema",
"clear_schema_caches",
"get_bids_schema",
"get_bids_entity_arrow_schema",
"format_bids_path",
Expand All @@ -26,6 +27,7 @@
)
from ._indexing import (
batch_index_dataset,
clear_schema_caches,
find_bids_datasets,
get_arrow_schema,
get_column_names,
Expand Down
34 changes: 32 additions & 2 deletions bids2table/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,17 @@ def main():
nargs="+",
default=None,
help="List of subject names or glob patterns to only include in the index. "
"Only applies when indexing a single dataset.",
"Deprecated; use --filter instead.",
)
parser_index.add_argument(
"--filter",
"-f",
metavar="ENTITY=PATTERN",
type=str,
action="append",
default=None,
help="Filter indexed files by entity value (e.g. -f sub=sub-0* -f task=rest). "
"Can be specified multiple times.",
)
parser_index.add_argument(
"--workers",
Expand Down Expand Up @@ -114,10 +124,29 @@ def _index_command(args: argparse.Namespace):
else:
root.append(path)

# Build filters dict from --filter args
filters: dict[str, str | list[str]] | None = None
if args.filter:
filters = {}
for item in args.filter:
parts = item.split("=", maxsplit=1)
if len(parts) == 2:
filters[parts[0]] = parts[1]
else:
_logger.warning(
f"Ignoring malformed filter: '{item}' (expected ENTITY=PATTERN)"
)

# Merge deprecated --subjects into filters
if args.subjects:
if filters is None:
filters = {}
filters.setdefault("sub", args.subjects)

if len(root) == 1:
table = b2t2.index_dataset(
root[0],
include_subjects=args.subjects,
filters=filters,
show_progress=not args.no_progress,
)
pq.write_table(table, args.output)
Expand All @@ -135,6 +164,7 @@ def _index_command(args: argparse.Namespace):
max_workers=max_workers,
executor_cls=executor_cls,
show_progress=not args.no_progress,
filters=filters,
):
writer.write_table(table)

Expand Down
242 changes: 225 additions & 17 deletions bids2table/_entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,24 +65,22 @@
"special": str,
}

# Matches sub-directory after subject ('sub-abc') and (optionally) session ('ses-01')
# directories. Must be all lowercase.
_BIDS_DATATYPE_PATTERN = re.compile(
r"sub-[a-zA-Z0-9]+(?:[/\\]ses-[a-zA-Z0-9]+)?[/\\]([a-z]+)[/\\]"
)
# Matches sub-directory after subject and (optionally) session directories.
# Must be all lowercase. Rebuilt in set_bids_schema() once schema is loaded.
_BIDS_DATATYPE_PATTERN: re.Pattern | None = None

_logger = setup_logger(__package__)


def set_bids_schema(path: str | Path | None = None) -> None:
"""Set the BIDS schema."""
global _BIDS_SCHEMA, _BIDS_ENTITY_SCHEMA, _BIDS_NAME_ENTITY_MAP
global _BIDS_ENTITY_ARROW_SCHEMA
global _BIDS_ENTITY_ARROW_SCHEMA, _BIDS_DATATYPE_PATTERN

schema = bidsschematools.schema.load_schema(path)
entity_schema = {
entity: schema.objects.entities[entity].to_dict()
for entity in schema.rules.entities
for entity in schema.objects.entities
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was specifically used to enforce order. schema.rules.entities is a list with the global entity ordering.

Suggested change
for entity in schema.objects.entities
for entity in schema.rules.entities

}
# Also include special extra entities (datatype, suffix, extension).
entity_schema.update(_BIDS_SPECIAL_ENTITY_SCHEMA)
Expand All @@ -98,6 +96,13 @@ def set_bids_schema(path: str | Path | None = None) -> None:
schema_version=schema["schema_version"],
)

# Build datatype pattern from entity names (e.g. sub, ses).
sub_name = get_entity_name("subject")
ses_name = get_entity_name("session")
_BIDS_DATATYPE_PATTERN = re.compile(
rf"{sub_name}-[a-zA-Z0-9]+(?:[/\\]{ses_name}-[a-zA-Z0-9]+)?[/\\]([a-z]+)[/\\]"
)


def _bids_entity_arrow_schema(
entity_schema: dict[str, dict[str, Any]],
Expand All @@ -109,7 +114,7 @@ def _bids_entity_arrow_schema(
for entity, cfg in entity_schema.items():
# Use short entity name (e.g. sub) as the field name.
name = cfg["name"]
dtype = _BIDS_FORMAT_ARROW_DTYPE_MAP[cfg["format"]]
dtype = _BIDS_FORMAT_ARROW_DTYPE_MAP.get(cfg["format"], pa.string())
# Insert full entity name (e.g. subject) into metadata.
metadata = {"entity": entity}
metadata.update(
Expand Down Expand Up @@ -188,6 +193,8 @@ def _parse_bids_datatype(path: Path) -> str | None:
Datatype is assumed to be the name of the sub-directory after the subject and
(optionally) session directories. Returns `None` if no match found.
"""
if _BIDS_DATATYPE_PATTERN is None:
return None
match = re.search(_BIDS_DATATYPE_PATTERN, str(path))
datatype = match.group(1) if match is not None else None
return datatype
Expand Down Expand Up @@ -244,6 +251,39 @@ def validate_bids_entities(
return valid_entities, extra_entities


def _get_entity_prefix_directory_order() -> list[str]:
"""Return entity name prefixes (e.g. 'sub', 'ses') in schema-defined
directory hierarchy order (shallower depth first).

Derived from the schema's ``rules.directories`` nesting: subject and
template are depth-1 children of root, session and cohort depth-2,
datatype depth-3. Entity types that never appear as directories are
omitted.
"""
depths: dict[str, int] = {}
for dtype in get_all_dataset_types():
rules_raw = _BIDS_SCHEMA.get("rules", {}).get("directories", {}).get(dtype, {})
# bidsschematools may return Namespace objects; normalise to plain dict.
rules: dict[str, Any] = _ensure_dict(rules_raw)
queue: list[tuple[str, int]] = [("root", 0)]
visited: set[str] = set()
while queue:
rule_name, depth = queue.pop(0)
if rule_name in visited:
continue
visited.add(rule_name)
rule = rules.get(rule_name, {})
entity = rule.get("entity")
if entity:
ename = get_entity_name(entity)
if ename and (ename not in depths or depth < depths[ename]):
depths[ename] = depth
for subdir in rule.get("subdirs", []):
for name in _get_subdir_names([subdir]):
queue.append((name, depth + 1))
return sorted(depths, key=lambda n: depths[n])


def format_bids_path(entities: dict[str, Any], int_format: str = "%d") -> Path:
"""Construct a formatted BIDS path from entities dict.

Expand All @@ -254,7 +294,7 @@ def format_bids_path(entities: dict[str, Any], int_format: str = "%d") -> Path:
Returns:
A formatted `Path` instance.
"""
special = {"datatype", "suffix", "ext"}
special = {cfg["name"] for cfg in _BIDS_SPECIAL_ENTITY_SCHEMA.values()}

# Formatted key-value entities.
entities_fmt = []
Expand All @@ -263,23 +303,191 @@ def format_bids_path(entities: dict[str, Any], int_format: str = "%d") -> Path:
if isinstance(value, int):
value = int_format % value
entities_fmt.append(f"{name}-{value}")
name = "_".join(entities_fmt)
filename = "_".join(entities_fmt)

# Append suffix and extension.
if suffix := entities.get("suffix"):
name += f"_{suffix}"
filename += f"_{suffix}"
if ext := entities.get("ext"):
name += ext
filename += ext

path = Path(filename)

# Prepend parent directories.
path = Path(name)
# Prepend schema-derived directory hierarchy.
dir_order = _get_entity_prefix_directory_order()
dir_entities = [n for n in dir_order if n in entities]
if datatype := entities.get("datatype"):
path = datatype / path
if ses := entities.get("ses"):
path = f"ses-{ses}" / path
path = f"sub-{entities['sub']}" / path
for entity_name in reversed(dir_entities):
path = f"{entity_name}-{entities[entity_name]}" / path
return path


def get_entity_name(entity_type: str) -> str:
"""Get the short name prefix for a BIDS entity (e.g. 'sub' for 'subject')."""
ent = _BIDS_ENTITY_SCHEMA.get(entity_type)
if ent is not None:
return ent.get("name", "")
try:
ent = _BIDS_SCHEMA["objects"]["entities"].get(entity_type)
if ent is not None:
return ent.get("name", "")
except (KeyError, AttributeError):
pass
return ""


def get_entity_pattern(entity_type: str) -> str:
"""Get the regex pattern for a BIDS entity directory name (e.g. 'sub-[a-zA-Z0-9]+')."""
name = get_entity_name(entity_type)
if not name:
return ""
return f"{name}-[a-zA-Z0-9]+"
Comment thread
gkiar marked this conversation as resolved.
Outdated


def get_entity_glob_pattern(entity_type: str) -> str:
"""Get the glob pattern for a BIDS entity directory (e.g. 'sub-*' for 'subject')."""
name = get_entity_name(entity_type)
if not name:
return ""
return f"{name}-*"


def get_all_dataset_types() -> tuple[str, ...]:
"""Get all BIDS dataset types defined in the schema (e.g. 'raw', 'derivative', 'study')."""
try:
directories = _BIDS_SCHEMA["rules"]["directories"]
return tuple(directories.keys())
except (KeyError, AttributeError):
return ()


def _ensure_dict(obj: Any) -> dict:
"""Convert a bidsschematools Namespace to a plain dict."""
return obj.to_dict() if hasattr(obj, "to_dict") else obj


def _get_subdir_names(subdirs: list) -> list[str]:
"""Normalize a subdirs list into flat name strings, expanding ``oneOf`` entries."""
names: list[str] = []
for entry in subdirs:
if isinstance(entry, dict):
names.extend(entry.get("oneOf", []))
else:
names.append(entry)
return names


def _get_json_data_suffixes() -> frozenset[str]:
"""Collect suffixes for which ``.json`` is the only valid extension.

These are data files (not sidecars), e.g. ``coordsystem.json``.
Derived from the schema's ``rules.files``.
"""
from collections import defaultdict

suffix_exts: dict[str, set[str]] = defaultdict(set)

def _walk(node) -> None:
if hasattr(node, "items"):
items = node.items()
elif hasattr(node, "to_dict"):
items = _ensure_dict(node).items()
else:
return
for _, v in items:
if v is None:
continue
if hasattr(v, "get") or isinstance(v, dict):
inner = _ensure_dict(v)
if isinstance(inner, dict):
sufs = inner.get("suffixes", [])
exts = inner.get("extensions", [])
if sufs and exts:
for s in sufs:
suffix_exts[s].update(exts)
_walk(v)

try:
files_rules = _BIDS_SCHEMA["rules"]["files"]
_walk(files_rules)
except (KeyError, AttributeError):
return frozenset()

return frozenset(
s for s, exts in suffix_exts.items()
if ".json" in exts and all(e == ".json" for e in exts)
)


def get_all_root_entity_types() -> tuple[str, ...]:
"""Return deduplicated root-level entity types across all dataset types.

For example, across raw/derivative/study this returns ``("subject", "template")``.
"""
seen: set[str] = set()
result: list[str] = []
for dtype in get_all_dataset_types():
for et in get_entity_child_dirs(dtype, "root"):
if et not in seen:
seen.add(et)
result.append(et)
return tuple(result)


def get_entity_child_dirs(dataset_type: str, parent_rule: str = "root") -> list[str]:
"""Get entity types valid as child directories of a parent in a dataset type.

Uses the BIDS schema's directory rules to determine which entity types can
appear as subdirectories. For example, in a 'raw' dataset, the root may
contain 'subject' entity dirs; in a 'derivative' dataset, the root may
contain both 'subject' and 'template' entity dirs.

Args:
dataset_type: BIDS dataset type (e.g. 'raw', 'derivative', 'study').
parent_rule: Name of the parent directory rule (e.g. 'root', 'subject').

Returns:
List of entity type names (e.g. ['subject'], ['subject', 'template']).
"""
try:
directories = _BIDS_SCHEMA["rules"]["directories"]
dir_rules = directories.get(dataset_type, {})
except (KeyError, AttributeError):
return []

if not hasattr(dir_rules, "get"):
return []

parent = dir_rules.get(parent_rule, {})
if not hasattr(parent, "get"):
return []

subdirs = parent.get("subdirs", [])
entity_types = []

for name in _get_subdir_names(subdirs):
child = dir_rules.get(name, {})
entity = child.get("entity") if hasattr(child, "get") else None
if entity:
entity_types.append(entity)

return entity_types


def get_file_entity_prefixes() -> tuple[str, ...]:
"""Get entity name prefixes that BIDS filenames can start with (e.g. 'sub', 'tpl').

Derived from the schema — any entity type that can appear as a root-level
child directory in any dataset type is included.
"""
prefixes: set[str] = set()
for et in get_all_root_entity_types():
name = get_entity_name(et)
if name:
prefixes.add(name)
return tuple(sorted(prefixes))


# Initialize the default BIDS schema.
set_bids_schema()
Loading