Skip to content

Commit 46bfac1

Browse files
committed
Extract BIDS discovery functions from CLI modules
Adds discover_anatomical(), discover_functional(), and discover_derivative_runs() to the bids package, removing BIDS-specific iteration logic (entity extraction, row-to-path conversion, DataFrame filtering/grouping) from CLI modules. CLI modules now call discovery functions and receive structured NamedTuples (AnatomicalRun, FunctionalRun, DerivativeRun) instead of manually iterating DataFrames.
1 parent fc70f8c commit 46bfac1

11 files changed

Lines changed: 245 additions & 160 deletions

File tree

src/rbc/bids/__init__.py

Lines changed: 16 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -24,17 +24,25 @@
2424
extract_entities,
2525
parse_bids_name,
2626
)
27-
from rbc.bids.anatomical import export_anatomical
27+
from rbc.bids.anatomical import AnatomicalRun, discover_anatomical, export_anatomical
2828
from rbc.bids.builder import Bids
29-
from rbc.bids.functional import FunctionalInputs, export_functional, resolve_functional
29+
from rbc.bids.functional import (
30+
FunctionalInputs,
31+
FunctionalRun,
32+
discover_functional,
33+
export_functional,
34+
resolve_functional,
35+
)
3036
from rbc.bids.metrics import MetricsInputs, export_metrics, resolve_metrics
3137
from rbc.bids.qc import QCInputs, export_qc, resolve_qc
3238
from rbc.bids.query import find_file, find_files, get_extra_entity, load_table
3339
from rbc.bids.session import (
3440
ANAT_GROUP_ENTITIES,
3541
FUNC_GROUP_ENTITIES,
3642
SUB_SES_QUERY,
43+
DerivativeRun,
3744
SessionTables,
45+
discover_derivative_runs,
3846
iter_session_files,
3947
load_session,
4048
)
@@ -45,13 +53,16 @@
4553
"FUNC_GROUP_ENTITIES",
4654
"SUB_SES_QUERY",
4755
"_STANDARD_ENTITIES",
56+
"AnatomicalRun",
4857
"BIDSFile",
4958
"Bids",
5059
"BidsEntities",
5160
"Datatype",
61+
"DerivativeRun",
5262
"EntityKwargs",
5363
"Extension",
5464
"FunctionalInputs",
65+
"FunctionalRun",
5566
"MetricsInputs",
5667
"Modality",
5768
"QCInputs",
@@ -63,6 +74,9 @@
6374
"bids_path",
6475
"bids_path_from_entities",
6576
"bids_safe_label",
77+
"discover_anatomical",
78+
"discover_derivative_runs",
79+
"discover_functional",
6680
"export_anatomical",
6781
"export_functional",
6882
"export_metrics",

src/rbc/bids/anatomical.py

Lines changed: 44 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -1,16 +1,56 @@
1-
"""BIDS export for the anatomical workflow."""
1+
"""BIDS discovery and export for the anatomical workflow."""
22

33
from __future__ import annotations
44

5-
from typing import TYPE_CHECKING
5+
from pathlib import Path
6+
from typing import TYPE_CHECKING, NamedTuple
67

7-
from rbc.bids import Suffix, TemplateSpace
8+
import polars as pl
9+
10+
from rbc.bids import Suffix, TemplateSpace, extract_entities
11+
from rbc.bids.session import ANAT_GROUP_ENTITIES, SessionTables
812

913
if TYPE_CHECKING:
10-
from rbc.bids import Bids
14+
from collections.abc import Iterator
15+
16+
from rbc.bids import Bids, EntityKwargs
1117
from rbc.workflows.anatomical import AnatomicalOutputs
1218

1319

20+
class AnatomicalRun(NamedTuple):
    """One T1w anatomical run found within a BIDS session.

    Attributes:
        path: Location of the T1w NIfTI image.
        entities: BIDS entities identifying this run (run, acq, rec, echo).
    """

    path: Path
    entities: EntityKwargs
30+
31+
32+
def discover_anatomical(session: SessionTables) -> Iterator[AnatomicalRun]:
    """Yield one anatomical run per T1w group in a session.

    Rows of ``session.anat`` whose suffix is ``T1w`` are grouped by
    :data:`ANAT_GROUP_ENTITIES` in first-seen order. The first row of each
    group supplies the file path and the run entities.

    Args:
        session: Session tables from :func:`~rbc.bids.session.load_session`.

    Yields:
        An :class:`AnatomicalRun` for each T1w group.
    """
    t1w_rows = session.anat.filter(pl.col("suffix") == "T1w")
    grouped = t1w_rows.group_by(ANAT_GROUP_ENTITIES, maintain_order=True)
    for _key, members in grouped:
        first = members.row(0, named=True)
        t1w_path = Path(first["root"]) / first["path"]
        run_entities = extract_entities(first, ["run", "acq", "rec", "echo"])
        yield AnatomicalRun(path=t1w_path, entities=run_entities)
52+
53+
1454
def export_anatomical(anat: Bids, outputs: AnatomicalOutputs) -> None:
1555
"""Export anatomical workflow outputs to BIDS-named derivatives.
1656

src/rbc/bids/functional.py

Lines changed: 47 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -1,21 +1,60 @@
1-
"""BIDS export and resolve for the functional workflow."""
1+
"""BIDS discovery, resolve, and export for the functional workflow."""
22

33
from __future__ import annotations
44

5-
from typing import TYPE_CHECKING, TypedDict
5+
from pathlib import Path
6+
from typing import TYPE_CHECKING, NamedTuple, TypedDict
67

7-
from rbc.bids import Suffix, TemplateSpace, bids_safe_label
8+
import polars as pl
89

9-
if TYPE_CHECKING:
10-
from collections.abc import Sequence
11-
from pathlib import Path
10+
from rbc.bids import Suffix, TemplateSpace, bids_safe_label, extract_entities
11+
from rbc.bids.session import FUNC_GROUP_ENTITIES, SessionTables, iter_session_files
1212

13-
import polars as pl
13+
if TYPE_CHECKING:
14+
from collections.abc import Iterator, Sequence
1415

15-
from rbc.bids import Bids
16+
from rbc.bids import Bids, EntityKwargs
1617
from rbc.workflows.functional import FunctionalOutputs
1718

1819

20+
class FunctionalRun(NamedTuple):
    """One BOLD run found within a BIDS session.

    Attributes:
        path: Location of the BOLD NIfTI image.
        entities: BIDS entities identifying this run
            (task, run, acq, rec, dir, echo).
        anat_df: Anatomical DataFrame matched to this run.
    """

    path: Path
    entities: EntityKwargs
    anat_df: pl.DataFrame
32+
33+
34+
def discover_functional(session: SessionTables) -> Iterator[FunctionalRun]:
    """Discover BOLD runs in a session, paired with matched anatomical data.

    Iterates via :func:`~rbc.bids.session.iter_session_files`, keeps only
    raw (unprocessed, i.e. ``desc`` is null) rows with suffix ``bold``, and
    extracts functional entities from the first such row of each group.
    Groups that contain no raw BOLD row are skipped.

    Args:
        session: Session tables from :func:`~rbc.bids.session.load_session`.

    Yields:
        A :class:`FunctionalRun` for each group with at least one raw BOLD file.
    """
    # Loop-invariant predicate: unprocessed rows (no ``desc``) that are BOLD.
    is_raw_bold = pl.col("desc").is_null() & (pl.col("suffix") == "bold")
    for func_df, anat_df in iter_session_files(session, groupby=FUNC_GROUP_ENTITIES):
        bold_rows = func_df.filter(is_raw_bold)
        if bold_rows.is_empty():
            # ``row(0)`` raises on an empty frame; a group without raw BOLD
            # simply has nothing to discover.
            continue
        row = bold_rows.row(0, named=True)
        yield FunctionalRun(
            path=Path(row["root"]) / row["path"],
            entities=extract_entities(
                row, ["task", "run", "acq", "rec", "dir", "echo"]
            ),
            anat_df=anat_df,
        )
56+
57+
1958
class FunctionalInputs(TypedDict):
2059
"""Resolved anatomical inputs for the functional workflow."""
2160

src/rbc/bids/session.py

Lines changed: 38 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -10,9 +10,13 @@
1010

1111
import polars as pl
1212

13+
from rbc.bids import extract_entities
14+
1315
if TYPE_CHECKING:
1416
from collections.abc import Iterator, Sequence
1517

18+
from rbc.bids import EntityKwargs
19+
1620
SUB_SES_QUERY = ("sub", "ses")
1721

1822
ANAT_GROUP_ENTITIES = ("run", "acq", "suffix", "part", "echo", "ce", "rec", "inv")
@@ -127,3 +131,37 @@ def iter_session_files(
127131
runs_correspond=has_anat_runs and len(anat_runs) == len(func_runs),
128132
)
129133
yield func_group, anat_subset
134+
135+
136+
# Entity keys extracted from the first row of each derivative functional run group.
_FUNC_ENTITY_KEYS = ("task", "run", "acq", "rec", "dir", "echo")
137+
138+
139+
class DerivativeRun(NamedTuple):
    """One functional run found in derivative data.

    Attributes:
        entities: BIDS entities identifying this run
            (task, run, acq, rec, dir, echo).
    """

    entities: EntityKwargs
147+
148+
149+
def discover_derivative_runs(
    group: pl.DataFrame,
) -> Iterator[DerivativeRun]:
    """Discover functional runs within a sub/ses derivative group.

    Groups by :data:`FUNC_GROUP_ENTITIES` and extracts standard functional
    entities from the first row of each group. Groups are yielded in
    first-seen order so repeated invocations process runs deterministically.

    Args:
        group: DataFrame of derivative BOLD runs for a single sub/ses.

    Yields:
        A :class:`DerivativeRun` for each functional run group.
    """
    # Without maintain_order=True polars does not guarantee group order,
    # which would make processing and log order vary between invocations.
    for _, run_group in group.group_by(FUNC_GROUP_ENTITIES, maintain_order=True):
        row = run_group.row(0, named=True)
        yield DerivativeRun(
            entities=extract_entities(row, list(_FUNC_ENTITY_KEYS)),
        )

src/rbc/cli/all.py

Lines changed: 21 additions & 43 deletions
Original file line number | Diff line number | Diff line change
@@ -8,25 +8,17 @@
88
from __future__ import annotations
99

1010
from dataclasses import dataclass
11-
from pathlib import Path
1211
from typing import TYPE_CHECKING, Literal
1312

1413
import polars as pl
1514
from tqdm import tqdm
1615

17-
from rbc.bids import (
18-
ANAT_GROUP_ENTITIES,
19-
FUNC_GROUP_ENTITIES,
20-
SUB_SES_QUERY,
21-
Datatype,
22-
extract_entities,
23-
load_table,
24-
)
25-
from rbc.bids.anatomical import export_anatomical
26-
from rbc.bids.functional import export_functional
16+
from rbc.bids import SUB_SES_QUERY, Datatype, load_table
17+
from rbc.bids.anatomical import discover_anatomical, export_anatomical
18+
from rbc.bids.functional import discover_functional, export_functional
2719
from rbc.bids.metrics import export_metrics
2820
from rbc.bids.qc import export_qc
29-
from rbc.bids.session import iter_session_files, load_session
21+
from rbc.bids.session import load_session
3022
from rbc.cli import _DEFAULT_ENV_VARS
3123
from rbc.cli.base import BaseArgs, _validate_atlas, _validate_positive, _validate_task
3224
from rbc.context import RunContext
@@ -109,32 +101,22 @@ def main(args: AllArgs) -> int:
109101
session = load_session(sub_ses_group, pipe_ctx.sub, pipe_ctx.ses)
110102

111103
# --- Anatomical (once per session, first T1w) ---
112-
for _, anat_df in session.anat.filter(pl.col("suffix") == "T1w").group_by(
113-
ANAT_GROUP_ENTITIES, maintain_order=True
114-
):
115-
anat_row = anat_df.filter(suffix="T1w").row(0, named=True)
116-
t1w_fpath = Path(anat_row["root"]) / anat_row["path"]
117-
ents = extract_entities(anat_row, ["run", "acq", "rec", "echo"])
118-
ctx.logger.info(f"Anatomical: {t1w_fpath}")
104+
for anat_run in discover_anatomical(session):
105+
ctx.logger.info(f"Anatomical: {anat_run.path}")
119106

120-
anat_outputs = anatomical_preprocess(in_t1w=t1w_fpath)
107+
anat_outputs = anatomical_preprocess(in_t1w=anat_run.path)
121108

122-
anat = pipe_ctx.bids(datatype=Datatype.ANAT, entities=ents)
109+
anat = pipe_ctx.bids(datatype=Datatype.ANAT, entities=anat_run.entities)
123110
export_anatomical(anat, anat_outputs)
124111

125112
# --- Functional + Metrics + QC (per BOLD run) ---
126-
for func_df, _anat_df in iter_session_files(
127-
session, groupby=FUNC_GROUP_ENTITIES
128-
):
129-
row = func_df.row(0, named=True)
130-
bold_fpath = Path(row["root"]) / row["path"]
131-
ents = extract_entities(row, ["task", "run", "acq", "rec", "dir", "echo"])
132-
ctx.logger.info(f"Functional: {bold_fpath}")
113+
for func_run in discover_functional(session):
114+
ctx.logger.info(f"Functional: {func_run.path}")
133115

134-
func_metadata = FunctionalMetadata.load(bold_fpath, tr_override=args.tr)
116+
func_metadata = FunctionalMetadata.load(func_run.path, tr_override=args.tr)
135117

136118
func_outputs = functional_preprocess(
137-
in_bold=bold_fpath,
119+
in_bold=func_run.path,
138120
t1w_brain=anat_outputs.brain,
139121
wm_bbr_mask=anat_outputs.wm_bbr_mask,
140122
brain_mask=anat_outputs.brain_mask,
@@ -146,14 +128,16 @@ def main(args: AllArgs) -> int:
146128
regressor_set=args.regressor,
147129
)
148130

149-
func = pipe_ctx.bids(datatype=Datatype.FUNC, entities=ents)
131+
func = pipe_ctx.bids(datatype=Datatype.FUNC, entities=func_run.entities)
150132
mni = export_functional(func, func_outputs, regressors=args.regressor)
151133

152134
# --- Metrics ---
153135
for regressor in args.regressor:
136+
task = func_run.entities.get("task", "")
137+
run = func_run.entities.get("run", 0)
154138
ctx.logger.info(
155-
f"Metrics: sub-{pipe_ctx.sub} task-{ents.get('task', '')} "
156-
f"run-{ents.get('run', 0)} regressor-{regressor}"
139+
f"Metrics: sub-{pipe_ctx.sub} task-{task} "
140+
f"run-{run} regressor-{regressor}"
157141
)
158142
metrics_outputs = metrics_pipeline(
159143
regressed_bold=func_outputs.regressed_bold[regressor],
@@ -172,10 +156,7 @@ def main(args: AllArgs) -> int:
172156
)
173157

174158
# --- QC ---
175-
ctx.logger.info(
176-
f"QC: sub-{pipe_ctx.sub} "
177-
f"task-{ents.get('task', '')} run-{ents.get('run', 0)}"
178-
)
159+
ctx.logger.info(f"QC: sub-{pipe_ctx.sub} task-{task} run-{run}")
179160
qc_outputs = qc_pipeline(
180161
template_bold=func_outputs.template_bold,
181162
cleaned_bold=func_outputs.cleaned_bold,
@@ -187,19 +168,16 @@ def main(args: AllArgs) -> int:
187168
template_brain_mask=func_outputs.template_brain_mask,
188169
sub=pipe_ctx.sub,
189170
ses=pipe_ctx.ses or "",
190-
task=ents.get("task", ""),
191-
run=ents.get("run", 0),
171+
task=func_run.entities.get("task", ""),
172+
run=func_run.entities.get("run", 0),
192173
start_tr=args.start_tr,
193174
regressor_set=args.regressor,
194175
)
195176

196177
export_qc(mni, qc_outputs, regressors=args.regressor)
197178

198179
status = "PASSED" if qc_outputs.passed else "FAILED"
199-
ctx.logger.info(
200-
f"QC {status} for sub-{pipe_ctx.sub} task-{ents.get('task', '')} "
201-
f"run-{ents.get('run', 0)}"
202-
)
180+
ctx.logger.info(f"QC {status} for sub-{pipe_ctx.sub} task-{task} run-{run}")
203181
pipe_ctx.ensure_dataset_description()
204182

205183
ctx.logger.info("RBC full pipeline complete")

0 commit comments

Comments
 (0)