Skip to content

Commit 4981141

Browse files
committed
🐛 Reseparate longitudinal expectedOutputs from sessions specific
1 parent b46fc43 commit 4981141

File tree

4 files changed

+56
-19
lines changed

4 files changed

+56
-19
lines changed

CPAC/pipeline/cpac_pipeline.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright (C) 2012-2024 C-PAC Developers
1+
# Copyright (C) 2012-2025 C-PAC Developers
22

33
# This file is part of C-PAC.
44

@@ -271,8 +271,7 @@ def run_workflow(
271271

272272
subject_id, p_name, log_dir = set_subject(sub_dict, c)
273273
c["subject_id"] = subject_id
274-
275-
init_loggers(subject_id, c, log_dir, mock=True, longitudinal=False)
274+
init_loggers(subject_id, c, log_dir, mock=True)
276275

277276
# Start timing here
278277
pipeline_start_time = time.time()
@@ -1208,7 +1207,6 @@ def build_workflow(subject_id, sub_dict, cfg, pipeline_name=None):
12081207
wf = initialize_nipype_wf(
12091208
cfg, sub_dict["subject_id"], sub_dict.get("unique_id", None), name=pipeline_name
12101209
)
1211-
12121210
# Extract credentials path if it exists
12131211
try:
12141212
creds_path = sub_dict["creds_path"]

CPAC/pipeline/cpac_runner.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ def run_T1w_longitudinal(sublist, cfg: Configuration, dry_run: bool = False):
253253
log_dir: str
254254
_, _, log_dir = set_subject(sub_list[0], cfg)
255255
log_dir = str(Path(log_dir).parent / f"{subject_id}_longitudinal")
256-
init_loggers(subject_id, cfg, log_dir, mock=True, longitudinal=True)
256+
init_loggers(subject_id, cfg, log_dir, mock=True)
257257
anat_longitudinal_wf(subject_id, sub_list, cfg, dry_run=dry_run)
258258
elif len(sub_list) == 1:
259259
warnings.warn(

CPAC/pipeline/engine.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
from CPAC.utils.bids_utils import res_in_filename
5151
from CPAC.utils.configuration import Configuration
5252
from CPAC.utils.datasource import (
53+
bidsier_prefix,
5354
create_anat_datasource,
5455
create_func_datasource,
5556
create_general_datasource,
@@ -64,6 +65,7 @@
6465
WARNING_FREESURFER_OFF_WITH_DATA,
6566
WFLOGGER,
6667
)
68+
from CPAC.utils.monitoring.custom_logging import MockLogger
6769
from CPAC.utils.outputs import Outputs
6870
from CPAC.utils.utils import (
6971
check_prov_for_regtool,
@@ -1133,10 +1135,20 @@ def post_process(self, wf, label, connection, json_info, pipe_idx, pipe_x, outs)
11331135
def gather_pipes(self, wf, cfg, all=False, add_incl=None, add_excl=None):
11341136
excl = []
11351137
substring_excl = []
1136-
outputs_logger = getLogger(
1137-
f'{cfg.get("subject_id", getattr(wf, "name", ""))}_expectedOutputs'
1138-
)
1139-
expected_outputs = ExpectedOutputs()
1138+
try:
1139+
unique_id = re.match(r"(.*_)(sub-.*)", wf.name).group(2) # pyright: ignore[reportOptionalMemberAccess]
1140+
except (AttributeError, IndexError):
1141+
unique_id = cfg.get("subject_id", getattr(wf, "name", ""))
1142+
unique_id = bidsier_prefix(unique_id)
1143+
outputs_logger = getLogger(f"{unique_id}_expectedOutputs")
1144+
expected = {}
1145+
if isinstance(outputs_logger, MockLogger):
1146+
try:
1147+
# load already-expected outputs
1148+
expected = outputs_logger.yaml_contents()
1149+
except (FileNotFoundError, TypeError):
1150+
pass
1151+
expected_outputs = ExpectedOutputs(expected)
11401152

11411153
if add_excl:
11421154
excl += add_excl

CPAC/utils/monitoring/custom_logging.py

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright (C) 2022-2024 C-PAC Developers
1+
# Copyright (C) 2022-2025 C-PAC Developers
22

33
# This file is part of C-PAC.
44

@@ -24,6 +24,7 @@
2424
from traceback import print_exception
2525
from typing import Literal, Optional, Sequence, TYPE_CHECKING, TypeAlias
2626

27+
import yaml
2728
from nipype import config as nipype_config, logging as nipype_logging
2829

2930
from CPAC.utils.docs import docstring_parameter
@@ -163,6 +164,22 @@ def log_subprocess(cmd, *args, raise_error=True, **kwargs):
163164
return output, 0
164165

165166

167+
class ListToSetYamlLoader(yaml.Loader):
168+
"""Custom YAML loader to convert lists to sets."""
169+
170+
def construct_sequence( # pyright: ignore[reportIncompatibleMethodOverride]
171+
self, node, deep=False
172+
) -> set[str]:
173+
"""Convert YAML sequence to a set."""
174+
return set(super().construct_sequence(node, deep))
175+
176+
177+
ListToSetYamlLoader.add_constructor(
178+
yaml.resolver.BaseResolver.DEFAULT_SEQUENCE_TAG,
179+
ListToSetYamlLoader.construct_sequence,
180+
)
181+
182+
166183
# pylint: disable=too-few-public-methods
167184
class MockHandler:
168185
"""Handler for MockLogger."""
@@ -233,6 +250,24 @@ def _get_first_file_handler(
233250
return handler
234251
return None
235252

253+
def yaml_contents(self) -> dict:
254+
"""If the logger's first handler is a YAML file, return the contents and delete them from the logger."""
255+
file = self._get_first_file_handler(self.handlers)
256+
if hasattr(file, "baseFilename"):
257+
file = Path(getattr(file, "baseFilename"))
258+
if file.suffix == ".yml":
259+
with file.open("r", encoding="utf-8") as f:
260+
contents = yaml.load(f.read(), Loader=ListToSetYamlLoader)
261+
with file.open("w", encoding="utf-8") as f:
262+
f.write("")
263+
return contents
264+
error = TypeError
265+
msg = f"Could not load YAML contents from {file}"
266+
else:
267+
error = FileNotFoundError
268+
msg = f"Could not find file handler for {self.name}"
269+
raise error(msg)
270+
236271

237272
def _lazy_sub(message, *items):
238273
"""Given lazy-logging syntax, return string with substitutions.
@@ -268,7 +303,6 @@ def set_up_logger(
268303
level: Optional[LogLevel] = None,
269304
log_dir: Optional[Path | str] = None,
270305
mock: bool = False,
271-
overwrite_existing: bool = False,
272306
) -> logging.Logger | MockLogger:
273307
r"""Initialize a logger.
274308
@@ -328,9 +362,6 @@ def set_up_logger(
328362
level = logging.NOTSET
329363
log_dir = Path(log_dir) if log_dir else Path.cwd()
330364
filepath = log_dir / filename
331-
if overwrite_existing and filepath.exists():
332-
with filepath.open("w", encoding="utf-8") as log_file:
333-
log_file.write("")
334365
if not filepath.exists():
335366
filepath.parent.mkdir(parents=True, exist_ok=True)
336367
if mock:
@@ -356,16 +387,12 @@ def init_loggers(
356387

357388
if "subject_id" not in cpac_config:
358389
cpac_config["subject_id"] = subject_id
359-
360390
set_up_logger(
361-
f"{subject_id}_expectedOutputs",
391+
f"{cpac_config['subject_id']}_expectedOutputs",
362392
filename=f"{bidsier_prefix(cpac_config['subject_id'])}_expectedOutputs.yml",
363393
level="info",
364394
log_dir=log_dir,
365395
mock=mock,
366-
overwrite_existing=( # don't overwrite if we have a longitudinal template
367-
longitudinal or not cpac_config["longitudinal_template_generation", "run"]
368-
),
369396
)
370397

371398
if cpac_config["pipeline_setup", "Debugging", "verbose"]:

0 commit comments

Comments
 (0)