|
| 1 | +from dataclasses import dataclass, field |
| 2 | +from typing import List, Generator, Optional |
| 3 | +from snakemake_interface_executor_plugins.executors.base import SubmittedJobInfo |
| 4 | +from snakemake_interface_executor_plugins.executors.remote import RemoteExecutor |
| 5 | +from snakemake_interface_executor_plugins.settings import ( |
| 6 | + ExecutorSettingsBase, |
| 7 | + CommonSettings, |
| 8 | +) |
| 9 | +from snakemake_interface_executor_plugins.jobs import ( |
| 10 | + JobExecutorInterface, |
| 11 | +) |
| 12 | +from snakemake_interface_common.exceptions import WorkflowError # noqa |
| 13 | + |
| 14 | + |
| 15 | +# Optional: |
| 16 | +# Define additional settings for your executor. |
| 17 | +# They will occur in the Snakemake CLI as --<executor-name>-<param-name> |
| 18 | +# Omit this class if you don't need any. |
| 19 | +# Make sure that all defined fields are Optional and specify a default value |
| 20 | +# of None or anything else that makes sense in your case. |
| 21 | +@dataclass |
| 22 | +class ExecutorSettings(ExecutorSettingsBase): |
| 23 | + myparam: Optional[int] = field( |
| 24 | + default=None, |
| 25 | + metadata={ |
| 26 | + "help": "Some help text", |
| 27 | + # Optionally request that setting is also available for specification |
| 28 | + # via an environment variable. The variable will be named automatically as |
| 29 | + # SNAKEMAKE_<executor-name>_<param-name>, all upper case. |
| 30 | + # This mechanism should only be used for passwords and usernames. |
| 31 | + # For other items, we rather recommend to let people use a profile |
| 32 | + # for setting defaults |
| 33 | + # (https://snakemake.readthedocs.io/en/stable/executing/cli.html#profiles). |
| 34 | + "env_var": False, |
| 35 | + # Optionally specify a function that parses the value given by the user. |
| 36 | + # This is useful to create complex types from the user input. |
| 37 | + "parse_func": ..., |
| 38 | + # If a parse_func is specified, you also have to specify an unparse_func |
| 39 | + # that converts the parsed value back to a string. |
| 40 | + "unparse_func": ..., |
| 41 | + # Optionally specify that setting is required when the executor is in use. |
| 42 | + "required": True, |
| 43 | + }, |
| 44 | + ) |
| 45 | + |
| 46 | + |
| 47 | +# Required: |
| 48 | +# Specify common settings shared by various executors. |
| 49 | +common_settings = CommonSettings( |
| 50 | + # define whether your executor plugin executes locally |
| 51 | + # or remotely. In virtually all cases, it will be remote execution |
| 52 | + # (cluster, cloud, etc.). Only Snakemake's standard execution |
| 53 | + # plugins (snakemake-executor-plugin-dryrun, snakemake-executor-plugin-local) |
| 54 | + # are expected to specify False here. |
| 55 | + non_local_exec=True, |
| 56 | + # Whether the executor implies to not have a shared file system |
| 57 | + implies_no_shared_fs=True, |
| 58 | + # whether to deploy workflow sources to default storage provider before execution |
| 59 | + job_deploy_sources=True, |
| 60 | + # whether arguments for setting the storage provider shall be passed to jobs |
| 61 | + pass_default_storage_provider_args=True, |
| 62 | + # whether arguments for setting default resources shall be passed to jobs |
| 63 | + pass_default_resources_args=True, |
| 64 | + # whether environment variables shall be passed to jobs (if False, use |
| 65 | + # self.envvars() to obtain a dict of environment variables and their values |
| 66 | + # and pass them e.g. as secrets to the execution backend) |
| 67 | + pass_envvar_declarations_to_cmd=True, |
| 68 | + # whether the default storage provider shall be deployed before the job is run on |
| 69 | + # the remote node. Usually set to True if the executor does not assume a shared fs |
| 70 | + auto_deploy_default_storage_provider=True, |
| 71 | + # specify initial amount of seconds to sleep before checking for job status |
| 72 | + init_seconds_before_status_checks=0, |
| 73 | +) |
| 74 | + |
| 75 | + |
| 76 | +# Required: |
| 77 | +# Implementation of your executor |
| 78 | +class Executor(RemoteExecutor): |
| 79 | + def __post_init__(self): |
| 80 | + # access workflow |
| 81 | + self.workflow |
| 82 | + # access executor specific settings |
| 83 | + self.workflow.executor_settings |
| 84 | + |
| 85 | + # IMPORTANT: in your plugin, only access methods and properties of |
| 86 | + # Snakemake objects (like Workflow, Persistence, etc.) that are |
| 87 | + # defined in the interfaces found in the |
| 88 | + # snakemake-interface-executor-plugins and the |
| 89 | + # snakemake-interface-common package. |
| 90 | + # Other parts of those objects are NOT guaranteed to remain |
| 91 | + # stable across new releases. |
| 92 | + |
| 93 | + # To ensure that the used interfaces are not changing, you should |
| 94 | + # depend on these packages as >=a.b.c,<d with d=a+1 (i.e. pin the |
| 95 | + # dependency on this package to be at least the version at time |
| 96 | + # of development and less than the next major version which would |
| 97 | + # introduce breaking changes). |
| 98 | + |
| 99 | + # In case of errors outside of jobs, please raise a WorkflowError |
| 100 | + |
| 101 | + def run_job(self, job: JobExecutorInterface): |
| 102 | + # Implement here how to run a job. |
| 103 | + # You can access the job's resources, etc. |
| 104 | + # via the job object. |
| 105 | + # After submitting the job, you have to call |
| 106 | + # self.report_job_submission(job_info). |
| 107 | + # with job_info being of type |
| 108 | + # snakemake_interface_executor_plugins.executors.base.SubmittedJobInfo. |
| 109 | + # If required, make sure to pass the job's id to the job_info object, as keyword |
| 110 | + # argument 'external_job_id'. |
| 111 | + |
| 112 | + ... |
| 113 | + |
| 114 | + async def check_active_jobs( |
| 115 | + self, active_jobs: List[SubmittedJobInfo] |
| 116 | + ) -> Generator[SubmittedJobInfo, None, None]: |
| 117 | + # Check the status of active jobs. |
| 118 | + |
| 119 | + # You have to iterate over the given list active_jobs. |
| 120 | + # If you provided it above, each will have its external_jobid set according |
| 121 | + # to the information you provided at submission time. |
| 122 | + # For jobs that have finished successfully, you have to call |
| 123 | + # self.report_job_success(active_job). |
| 124 | + # For jobs that have errored, you have to call |
| 125 | + # self.report_job_error(active_job). |
| 126 | + # This will also take care of providing a proper error message. |
| 127 | + # Usually there is no need to perform additional logging here. |
| 128 | + # Jobs that are still running have to be yielded. |
| 129 | + # |
| 130 | + # For queries to the remote middleware, please use |
| 131 | + # self.status_rate_limiter like this: |
| 132 | + # |
| 133 | + # async with self.status_rate_limiter: |
| 134 | + # # query remote middleware here |
| 135 | + # |
| 136 | + # To modify the time until the next call of this method, |
| 137 | + # you can set self.next_sleep_seconds here. |
| 138 | + ... |
| 139 | + |
| 140 | + def cancel_jobs(self, active_jobs: List[SubmittedJobInfo]): |
| 141 | + # Cancel all active jobs. |
| 142 | + # This method is called when Snakemake is interrupted. |
| 143 | + ... |
0 commit comments