Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ repos:
- id: mypy
exclude: tests|examples
args: [--ignore-missing-imports]
additional_dependencies: [types-PyYAML>=6.0.12.20250915]
additional_dependencies: [types-PyYAML>=6.0.12.20250915, types-requests>=2.33.0.20260503]
- repo: https://github.com/macisamuele/language-formatters-pre-commit-hooks
rev: v2.15.0
hooks:
Expand Down
6 changes: 4 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ dependencies = [
"numpy>=2.4.2",
"pydantic>=2.12.3",
"scipy>=1.17.0",
"styxpodman"
"styxpodman",
"requests>=2.33.1"
]

[dependency-groups]
Expand All @@ -34,7 +35,8 @@ dev = [
"pytest>=9.0.3",
"pytest-cov>=7.0.0",
"ruff>=0.14.7",
"types-pyyaml>=6.0.12.20250915"
"types-pyyaml>=6.0.12.20250915",
"types-requests>=2.33.0.20260503"
]
docs = ["pdoc>=15.0.4"]
validate = ["check-jsonschema>=0.37.1"]
Expand Down
38 changes: 38 additions & 0 deletions src/neuromaps_prime/fetcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
"""Helpers for grabbing from remote repositories."""

from pathlib import Path
from typing import Literal

from neuromaps_prime.remote import OSFStorage


def id_storage(uri: str) -> Literal["osf"] | None:
"""Identify the storage type.

Args:
uri: Remote URI to fetch data from

Returns:
String indicating type of storage (one of 'osf')
"""
if "osf.io" in uri:

Check failure

Code scanning / CodeQL

Incomplete URL substring sanitization High

The string
osf.io
may be at an arbitrary position in the sanitized URL.
Comment thread
github-advanced-security[bot] marked this conversation as resolved.
Fixed
return "osf"
return None


def download_and_validate(uri: str, dest: str | Path) -> None:
"""Download and validate the file.

Args:
uri: Remote URI to fetch data from
dest: Output file path name
token: Optional token to use for remote storage

Raises:
ValueError: if storage cannot be identified from provided URI
"""
match id_storage(uri):
case "osf":
OSFStorage().download(uri, Path(dest))
case _:
raise ValueError(f"Could not identify storage from uri: {uri}")
47 changes: 26 additions & 21 deletions src/neuromaps_prime/graph/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@

from __future__ import annotations

from pathlib import Path
from typing import TYPE_CHECKING, Any, cast

import yaml
from pydantic import BaseModel, Field
from pydantic import BaseModel

from neuromaps_prime.graph.cache import GraphCache # noqa: TC001 (pydantic req'd)
from neuromaps_prime.graph.models import (
Expand All @@ -30,6 +29,8 @@
from neuromaps_prime.resources import NEUROMAPSPRIME_GRAPH

if TYPE_CHECKING:
from pathlib import Path

import networkx as nx


Expand All @@ -48,7 +49,7 @@ class GraphBuilder(BaseModel):
model_config = {"arbitrary_types_allowed": True}

cache: GraphCache
data_dir: Path | None = Field(default=None)
data_dir: Path

# ------------------------------------------------------------------ #
# Public entry points #
Expand Down Expand Up @@ -201,41 +202,40 @@ def _build_volume_edge(
# Generic resource parsers #
# ------------------------------------------------------------------ #

def _resolve_path(self, path: str) -> Path:
"""Prepend data_dir to path when set, otherwise return as-is."""
return (self.data_dir / path) if self.data_dir else Path(path)

def _parse_surface_annotations(
self, prefix: str, space: str, density: str, hemispheres: dict[str, Any]
self, prefix: str, space: str, density: str, annots: dict[str, Any]
) -> list[SurfaceAnnotation]:
"""Parse annotation entries from a surface type block.

Args:
prefix: Name prefix derived from space or source/target space pair.
space: Space identifier applied to every annotation.
density: Density key (e.g. ``"32k"``) shared by all entries in this block.
hemispheres: Dict keyed by label → {hemi: path, ...} with optional
annots: Dict keyed by label → {annot: left, ...} with optional
``"references"`` and ``"notes"`` keys.

Returns:
List of :class:`SurfaceAnnotation` instances parsed from the block.
"""
annotations = []
for label, value in hemispheres.items():
for annot, value in annots.items():
# Grab references and notes once
references = value.get("references")
notes = value.get("notes")
for hemi, path in value.items():
if hemi in ("notes", "references"):
continue
name = f"{prefix}_{density}_{hemi}_{annot}"
ext = "func.gii" if "PC" in annot else "label.gii"
annotations.append(
SurfaceAnnotation(
name=f"{prefix}_{density}_{hemi}_{label}",
name=name,
space=space,
label=label,
label=annot,
density=density,
hemisphere=hemi,
file_path=self._resolve_path(path),
uri=path,
file_path=self.data_dir / f"{name}.{ext}",
references=references,
notes=notes,
)
Expand All @@ -262,8 +262,8 @@ def _parse_surface_entries(
density: Density key (e.g. ``"32k"``) shared by all entries in this block.
surf_type: Resource type string (e.g. ``"midthickness"``).
hemispheres: Dict mapping hemisphere key to file path.
fixed_fields: Fields forwarded verbatim to every instance (e.g. space,
description).
fixed_fields: Dict of fields forwarded verbatim to every instance (e.g.
space, description).
provider: Provider string injected for :class:`SurfaceTransform` entries;
empty string for atlases.
transform_refs: References list attached to the enclosing transform block,
Expand All @@ -275,8 +275,9 @@ def _parse_surface_entries(
extra = {"provider": provider} if cls is SurfaceTransform else {}
return [
cls(
name=f"{prefix}_{density}_{hemi}_{surf_type}",
file_path=self._resolve_path(path),
name=(name := f"{prefix}_{density}_{hemi}_{surf_type}"),
uri=path,
file_path=self.data_dir / f"{name}.surf.gii",
density=density,
hemisphere=hemi, # type: ignore[arg-type]
resource_type=surf_type,
Expand Down Expand Up @@ -398,24 +399,28 @@ def _parse_volume_resources(
for vol_type, vol_value in types.items():
if vol_type == "annotation":
for annot_key, annot_dict in vol_value.items():
name = f"{prefix}_{res}_{annot_key}.nii.gz"
annotations.append(
VolumeAnnotation(
name=f"{prefix}_{res}_{annot_key}",
name=name,
space=space,
label=annot_key,
resolution=res,
file_path=self._resolve_path(annot_dict.get("uri")),
uri=annot_dict.get("uri"),
file_path=self.data_dir / name,
references=annot_dict.get("references"),
notes=annot_dict.get("notes"),
)
)
continue

extra = {"provider": provider} if is_transform else {}
name = f"{prefix}_{res}_{vol_type}"
result.append(
cls(
name=f"{prefix}_{res}_{vol_type}",
file_path=self._resolve_path(vol_value),
name=name,
uri=vol_value,
file_path=self.data_dir / name,
resolution=res,
resource_type=vol_type,
references=transform_refs,
Expand Down
17 changes: 9 additions & 8 deletions src/neuromaps_prime/graph/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@

from __future__ import annotations

import os
from pathlib import Path
from typing import Any, Literal

import networkx as nx
from platformdirs import user_cache_dir

from neuromaps_prime.graph.builder import GraphBuilder
from neuromaps_prime.graph.cache import GraphCache
Expand All @@ -33,6 +33,8 @@
from neuromaps_prime.graph.utils import GraphUtils
from neuromaps_prime.niwrap import setup_runner

NEUROMAPS_DATA_DIR = Path(user_cache_dir("neuromaps_prime"))


class NeuromapsGraph(nx.MultiDiGraph):
"""Multi-directed graph of brain template spaces and their transformations."""
Expand All @@ -47,7 +49,7 @@ def __init__(
image_overrides: dict[str, str] | None = None,
verbose: int = 0,
yaml_file: Path | None = None,
data_dir: Path | None = None,
data_dir: Path = NEUROMAPS_DATA_DIR,
*,
_testing: bool = False,
**kwargs, # noqa: ANN003 (ignore annotation for kwargs)
Expand All @@ -61,8 +63,7 @@ def __init__(
image_overrides: Dictionary containing overrides for container tags.
yaml_file: Path to the graph definition YAML. Defaults to the
bundled ``neuromaps_graph.yaml``.
data_dir: Optional root directory prepended to all relative file
paths in the YAML.
data_dir: Directory to save remote data. Defaults to system cache directory.
verbose: Verbosity level (0=WARNING, 1=INFO, 2+=DEBUG)
_testing: When ``True``, skip YAML loading (for unit tests).
**kwargs: Additional keyword arguments passed for runner setup.
Expand All @@ -77,11 +78,11 @@ def __init__(
**kwargs,
)
# Resource locations
self.data_dir = next(
(Path(d) for d in (data_dir, os.getenv("NEUROMAPS_DATA")) if d), None
)
self.yaml_path = yaml_file
if not data_dir.exists():
data_dir.mkdir(parents=True, exist_ok=True)
# Graph initialization
self.data_dir = data_dir
self.yaml_path = yaml_file
self._cache = GraphCache()
self.utils = GraphUtils(graph=self, cache=self._cache)
self.surface_ops = SurfaceTransformOps(cache=self._cache, utils=self.utils)
Expand Down
42 changes: 22 additions & 20 deletions src/neuromaps_prime/graph/models.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
"""Models for resources in the neuromaps_prime graph."""

import logging
from collections.abc import Sequence
from pathlib import Path
from typing import Literal

from pydantic import BaseModel, Field, field_validator
from pydantic import BaseModel, Field

from neuromaps_prime.fetcher import download_and_validate

_logger = logging.getLogger(__name__)


class Resource(BaseModel):
Expand All @@ -13,33 +18,30 @@ class Resource(BaseModel):
name: str
description: str | None
file_path: Path
uri: str | None = None
references: Sequence[str | dict[str, str]] | None = None
notes: Sequence[str] | None = None

@field_validator("file_path")
@classmethod
def validate_file_path(cls, v: Path) -> Path:
"""Validate that the file exists at the given path.

Args:
v: The file path to validate.

Returns:
The validated file path.

Raises:
FileNotFoundError: If the file does not exist.
"""
if not v.exists():
raise FileNotFoundError(f"File path does not exist: {v}")
return v

def fetch(self) -> Path:
"""Return the path to this resource's file.
"""Return the path to this resource's file, downloading if necessary.

Returns:
Path to the resource file.

Raises:
FileNotFoundError: if file cannot be fetched
"""
if self.file_path.exists():
return self.file_path
if self.uri is None:
raise FileNotFoundError("File does not exist and cannot be fetched.")
if (local_file := Path(self.uri)).exists():
self.file_path = local_file
else:
_logger.info(f"Fetching {self.file_path.name} from remote server.")
download_and_validate(uri=self.uri, dest=self.file_path)
if not self.file_path.exists():
raise FileNotFoundError("File does not exist.")
return self.file_path

def __repr__(self) -> str:
Expand Down
2 changes: 1 addition & 1 deletion src/neuromaps_prime/niwrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def setup_runner(
logging.DEBUG if verbose >= len(_LOG_LEVELS) - 1 else logging.WARNING
)

neuromaps_prime_logger = logging.getLogger("neuromaps-PRIME")
neuromaps_prime_logger = logging.getLogger("neuromaps_prime")
neuromaps_prime_logger.setLevel(_LOG_LEVELS[log_level])
if not neuromaps_prime_logger.handlers:
handler = logging.StreamHandler()
Expand Down
5 changes: 5 additions & 0 deletions src/neuromaps_prime/remote/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""Module associated with remote storages."""

from .osf import OSFStorage

__all__ = ["OSFStorage"]
Loading
Loading