diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 52bd7fb2..be4a7836 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -4,6 +4,9 @@ on: push: branches: - "*" + pull_request: + branches: + - "*" jobs: build: diff --git a/CHANGELOG.md b/CHANGELOG.md index 1575616f..8887ca23 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,14 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) a ### Removed +## [1.1.0] - 2025-03-18 + +### Added + +- Added new Jobmon-backend method `add_tasks_to_workflow` for adding tasks to an existing Jobmon workflow. +- Added new Jobmon-related arguments: `external_upstream_dependencies`, `task_and_template_prefix`, and `max_attempts`. +- Split Jobmon-backend method `run_workflow` into `create_workflow` and `run_workflow`. + ## [1.0.3] - 2025-03-12 ### Changed diff --git a/README.md b/README.md index 48df3bf4..91a9687c 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ # OneMod **OneMod** is an orchestration package that allows users to build pipelines of -various models created by [IHME Math Sciences](https://github.com/ihmeuw-msca). +statistical models created by [IHME Math Sciences](https://github.com/ihmeuw-msca). Core features of **OneMod** include an intuitive syntax for defining the dataflow between pipeline stages, the ability to easily parallelize over different data subsets and/or parameter sets, and options for data validation. diff --git a/docs/index.rst b/docs/index.rst index c01c116c..cb6106cb 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -10,7 +10,7 @@ Welcome to OneMod! developer_guide/index **OneMod** is an orchestration package that allows users to build pipelines of -various models created by `IHME Math Sciences `_. +statistical models created by `IHME Math Sciences `_. Core features of **OneMod** include an intuitive syntax for defining the dataflow between pipeline stages, the ability to easily parallelize over different data subsets and/or parameter sets, and options for data validation. diff --git a/src/onemod/backend/jobmon_backend.py b/src/onemod/backend/jobmon_backend.py index 7f8c8eb1..0ccd5468 100644 --- a/src/onemod/backend/jobmon_backend.py +++ b/src/onemod/backend/jobmon_backend.py @@ -38,7 +38,6 @@ """ # TODO: Optional stage-specific Python environments -# TODO: User-defined max_attempts # TODO: Could dependencies be method specific? # TODO: should we check resources format, minimum resources, cluster? @@ -46,10 +45,15 @@ from pathlib import Path from typing import Any, Literal -from jobmon.client.api import Tool -from jobmon.client.task import Task -from jobmon.client.task_template import TaskTemplate -from pydantic import validate_call +from pydantic import ConfigDict, validate_call + +try: + from jobmon.client.api import Tool + from jobmon.client.task import Task + from jobmon.client.task_template import TaskTemplate + from jobmon.client.workflow import Workflow +except ImportError: + raise ImportError("Missing optional 'jobmon' dependency") from onemod.backend.utils import ( check_input_exists, @@ -72,6 +76,8 @@ def evaluate_with_jobmon( subsets: dict[str, Any | list[Any]] | None = None, paramsets: dict[str, Any | list[Any]] | None = None, collect: bool | None = None, + task_and_template_prefix: str | None = None, + max_attempts: int = 1, **kwargs, ) -> None: """Evaluate pipeline or stage method with Jobmon. @@ -112,6 +118,110 @@ def evaluate_with_jobmon( instance. If `subsets` and `paramsets` are both None, default is True, otherwise default is False. + Jobmon Parameters + ----------------- + task_and_template_prefix : str, optional + Optional prefix to append to task/template name. Default is None, + no prefix. + max_attempts : int + Maximum number of attempts for a task. Default is 1. + + """ + check_method(model, method) + check_input_exists(model, stages) + if python is None: + python = str(sys.executable) + + resources_dict = get_resources(resources) + workflow = create_workflow(model.name, method, cluster, resources_dict) + add_tasks_to_workflow( + model=model, + workflow=workflow, + method=method, + resources=resources_dict, + python=python, + stages=stages, + subsets=subsets, + paramsets=paramsets, + collect=collect, + task_and_template_prefix=task_and_template_prefix, + max_attempts=max_attempts, + **kwargs, + ) + run_workflow(workflow) + + +@validate_call(config=ConfigDict(arbitrary_types_allowed=True)) +def add_tasks_to_workflow( + model: Pipeline | Stage, + workflow: Workflow, + method: Literal["run", "fit", "predict", "collect"], + resources: Path | str | dict[str, Any], + python: Path | str | None = None, + stages: list[str] | None = None, + subsets: dict[str, Any | list[Any]] | None = None, + paramsets: dict[str, Any | list[Any]] | None = None, + collect: bool | None = None, + task_and_template_prefix: str | None = None, + max_attempts: int = 1, + external_upstream_tasks: list[Task] | None = None, + **kwargs, +) -> None: + """Add Pipeline tasks to an existing Jobmon Workflow. + + Note that this is a publically available function, be careful of + breaking changes to the API or functionality. + + Parameters + ---------- + workflow : Workflow + Instantiated Jobmon workflow. Add new tasks to an existing Jobmon + workflow rather than creating a new workflow. Does not run the + workflow, only adds the tasks. + model : Pipeline or Stage + Pipeline or stage instance. + method : {'run', 'fit', 'predict', 'collect'} + Name of method to evalaute. + resources : dict, Path, or str + Path to resources file or dictionary of compute resources. + python : Path or str, optional + Path to Python environment. If None, use sys.executable. + Default is None. + **kwargs + Additional keyword arguments passed to stage methods. If `model` + is a `Pipeline` instance, use format`stage={arg_name: arg_value}`. + + Pipeline Parameters + ------------------- + stages : list of str, optional + Names of stages to evaluate if `model` is a `Pipeline` instance. + If None, evaluate pipeline stages. Default is None. + + Stage Parameters + ---------------- + subsets : dict, optional + Submodel data subsets to evaluate if `model` is a `Stage` + instance. If None, evaluate all data subsets. Default is None. + paramsets : dict, optional + Submodel parameter sets to evaluate if `model` is a `Stage` + instance. If None, evaluate all parameter sets. Default is None. + collect : bool, optional + Whether to collect submodel results if `model` is a `Stage` + instance. If `subsets` and `paramsets` are both None, default is + True, otherwise default is False. + + Jobmon Parameters + ----------------- + task_and_template_prefix : str, optional + Optional prefix to append to task/template name. Default is None, + no prefix. + max_attempts : int + Maximum number of attempts for a task. Default is 1. + external_upstream_tasks : list, optional + List of Jobmon tasks external to the OneMod Stages or Pipeline that + should be treated as upstream dependencies of the new tasks. Default + is no external upstream tasks. + """ check_method(model, method) check_input_exists(model, stages) @@ -119,20 +229,22 @@ def evaluate_with_jobmon( python = str(sys.executable) resources_dict = get_resources(resources) - tool = get_tool(model.name, method, cluster, resources_dict) tasks = get_tasks( - model, - method, - tool, - resources_dict, - python, - stages, - subsets, - paramsets, - collect, + model=model, + method=method, + tool=workflow.tool, + resources=resources_dict, + python=python, + stages=stages, + subsets=subsets, + paramsets=paramsets, + collect=collect, + task_and_template_prefix=task_and_template_prefix, + max_attempts=max_attempts, + external_upstream_tasks=external_upstream_tasks, **kwargs, ) - run_workflow(model.name, method, tool, tasks) + workflow.add_tasks(tasks) def get_resources(resources: Path | str | dict[str, Any]) -> dict[str, Any]: @@ -155,6 +267,36 @@ def get_resources(resources: Path | str | dict[str, Any]) -> dict[str, Any]: return resources +def create_workflow( + name: str, + method: Literal["run", "fit", "predict", "collect"], + cluster: str, + resources: dict[str, Any], +) -> Workflow: + """Create and return workflow. + + Parameters + ---------- + name : str + Pipeline or stage name. + method : str + Name of method being evaluated. + cluster : str + Cluster name. + resources : dict + Dictionary of compute resources. + + Returns + ------- + Workflow + Jobmon workflow. + + """ + tool = get_tool(name, method, cluster, resources) + workflow = tool.create_workflow(name=f"{name}_{method}") + return workflow + + def get_tool( name: str, method: str, cluster: str, resources: dict[str, Any] ) -> Tool: @@ -195,6 +337,9 @@ def get_tasks( subsets: dict[str, Any | list[Any]] | None, paramsets: dict[str, Any | list[Any]] | None, collect: bool | None, + task_and_template_prefix: str | None, + max_attempts: int, + external_upstream_tasks: list[Task] | None = None, **kwargs, ) -> list[Task]: """Get Jobmon tasks. @@ -232,6 +377,17 @@ def get_tasks( Whether to collect submodel results if `model` is a `Stage` instance. + Jobmon Parameters + ----------------- + task_and_template_prefix : str, optional + Optional prefix to append to task/template name. + max_attempts : int + Maximum number of attempts for a task. + external_upstream_tasks : list, optional + List of Jobmon tasks external to the OneMod Stages or Pipeline that + should be treated as upstream dependencies of the new tasks. Default + None, no external upstreams. + Returns ------- list of Task @@ -240,17 +396,29 @@ def get_tasks( """ if isinstance(model, Pipeline): return get_pipeline_tasks( - model, method, tool, resources, python, stages, **kwargs + pipeline=model, + method=method, + tool=tool, + resources=resources, + python=python, + stages=stages, + external_upstream_tasks=external_upstream_tasks, + task_and_template_prefix=task_and_template_prefix, + max_attempts=max_attempts, + **kwargs, ) return get_stage_tasks( - model, - method, - tool, - resources, - python, - subsets, - paramsets, - collect, + stage=model, + method=method, + tool=tool, + resources=resources, + python=python, + task_and_template_prefix=task_and_template_prefix, + max_attempts=max_attempts, + subsets=subsets, + paramsets=paramsets, + collect=collect, + upstream_tasks=external_upstream_tasks, **kwargs, ) @@ -262,6 +430,9 @@ def get_pipeline_tasks( resources: dict[str, Any], python: Path | str, stages: list[str] | None, + external_upstream_tasks: list[Task] | None, + task_and_template_prefix: str | None, + max_attempts: int, **kwargs, ) -> list[Task]: """Get pipeline stage tasks. @@ -281,6 +452,13 @@ def get_pipeline_tasks( stages : list of str or None Name of stages to evaluate. If None, evaluate all pipeline stages. + external_upstream_tasks : list, optional + List of Jobmon tasks external to the OneMod Stages or Pipeline that + should be treated as upstream dependencies of the new tasks. + task_and_template_prefix : str, optional + Optional prefix to append to task/template name. + max_attempts : int + Maximum number of attempts for a task. **kwargs Additional keyword arguments passed to stage methods. @@ -292,6 +470,7 @@ def get_pipeline_tasks( """ tasks = [] task_dict: dict[str, list[Task]] = {} + task_dict["external"] = external_upstream_tasks or [] for stage_name in pipeline.get_execution_order(stages): stage = pipeline.stages[stage_name] @@ -300,11 +479,13 @@ def get_pipeline_tasks( stage, method, pipeline.stages, task_dict, stages ) task_dict[stage_name] = get_stage_tasks( - stage, - method, - tool, - resources, - python, + stage=stage, + method=method, + tool=tool, + resources=resources, + python=python, + task_and_template_prefix=task_and_template_prefix, + max_attempts=max_attempts, upstream_tasks=upstream_tasks, **kwargs, ) @@ -348,6 +529,7 @@ def get_upstream_tasks( * If an upstream stage has submodels and `method` is in the upstream's `collect_after`, only include the task corresponding to the upstream's `collect` method. + * If there are no upstream tasks, add any external tasks as upstream. """ upstream_tasks = [] @@ -368,6 +550,10 @@ def get_upstream_tasks( else: upstream_tasks.extend(task_dict[upstream_name]) + # if there are no upstream tasks, add external upstream tasks + if not upstream_tasks: + upstream_tasks = task_dict.get("external", []) + return upstream_tasks @@ -377,6 +563,8 @@ def get_stage_tasks( tool: Tool, resources: dict[str, Any], python: Path | str, + task_and_template_prefix: str | None, + max_attempts: int, subsets: dict[str, Any | list[Any]] | None = None, paramsets: dict[str, Any | list[Any]] | None = None, collect: bool | None = None, @@ -409,6 +597,10 @@ def get_stage_tasks( True, otherwise default is False. upstream_tasks : list of Task or None, optional List of upstream stage tasks. Default is None. + task_and_template_prefix : str, optional + Optional prefix to append to task/template name. + max_attempts : int + Maximum number of attempts for a task. **kwargs Additional keyword arguments passed to stage method. @@ -431,14 +623,20 @@ def get_stage_tasks( tool, resources, list(submodel_args.keys()), + task_and_template_prefix=task_and_template_prefix, **kwargs, ) + task_name = ( + f"{task_and_template_prefix}_{stage.name}_{method}" + if task_and_template_prefix + else f"{stage.name}_{method}" + ) if submodel_args: tasks = task_template.create_tasks( - name=f"{stage.name}_{method}", + name=task_name, upstream_tasks=upstream_tasks, - max_attempts=1, + max_attempts=max_attempts, entrypoint=entrypoint, config=config_path, method=method, @@ -448,9 +646,9 @@ def get_stage_tasks( else: tasks = [ task_template.create_task( - name=f"{stage.name}_{method}", + name=task_name, upstream_tasks=upstream_tasks, - max_attempts=1, + max_attempts=max_attempts, entrypoint=entrypoint, config=config_path, method=method, @@ -462,7 +660,14 @@ def get_stage_tasks( if collect_results(stage, method, subsets, paramsets, collect): tasks.extend( get_stage_tasks( - stage, "collect", tool, resources, python, upstream_tasks=tasks + stage=stage, + method="collect", + tool=tool, + resources=resources, + python=python, + task_and_template_prefix=task_and_template_prefix, + max_attempts=max_attempts, + upstream_tasks=tasks, ) ) @@ -531,10 +736,14 @@ def get_task_template( tool: Tool, resources: dict[str, Any], submodel_args: list[str], + task_and_template_prefix: str | None, **kwargs, ) -> TaskTemplate: """Get stage task template. + If the Jobmon Tool already has an active task template with the same + name, use that task template. + Parameters ---------- stage_name : str @@ -547,6 +756,8 @@ def get_task_template( Dictionary of compute resources. submodel_args : list of str List including 'subsets' and/or 'paramsets'. + task_and_template_prefix : str, optional + Optional prefix to append to task/template name. **kwargs Additional keyword arguments passed to stage method. @@ -556,8 +767,14 @@ def get_task_template( Stage task template. """ + template_name = ( + f"{task_and_template_prefix}_{stage_name}_{method}" + if task_and_template_prefix + else f"{stage_name}_{method}" + ) + task_template = tool.get_task_template( - template_name=f"{stage_name}_{method}", + template_name=template_name, command_template=get_command_template(method, submodel_args, **kwargs), op_args=["entrypoint"], task_args=["config", "method", "stages"] + list(kwargs.keys()), @@ -649,23 +866,15 @@ def get_task_resources( } -def run_workflow(name: str, method: str, tool: Tool, tasks: list[Task]) -> None: - """Create and run workflow. +def run_workflow(workflow: Workflow) -> None: + """Run workflow. Parameters ---------- - name : str - Pipeline or stage name. - method : str - Name of method being evaluated. - tool : Tool - Jobmon tool. - tasks : list of Task - List of stage tasks. + workflow : Workflow + Jobmon workflow to run. """ - workflow = tool.create_workflow(name=f"{name}_{method}") - workflow.add_tasks(tasks) workflow.bind() print(f"Starting workflow {workflow.workflow_id}") status = workflow.run() diff --git a/src/onemod/config/base.py b/src/onemod/config/base.py index 6e002a81..53863c56 100644 --- a/src/onemod/config/base.py +++ b/src/onemod/config/base.py @@ -63,10 +63,7 @@ class StageConfig(Config): _pipeline_config: Config = Config() _required: list[str] = [] - def add_pipeline_config(self, pipeline_config: Config | dict) -> None: - if isinstance(pipeline_config, dict): - pipeline_config = Config(**pipeline_config) - + def add_pipeline_config(self, pipeline_config: Config) -> None: missing = [] for item in self._required: if not self.stage_contains(item) and item not in pipeline_config: diff --git a/src/onemod/main.py b/src/onemod/main.py index 1f0ce092..995edac1 100644 --- a/src/onemod/main.py +++ b/src/onemod/main.py @@ -1,8 +1,6 @@ """Methods to load and evaluate pipeline and stage objects.""" import json -from importlib.util import module_from_spec, spec_from_file_location -from inspect import getmodulename from pathlib import Path from typing import Any, Literal @@ -11,6 +9,7 @@ import onemod.stage as onemod_stages from onemod.pipeline import Pipeline from onemod.stage import Stage +from onemod.utils.custom_classes import get_custom_class def load_pipeline(config: Path | str) -> Pipeline: @@ -27,7 +26,9 @@ def load_pipeline(config: Path | str) -> Pipeline: Pipeline instance. """ - return Pipeline.from_json(config) + pipeline_class: type[Pipeline] = _get_class(config) + pipeline = pipeline_class.from_json(config) + return pipeline def load_stage(config: Path | str, stage_name: str) -> Stage: @@ -46,83 +47,61 @@ def load_stage(config: Path | str, stage_name: str) -> Stage: Stage instance. """ - stage_class = _get_stage(config, stage_name) + stage_class: type[Stage] = _get_class(config, stage_name) stage = stage_class.from_json(config, stage_name) return stage -def _get_stage(config: Path | str, stage_name: str) -> Stage: - """Get stage class from JSON file. +def _get_class( + config: Path | str, stage_name: str | None = None +) -> type[Pipeline] | type[Stage]: + """Get pipeline or stage class from JSON file. Parameters ---------- config : Path or str Path to config file. - stage_name : str - Stage name. + stage_name : str or None, optional + Name of stage in config file to get class for. If None, get + class for pipeline in config file. Default is None. Returns ------- - Stage - Stage class. + Pipeline or Stage + Pipeline or Stage class. Notes ----- - When a custom stage class has the same name as a built-in OneMod - stage class, this function returns the custom stage class. + When a custom class has the same name as a built-in OneMod class, + this function returns the custom class. """ with open(config, "r") as f: config_dict = json.load(f) - if stage_name not in config_dict["stages"]: - raise KeyError(f"Config does not contain a stage named '{stage_name}'") - config_dict = config_dict["stages"][stage_name] - stage_type = config_dict["type"] + + if stage_name is None: + model_name = config_dict["name"] + model_type = config_dict["type"] + else: + if stage_name not in config_dict["stages"]: + raise KeyError( + f"Config does not contain a stage named '{stage_name}'" + ) + config_dict = config_dict["stages"][stage_name] + model_name = stage_name + model_type = config_dict["type"] if "module" in config_dict: - return _get_custom_stage(stage_type, config_dict["module"]) - if hasattr(onemod_stages, stage_type): - return getattr(onemod_stages, stage_type) + return get_custom_class(model_type, config_dict["module"]) + if model_type == "Pipeline": + return Pipeline + if hasattr(onemod_stages, model_type): + return getattr(onemod_stages, model_type) raise KeyError( - f"Config does not contain a module for custom stage '{stage_name}'" + f"Config does not contain a module for custom class '{model_name}'" ) -def _get_custom_stage(stage_type: str, module: str) -> Stage: - """Get custom stage class from file. - - Parameters - ---------- - stage_type : str - Name of custom stage class. - module : str - Path to Python module containing custom stage class definition. - - Returns - ------- - Stage - Custom stage class. - - """ - module_path = Path(module) - - module_name = getmodulename(module_path) - if module_name is None: - raise ValueError(f"Could not determine module name from {module_path}") - - spec = spec_from_file_location(module_name, module_path) - if spec is None: - raise ImportError(f"Could not load spec for module {module_path}") - - if spec.loader is None: - raise ImportError(f"Module spec for {module_path} has no loader") - - loaded_module = module_from_spec(spec) - spec.loader.exec_module(loaded_module) - - return getattr(loaded_module, stage_type) - - def evaluate( config: Path | str, method: Literal["run", "fit", "predict", "collect"] = "run", diff --git a/src/onemod/pipeline.py b/src/onemod/pipeline.py index 1e47ba63..b4306474 100644 --- a/src/onemod/pipeline.py +++ b/src/onemod/pipeline.py @@ -5,6 +5,7 @@ import json import logging from collections import deque +from inspect import getfile from pathlib import Path from typing import Any, Literal @@ -40,8 +41,42 @@ class Pipeline(BaseModel): directory: Path config: Config = Config() groupby_data: Path | None = None + _module: Path | None = None _stages: dict[str, Stage] = {} + def __init__( + self, + module: Path | str | None = None, + stages: list[Stage] | None = None, + **kwargs, + ) -> None: + """Create pipeline instance.""" + super().__init__(**kwargs) + self.set_module(module) + if stages is not None: + self.add_stages(stages) + + @computed_property + def type(self) -> str: + """Pipeline type.""" + return type(self).__name__ + + @computed_property + def module(self) -> Path | None: + """Path to module containing custom pipeline definition.""" + return self._module + + def set_module(self, module: Path | str | None) -> None: + if isinstance(module, (Path, str)): + self._module = Path(module) + elif self.type != "Pipeline": + try: + self._module = Path(getfile(self.__class__)) + except (OSError, TypeError): + raise TypeError( + f"Could not find module for custom pipeline class '{self.name}'" + ) + @computed_property def stages(self) -> dict[str, Stage]: """Pipeline stages.""" @@ -72,18 +107,19 @@ def from_json(cls, config_path: Path | str) -> Pipeline: with open(config_path, "r") as file: config = json.load(file) + del config["type"] + del config["dependencies"] stages = config.pop("stages", {}) - pipeline = cls(**config) - if stages: from onemod.main import load_stage - pipeline.add_stages( - [load_stage(config_path, stage) for stage in stages] + return cls( + stages=[load_stage(config_path, stage) for stage in stages], + **config, ) - return pipeline + return cls(**config) def to_json(self, config_path: Path | str | None = None) -> None: """Save pipeline as JSON file. diff --git a/src/onemod/stage/base.py b/src/onemod/stage/base.py index 12fd49da..c1f18591 100644 --- a/src/onemod/stage/base.py +++ b/src/onemod/stage/base.py @@ -13,11 +13,11 @@ from pydantic import BaseModel, ConfigDict import onemod.stage as onemod_stages -from onemod.config import StageConfig -from onemod.dtypes import Data -from onemod.dtypes.unique_sequence import UniqueList +from onemod.config import Config, StageConfig +from onemod.dtypes import Data, UniqueList from onemod.fsutils import DataInterface from onemod.io import Input, Output +from onemod.utils.custom_classes import get_custom_config_class from onemod.utils.decorators import computed_property from onemod.validation import ValidationErrorCollector, handle_error @@ -90,14 +90,13 @@ def module(self) -> Path | None: def set_module(self, module: Path | str | None) -> None: if isinstance(module, (Path, str)): self._module = Path(module) - else: - if not hasattr(onemod_stages, self.type): - try: - self._module = Path(getfile(self.__class__)) - except (OSError, TypeError): - raise TypeError( - f"Could not find module for custom stage '{self.name}'" - ) + elif not hasattr(onemod_stages, self.type): + try: + self._module = Path(getfile(self.__class__)) + except (OSError, TypeError): + raise TypeError( + f"Could not find module for custom stage '{self.name}'" + ) @computed_property def input(self) -> Input: @@ -372,7 +371,19 @@ def from_json(cls, config_path: Path | str, stage_name: str) -> Stage: ) stage = cls(config_path=config_path, **stage_config) - stage.config.add_pipeline_config(pipeline_config["config"]) + + if (pipeline_module := pipeline_config.get("module")) is None: + stage.config.add_pipeline_config( + Config(**pipeline_config["config"]) + ) + else: + config_class = get_custom_config_class( + pipeline_config["type"], pipeline_module + ) + stage.config.add_pipeline_config( + config_class(**pipeline_config["config"]) + ) + return stage def build( diff --git a/src/onemod/utils/custom_classes.py b/src/onemod/utils/custom_classes.py new file mode 100644 index 00000000..a697c1aa --- /dev/null +++ b/src/onemod/utils/custom_classes.py @@ -0,0 +1,83 @@ +"""Helper functions for loading custom classes from modules.""" + +from importlib.util import module_from_spec, spec_from_file_location +from inspect import getmodulename +from pathlib import Path +from types import ModuleType + +from pydantic import BaseModel + + +def get_custom_class(class_name: str, module: str) -> type[BaseModel]: + """Get custom pipeline, stage, or config class from file. + + Parameters + ---------- + class_name : str + Name of custom class. + module : str + Path to Python module containing custom class definition. + + Returns + ------- + BaseModel + Custom pipeline, stage, or config class. + + """ + loaded_module = load_module(module) + return getattr(loaded_module, class_name) + + +def get_custom_config_class(class_name: str, module: str) -> type[BaseModel]: + """Get custom pipeline config class from file. + + Parameters + ---------- + class_name : str + Name of custom pipeline class. + module : str + Path to Python module containing custom pipeline class + definition. + + Returns + ------- + BaseModel + Custom pipeline config class from file. + + """ + pipeline_class = get_custom_class(class_name, module) + config_class = pipeline_class.__pydantic_fields__["config"].annotation + return config_class + + +def load_module(module: str) -> ModuleType: + """Load Python module from file path. + + Parameters + ---------- + module : str + Path to Python module. + + Returns + ------- + ModuleType + Loaded Python module. + + """ + module_path = Path(module) + + module_name = getmodulename(module_path) + if module_name is None: + raise ValueError(f"Could not determine module name from {module_path}") + + spec = spec_from_file_location(module_name, module_path) + if spec is None: + raise ImportError(f"Could not load spec for module {module_path}") + + if spec.loader is None: + raise ImportError(f"Module spec for {module_path} has no loader") + + loaded_module = module_from_spec(spec) + spec.loader.exec_module(loaded_module) + + return loaded_module diff --git a/tests/e2e/test_e2e_jobmon_backend.py b/tests/e2e/test_e2e_jobmon_backend.py index dff27e13..4c82502b 100644 --- a/tests/e2e/test_e2e_jobmon_backend.py +++ b/tests/e2e/test_e2e_jobmon_backend.py @@ -9,11 +9,21 @@ import pytest +try: + from jobmon.client.api import Tool + + from onemod.backend.jobmon_backend import add_tasks_to_workflow +except ImportError: + pass + + KWARGS = { "backend": "jobmon", "cluster": "dummy", "resources": {"tool_resources": {"dummy": {"queue": "null.q"}}}, "python": None, + "task_and_template_prefix": "jobmon_e2e_testing", + "max_attempts": 3, } STAGE_KWARGS = {**KWARGS, "subsets": None, "paramsets": None, "collect": None} @@ -113,3 +123,39 @@ def test_parallel_stage_submodels(parallel_pipeline, submodel, collect): "collect": collect, }, ) + + +@pytest.mark.e2e +@pytest.mark.requires_jobmon +def test_simple_pipeline_add_tasks_to_workflow(simple_pipeline): + tool = Tool(name="test_run_simple_pipeline") + tool.set_default_cluster_name("dummy") + tool.set_default_compute_resources_from_dict("dummy", {"queue": "null.q"}) + workflow = tool.create_workflow(name="test_run_workflow") + add_tasks_to_workflow( + model=simple_pipeline, + workflow=workflow, + method="run", + stages=["run_1", "fit_2"], + **KWARGS, + ) + workflow.bind() + workflow.run() + + +@pytest.mark.e2e +@pytest.mark.requires_jobmon +def test_parallel_pipeline_add_tasks_to_workflow(parallel_pipeline): + tool = Tool(name="test_run_parallel_pipeline") + tool.set_default_cluster_name("dummy") + tool.set_default_compute_resources_from_dict("dummy", {"queue": "null.q"}) + workflow = tool.create_workflow(name="test_run_workflow") + add_tasks_to_workflow( + model=parallel_pipeline, + workflow=workflow, + method="run", + stages=["run_1", "fit_2"], + **KWARGS, + ) + workflow.bind() + workflow.run() diff --git a/tests/integration/test_integration_jobmon_backend.py b/tests/integration/test_integration_jobmon_backend.py index 5b4602c5..e3d4d219 100644 --- a/tests/integration/test_integration_jobmon_backend.py +++ b/tests/integration/test_integration_jobmon_backend.py @@ -2,6 +2,7 @@ from collections import defaultdict from pathlib import Path +from unittest import mock import pytest @@ -115,7 +116,12 @@ def test_task_template(stage_cluster): } tool = jb.get_tool("pipeline", "method", "cluster", resources) task_template = jb.get_task_template( - "stage", "method", tool, resources, submodel_args=[] + "stage", + "method", + tool, + resources, + submodel_args=[], + task_and_template_prefix=None, ) default_cluster = task_template.default_cluster_name default_resources = task_template.default_compute_resources_set @@ -211,6 +217,9 @@ def test_simple_pipeline_tasks(simple_pipeline, method, stages): resources=resources, python=python, stages=stages, + external_upstream_tasks=None, + task_and_template_prefix=None, + max_attempts=1, ) stages = list(simple_pipeline.stages.keys()) if stages is None else stages task_dict = {task.task_args["stages"]: task for task in tasks} @@ -243,6 +252,9 @@ def test_parallel_pipeline_tasks(parallel_pipeline, method, stages): resources=resources, python=python, stages=stages, + external_upstream_tasks=None, + task_and_template_prefix=None, + max_attempts=1, ) stages = list(parallel_pipeline.stages.keys()) if stages is None else stages task_dict = {task.task_args["stages"]: defaultdict(list) for task in tasks} @@ -287,6 +299,91 @@ def test_parallel_pipeline_tasks(parallel_pipeline, method, stages): assert stage.name not in task_dict +@pytest.mark.integration +@pytest.mark.requires_jobmon +@pytest.mark.parametrize("method", ["run", "fit", "predict"]) +@pytest.mark.parametrize("stages", [None, ["run_1", "fit_2", "predict_3"]]) +def test_parallel_pipeline_tasks_jobmon_args(parallel_pipeline, method, stages): + cluster = "cluster" + resources = {"tool_resources": {cluster: {"queue": "null.q"}}} + python = "/path/to/python/env/bin/python" + external_upstream_tasks = [ + jb.Task( + node=mock.MagicMock(), + task_args={"fake_arg_1": "fake_value"}, + op_args={"fake_arg_2": "fake_value"}, + name="fake_task", + task_attributes=[], + ) + ] + task_and_template_prefix = "testing" + max_attempts = 3 + tasks = jb.get_pipeline_tasks( + parallel_pipeline, + method, + jb.get_tool(parallel_pipeline.name, method, cluster, resources), + resources=resources, + python=python, + stages=stages, + external_upstream_tasks=external_upstream_tasks, + task_and_template_prefix=task_and_template_prefix, + max_attempts=max_attempts, + ) + stages = list(parallel_pipeline.stages.keys()) if stages is None else stages + task_dict = {task.task_args["stages"]: defaultdict(list) for task in tasks} + for task in tasks: + task_dict[task.task_args["stages"]][task.task_args["method"]].append( + task + ) + + for stage in parallel_pipeline.stages.values(): + if stage.name in stages: + method_tasks = task_dict[stage.name][method] + collect_tasks = task_dict[stage.name]["collect"] + assert len(method_tasks) == len(stage.get_submodels()) + + if method in stage.collect_after: + assert len(collect_tasks) == 1 + assert collect_tasks[0].upstream_tasks == set(method_tasks) + else: + assert len(collect_tasks) == 0 + + for task in method_tasks: + stage_upstreams = [ + upstream_task + for upstream_task in task.upstream_tasks + if "stages" in upstream_task.task_args + ] + external_upstreams = [ + upstream_task + for upstream_task in task.upstream_tasks + if "stages" not in upstream_task.task_args + ] + upstream_dict = { + upstream_task.task_args["stages"]: defaultdict(list) + for upstream_task in stage_upstreams + } + for upstream_task in stage_upstreams: + upstream_dict[upstream_task.task_args["stages"]][ + upstream_task.task_args["method"] + ].append(upstream_task) + for upstream_name in stage.dependencies: + # assumes upstream_stage in stages + upstream_stage = parallel_pipeline.stages[upstream_name] + if method in upstream_stage.collect_after: + assert len(upstream_dict[upstream_name][method]) == 0 + assert len(upstream_dict[upstream_name]["collect"]) == 1 + else: + assert len(upstream_dict[upstream_name][method]) == len( + upstream_stage.get_submodels() + ) + assert len(upstream_dict[upstream_name]["collect"]) == 0 + if external_upstreams: + assert external_upstreams == external_upstream_tasks + else: + assert stage.name not in task_dict + + @pytest.mark.integration @pytest.mark.requires_jobmon def test_stage_tasks_basic(simple_pipeline): @@ -303,6 +400,8 @@ def test_stage_tasks_basic(simple_pipeline): jb.get_tool(simple_pipeline.name, method, cluster, resources), resources=resources, python=python, + task_and_template_prefix=None, + max_attempts=1, ) task = tasks[0] @@ -341,6 +440,8 @@ def test_stage_tasks_kwargs(simple_pipeline, kwargs): jb.get_tool(simple_pipeline.name, method, cluster, resources), resources=resources, python=python, + task_and_template_prefix=None, + max_attempts=1, **kwargs, )[0] assert task.command == jb.get_command_template(method, [], **kwargs).format( @@ -382,6 +483,8 @@ def test_stage_tasks_submodels(parallel_pipeline, submodel, collect): subsets=subsets, paramsets=paramsets, collect=collect, + task_and_template_prefix=None, + max_attempts=1, ) submodels = [ [str(submodel[0]), str(submodel[1])] @@ -423,6 +526,8 @@ def test_stage_tasks_collect_after(parallel_pipeline, method): subsets={"sex_id": 1}, paramsets={"param": 1}, collect=True, + task_and_template_prefix=None, + max_attempts=1, ) assert tasks[0].task_args["method"] == method if method == "predict": @@ -430,3 +535,43 @@ def test_stage_tasks_collect_after(parallel_pipeline, method): else: assert len(tasks) == 2 assert tasks[1].task_args["method"] == "collect" + + +@pytest.mark.integration +@pytest.mark.requires_jobmon +def test_stage_tasks_jobmon_args(simple_pipeline): + stage = simple_pipeline.stages["run_1"] + method = "run" + cluster = "cluster" + resources = {"tool_resources": {cluster: {"queue": "null.q"}}} + python = "/path/to/python/env/bin/python" + task_and_template_prefix = "testing" + max_attempts = 3 + entrypoint = str(Path(python).parent / "onemod") + config = str(stage.dataif.get_path("config")) + tasks = jb.get_stage_tasks( + stage, + method, + jb.get_tool(simple_pipeline.name, method, cluster, resources), + resources=resources, + python=python, + task_and_template_prefix=task_and_template_prefix, + max_attempts=max_attempts, + ) + task = tasks[0] + + assert len(tasks) == 1 + assert task.name == f"{task_and_template_prefix}_{stage.name}_{method}" + assert task.cluster_name == "" + assert task.compute_resources == {} + assert task.command == jb.get_command_template(method, []).format( + entrypoint=entrypoint, config=config, method=method, stages=stage.name + ) + assert task.max_attempts == max_attempts + assert task.op_args == {"entrypoint": entrypoint} + assert task.task_args == { + "config": config, + "method": method, + "stages": stage.name, + } + assert task.node.node_args == {} diff --git a/tests/integration/test_integration_pipeline_build.py b/tests/integration/test_integration_pipeline_build.py index ca2a8c30..8d5707cb 100644 --- a/tests/integration/test_integration_pipeline_build.py +++ b/tests/integration/test_integration_pipeline_build.py @@ -219,6 +219,7 @@ def test_pipeline_build_single_stage(test_base_dir, pipeline_with_single_stage): "id_columns": ["age_group_id", "location_id"], "model_type": "binomial", }, + "type": "Pipeline", "stages": { "stage_1": { "name": "stage_1", diff --git a/tests/unit/config/test_stage_config.py b/tests/unit/config/test_stage_config.py index 4ae3192a..1edbc42d 100644 --- a/tests/unit/config/test_stage_config.py +++ b/tests/unit/config/test_stage_config.py @@ -21,15 +21,11 @@ def stage_config(pipeline_config): return stage_config -@pytest.mark.parametrize("from_config", [True, False]) -def test_pipeline_config(pipeline_config, from_config): +def test_pipeline_config(pipeline_config): stage_config = StageConfig( stage_key="stage_value", shared_key="stage_shared_value" ) - if from_config: - stage_config.add_pipeline_config(pipeline_config) - else: # from dictionary - stage_config.add_pipeline_config(pipeline_config.model_dump()) + stage_config.add_pipeline_config(pipeline_config) assert isinstance(stage_config._pipeline_config, Config) assert stage_config._pipeline_config["pipeline_key"] == "pipeline_value" assert (