Skip to content

Commit d3dc505

Browse files
committed
Move runner setup and logging into orchestration layer
Orchestration run() functions now handle runner setup (init_runner) and workflow start/complete logging. CLI modules are reduced to pure arg parsing: construct Filters + RunnerConfig, call run(), return 0. Introduces RunnerConfig dataclass and init_runner() in orchestration. Moves _DEFAULT_ENV_VARS from cli/__init__.py to orchestration/.
1 parent cbbd40c commit d3dc505

22 files changed

Lines changed: 188 additions & 196 deletions

src/rbc/cli/__init__.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,3 @@
77
"""
88

99
from __future__ import annotations
10-
11-
from rbc.core import CPAC_ANTS_SEED
12-
13-
_DEFAULT_ENV_VARS = {
14-
"ITK_GLOBAL_DEFAULT_NUMBER_OF_THREADS": "1",
15-
"ANTS_RANDOM_SEED": CPAC_ANTS_SEED,
16-
}

src/rbc/cli/all.py

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,8 @@
1010
from dataclasses import dataclass
1111
from typing import TYPE_CHECKING, Literal
1212

13-
from rbc.cli import _DEFAULT_ENV_VARS
1413
from rbc.cli.base import BaseArgs, _validate_atlas, _validate_positive, _validate_task
15-
from rbc.core.niwrap import setup_runner
16-
from rbc.orchestration import Filters
14+
from rbc.orchestration import Filters, RunnerConfig
1715
from rbc.orchestration.all import run
1816

1917
if TYPE_CHECKING:
@@ -56,10 +54,6 @@ def validate_namespace(cls, ns: argparse.Namespace) -> AllArgs:
5654

5755
def main(args: AllArgs) -> int:
5856
"""Main entrypoint of combined pipeline."""
59-
ctx = setup_runner(runner=args.runner, verbose=args.verbose, tmp_dir=args.tmp_dir)
60-
ctx.runner.environ = _DEFAULT_ENV_VARS
61-
ctx.logger.info("Preparing to run RBC full pipeline")
62-
6357
run(
6458
input_dir=args.input_dir,
6559
output_dir=args.output_dir,
@@ -73,10 +67,12 @@ def main(args: AllArgs) -> int:
7367
fwhm=args.fwhm,
7468
start_tr=args.start_tr,
7569
tr=args.tr,
76-
verbose=ctx.verbose,
70+
runner_config=RunnerConfig(
71+
runner=args.runner,
72+
verbose=bool(args.verbose),
73+
tmp_dir=args.tmp_dir,
74+
),
7775
)
78-
79-
ctx.logger.info("RBC full pipeline complete")
8076
return 0
8177

8278

src/rbc/cli/anatomical.py

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,8 @@
55
from dataclasses import dataclass
66
from typing import TYPE_CHECKING
77

8-
from rbc.cli import _DEFAULT_ENV_VARS
98
from rbc.cli.base import BaseArgs
10-
from rbc.core.niwrap import setup_runner
11-
from rbc.orchestration import Filters
9+
from rbc.orchestration import Filters, RunnerConfig
1210
from rbc.orchestration.anatomical import run
1311

1412
if TYPE_CHECKING:
@@ -28,21 +26,19 @@ def validate_namespace(cls, ns: argparse.Namespace) -> AnatomicalArgs:
2826

2927
def main(args: AnatomicalArgs) -> int:
3028
"""Main entrypoint of anatomical workflow."""
31-
ctx = setup_runner(runner=args.runner, verbose=args.verbose, tmp_dir=args.tmp_dir)
32-
ctx.runner.environ = _DEFAULT_ENV_VARS
33-
ctx.logger.info("Preparing to run RBC anatomical workflow")
34-
3529
run(
3630
input_dir=args.input_dir,
3731
output_dir=args.output_dir,
3832
filters=Filters(
3933
participant_label=args.participant_label,
4034
session_label=args.session_label,
4135
),
42-
verbose=ctx.verbose,
36+
runner_config=RunnerConfig(
37+
runner=args.runner,
38+
verbose=bool(args.verbose),
39+
tmp_dir=args.tmp_dir,
40+
),
4341
)
44-
45-
ctx.logger.info("RBC anatomical workflow complete")
4642
return 0
4743

4844

src/rbc/cli/functional.py

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,8 @@
1212
from dataclasses import dataclass
1313
from typing import TYPE_CHECKING, Literal
1414

15-
from rbc.cli import _DEFAULT_ENV_VARS
1615
from rbc.cli.base import BaseArgs, _validate_positive, _validate_task
17-
from rbc.core.niwrap import setup_runner
18-
from rbc.orchestration import Filters
16+
from rbc.orchestration import Filters, RunnerConfig
1917
from rbc.orchestration.functional import run
2018

2119
if TYPE_CHECKING:
@@ -46,10 +44,6 @@ def validate_namespace(cls, ns: argparse.Namespace) -> FunctionalArgs:
4644

4745
def main(args: FunctionalArgs) -> int:
4846
"""Main entrypoint of functional workflow."""
49-
ctx = setup_runner(runner=args.runner, verbose=args.verbose, tmp_dir=args.tmp_dir)
50-
ctx.runner.environ = _DEFAULT_ENV_VARS
51-
ctx.logger.info("Preparing to run RBC functional workflow")
52-
5347
run(
5448
input_dir=args.input_dir,
5549
output_dir=args.output_dir,
@@ -60,10 +54,12 @@ def main(args: FunctionalArgs) -> int:
6054
),
6155
regressors=args.regressor,
6256
tr=args.tr,
63-
verbose=ctx.verbose,
57+
runner_config=RunnerConfig(
58+
runner=args.runner,
59+
verbose=bool(args.verbose),
60+
tmp_dir=args.tmp_dir,
61+
),
6462
)
65-
66-
ctx.logger.info("RBC functional workflow complete")
6763
return 0
6864

6965

src/rbc/cli/longitudinal.py

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,8 @@
55
from dataclasses import dataclass
66
from typing import TYPE_CHECKING
77

8-
from rbc.cli import _DEFAULT_ENV_VARS
98
from rbc.cli.base import BaseArgs
10-
from rbc.core.niwrap import setup_runner
11-
from rbc.orchestration import Filters
9+
from rbc.orchestration import Filters, RunnerConfig
1210
from rbc.orchestration.longitudinal import run
1311

1412
if TYPE_CHECKING:
@@ -39,10 +37,6 @@ def validate_namespace(cls, ns: argparse.Namespace) -> LongitudinalArgs:
3937

4038
def main(args: LongitudinalArgs) -> int:
4139
"""Main entrypoint of longitudinal workflow."""
42-
ctx = setup_runner(runner=args.runner, verbose=args.verbose, tmp_dir=args.tmp_dir)
43-
ctx.runner.environ = _DEFAULT_ENV_VARS
44-
ctx.logger.info("Preparing to run RBC longitudinal workflow")
45-
4640
run(
4741
input_dir=args.input_dir,
4842
output_dir=args.output_dir,
@@ -52,10 +46,12 @@ def main(args: LongitudinalArgs) -> int:
5246
),
5347
anatomical=args.anatomical,
5448
functional=args.functional,
55-
verbose=ctx.verbose,
49+
runner_config=RunnerConfig(
50+
runner=args.runner,
51+
verbose=bool(args.verbose),
52+
tmp_dir=args.tmp_dir,
53+
),
5654
)
57-
58-
ctx.logger.info("RBC longitudinal workflow complete")
5955
return 0
6056

6157

src/rbc/cli/metrics.py

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,8 @@
99
from dataclasses import dataclass
1010
from typing import TYPE_CHECKING, Literal
1111

12-
from rbc.cli import _DEFAULT_ENV_VARS
1312
from rbc.cli.base import BaseArgs, _validate_atlas, _validate_positive, _validate_task
14-
from rbc.core.niwrap import setup_runner
15-
from rbc.orchestration import Filters
13+
from rbc.orchestration import Filters, RunnerConfig
1614
from rbc.orchestration.metrics import run
1715

1816
if TYPE_CHECKING:
@@ -52,10 +50,6 @@ def validate_namespace(cls, ns: argparse.Namespace) -> MetricsArgs:
5250

5351
def main(args: MetricsArgs) -> int:
5452
"""Main entrypoint of metrics workflow."""
55-
ctx = setup_runner(runner=args.runner, verbose=args.verbose, tmp_dir=args.tmp_dir)
56-
ctx.runner.environ = _DEFAULT_ENV_VARS
57-
ctx.logger.info("Preparing to run RBC metrics workflow")
58-
5953
run(
6054
output_dir=args.output_dir,
6155
filters=Filters(
@@ -67,10 +61,12 @@ def main(args: MetricsArgs) -> int:
6761
atlases=args.atlas,
6862
fwhm=args.fwhm,
6963
tr=args.tr,
70-
verbose=ctx.verbose,
64+
runner_config=RunnerConfig(
65+
runner=args.runner,
66+
verbose=bool(args.verbose),
67+
tmp_dir=args.tmp_dir,
68+
),
7169
)
72-
73-
ctx.logger.info("RBC metrics workflow complete")
7470
return 0
7571

7672

src/rbc/cli/qc.py

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,8 @@
1010
from dataclasses import dataclass
1111
from typing import TYPE_CHECKING, Literal
1212

13-
from rbc.cli import _DEFAULT_ENV_VARS
1413
from rbc.cli.base import BaseArgs, _validate_positive, _validate_task
15-
from rbc.core.niwrap import setup_runner
16-
from rbc.orchestration import Filters
14+
from rbc.orchestration import Filters, RunnerConfig
1715
from rbc.orchestration.qc import run
1816

1917
if TYPE_CHECKING:
@@ -44,10 +42,6 @@ def validate_namespace(cls, ns: argparse.Namespace) -> QCArgs:
4442

4543
def main(args: QCArgs) -> int:
4644
"""Main entrypoint of QC workflow."""
47-
ctx = setup_runner(runner=args.runner, verbose=args.verbose, tmp_dir=args.tmp_dir)
48-
ctx.runner.environ = _DEFAULT_ENV_VARS
49-
ctx.logger.info("Preparing to run RBC QC workflow")
50-
5145
run(
5246
output_dir=args.output_dir,
5347
filters=Filters(
@@ -57,10 +51,12 @@ def main(args: QCArgs) -> int:
5751
),
5852
regressors=args.regressor,
5953
start_tr=args.start_tr,
60-
verbose=ctx.verbose,
54+
runner_config=RunnerConfig(
55+
runner=args.runner,
56+
verbose=bool(args.verbose),
57+
tmp_dir=args.tmp_dir,
58+
),
6159
)
62-
63-
ctx.logger.info("RBC QC workflow complete")
6460
return 0
6561

6662

src/rbc/orchestration/__init__.py

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,28 @@
11
"""Pipeline orchestration layer.
22
3-
Provides ``run()`` entry points for each workflow that handle BIDS table
4-
loading, filtering, sub/ses iteration, and the discover-process-export
5-
loop. CLI modules delegate to these after parsing arguments.
3+
Provides ``run()`` entry points for each workflow that handle runner setup,
4+
BIDS table loading, filtering, sub/ses iteration, and the
5+
discover-process-export loop. CLI modules delegate to these after parsing
6+
arguments.
67
"""
78

89
from __future__ import annotations
910

1011
from dataclasses import dataclass, field
1112
from typing import TYPE_CHECKING
1213

14+
from rbc.core import CPAC_ANTS_SEED
15+
from rbc.core.niwrap import setup_runner
16+
1317
if TYPE_CHECKING:
1418
from collections.abc import Sequence
19+
from pathlib import Path
20+
from typing import Literal
21+
22+
_DEFAULT_ENV_VARS = {
23+
"ITK_GLOBAL_DEFAULT_NUMBER_OF_THREADS": "1",
24+
"ANTS_RANDOM_SEED": CPAC_ANTS_SEED,
25+
}
1526

1627

1728
@dataclass(frozen=True)
@@ -27,3 +38,30 @@ class Filters:
2738
participant_label: Sequence[str] = field(default_factory=tuple)
2839
session_label: Sequence[str] = field(default_factory=tuple)
2940
task: str | None = None
41+
42+
43+
@dataclass(frozen=True)
44+
class RunnerConfig:
45+
"""Configuration for the execution backend.
46+
47+
Attributes:
48+
runner: Execution backend (local, docker, podman, singularity).
49+
verbose: Verbosity level.
50+
tmp_dir: Temporary directory for intermediate files.
51+
"""
52+
53+
runner: Literal["auto", "local", "docker", "podman", "singularity"] = "local"
54+
verbose: bool = False
55+
tmp_dir: Path | None = None
56+
57+
58+
def init_runner(config: RunnerConfig) -> None:
59+
"""Set up the execution backend and environment variables.
60+
61+
Args:
62+
config: Runner configuration.
63+
"""
64+
ctx = setup_runner(
65+
runner=config.runner, verbose=config.verbose, tmp_dir=config.tmp_dir
66+
)
67+
ctx.runner.environ = _DEFAULT_ENV_VARS

src/rbc/orchestration/all.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from rbc.bids.qc import export_qc
1818
from rbc.bids.session import load_session
1919
from rbc.context import RunContext
20+
from rbc.orchestration import Filters, RunnerConfig, init_runner
2021
from rbc.orchestration.anatomical import process_session as process_anat
2122
from rbc.orchestration.functional import process_session as process_func
2223
from rbc.workflows.metrics import single_session_metrics
@@ -26,7 +27,6 @@
2627
from collections.abc import Sequence
2728
from pathlib import Path
2829

29-
from rbc.orchestration import Filters
3030
from rbc_resources import AtlasName
3131

3232
_logger = logging.getLogger(__name__)
@@ -42,7 +42,7 @@ def run(
4242
fwhm: float,
4343
start_tr: int,
4444
tr: float | None = None,
45-
verbose: bool = False,
45+
runner_config: RunnerConfig | None = None,
4646
) -> None:
4747
"""Run the full pipeline (anat + func + metrics + QC) per session.
4848
@@ -58,8 +58,13 @@ def run(
5858
fwhm: Smoothing kernel FWHM in mm.
5959
start_tr: Number of initial TRs discarded during preprocessing.
6060
tr: TR override in seconds, or ``None`` to read from headers.
61-
verbose: Show progress bar.
61+
runner_config: Execution backend configuration.
6262
"""
63+
config = runner_config or RunnerConfig()
64+
init_runner(config)
65+
verbose = config.verbose
66+
67+
_logger.info("Preparing to run RBC full pipeline")
6368
df = load_table(
6469
dataset_dir=input_dir, index_fpath=None, max_workers=0, verbose=verbose
6570
)
@@ -140,3 +145,5 @@ def run(
140145
_logger.info("QC %s for sub-%s", status, pipe_ctx.sub)
141146

142147
pipe_ctx.ensure_dataset_description()
148+
149+
_logger.info("RBC full pipeline complete")

src/rbc/orchestration/anatomical.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,13 @@
1212
from rbc.bids.anatomical import discover_anatomical, export_anatomical
1313
from rbc.bids.session import load_session
1414
from rbc.context import RunContext
15+
from rbc.orchestration import Filters, RunnerConfig, init_runner
1516
from rbc.workflows.anatomical import AnatomicalOutputs, single_session_preprocess
1617

1718
if TYPE_CHECKING:
1819
from pathlib import Path
1920

2021
from rbc.bids.session import SessionTables
21-
from rbc.orchestration import Filters
2222

2323
_logger = logging.getLogger(__name__)
2424

@@ -54,16 +54,21 @@ def run(
5454
output_dir: Path,
5555
*,
5656
filters: Filters,
57-
verbose: bool = False,
57+
runner_config: RunnerConfig | None = None,
5858
) -> None:
5959
"""Run the anatomical pipeline for all matching subjects/sessions.
6060
6161
Args:
6262
input_dir: BIDS dataset directory.
6363
output_dir: Output directory for derivatives.
6464
filters: Participant/session/task filters.
65-
verbose: Show progress bar.
65+
runner_config: Execution backend configuration.
6666
"""
67+
config = runner_config or RunnerConfig()
68+
init_runner(config)
69+
verbose = config.verbose
70+
71+
_logger.info("Preparing to run RBC anatomical workflow")
6772
df = load_table(
6873
dataset_dir=input_dir, index_fpath=None, max_workers=0, verbose=verbose
6974
)
@@ -90,3 +95,5 @@ def run(
9095
session = load_session(sub_ses_group, pipe_ctx.sub, pipe_ctx.ses)
9196
process_session(session, pipe_ctx)
9297
pipe_ctx.ensure_dataset_description()
98+
99+
_logger.info("RBC anatomical workflow complete")

0 commit comments

Comments
 (0)