|
1 | | -# Writing custom runners |
| 1 | +# Writing Custom Runners |
| 2 | + |
| 3 | +While the official runners cover most common use cases, you might need to create your own custom runner for specific requirements - like running commands on a remote cluster, implementing custom logging, or handling specialized file systems. |
| 4 | + |
| 5 | +## Understanding the Runner Protocol |
| 6 | + |
| 7 | +Custom runners need to implement two main protocols: `Runner` and `Execution`. Think of the `Runner` as a factory that creates `Execution` objects, and each `Execution` handles a single command run. |
| 8 | + |
| 9 | +### The Runner Protocol |
| 10 | + |
| 11 | +Your runner class needs to implement just one method: |
| 12 | + |
| 13 | +```python |
| 14 | +def start_execution(self, metadata: Metadata) -> Execution: |
| 15 | + """Start an execution for a specific tool. |
| 16 | + |
| 17 | + Args: |
| 18 | + metadata: Information about the tool being executed (name, package, etc.) |
| 19 | + |
| 20 | + Returns: |
| 21 | + An Execution object that will handle the actual command execution |
| 22 | + """ |
| 23 | +``` |
| 24 | + |
| 25 | +### The Execution Protocol |
| 26 | + |
| 27 | +The `Execution` object does the heavy lifting with four key methods: |
| 28 | + |
| 29 | +```python |
| 30 | +def input_file(self, host_file: InputPathType, resolve_parent: bool = False, mutable: bool = False) -> str: |
| 31 | + """Handle input files - return where the command should find them""" |
| 32 | + |
| 33 | +def output_file(self, local_file: str, optional: bool = False) -> OutputPathType: |
| 34 | + """Handle output files - return where they'll be stored on the host""" |
| 35 | + |
| 36 | +def params(self, params: dict) -> dict: |
| 37 | + """Process or modify command parameters if needed""" |
| 38 | + |
| 39 | +def run(self, cargs: list[str], handle_stdout=None, handle_stderr=None) -> None: |
| 40 | + """Actually execute the command""" |
| 41 | +``` |
| 42 | + |
| 43 | +## Example: Simple Custom Runner |
| 44 | + |
| 45 | +Let's build a custom runner that adds some logging and stores outputs in timestamped directories: |
| 46 | + |
| 47 | +```python |
| 48 | +import pathlib |
| 49 | +import subprocess |
| 50 | +import logging |
| 51 | +from datetime import datetime |
| 52 | +from styxdefs import Runner, Execution, Metadata, InputPathType, OutputPathType |
| 53 | + |
| 54 | +class TimestampedExecution(Execution): |
| 55 | + def __init__(self, output_dir: pathlib.Path, metadata: Metadata): |
| 56 | + self.output_dir = output_dir |
| 57 | + self.metadata = metadata |
| 58 | + self.logger = logging.getLogger(f"custom_runner.{metadata.name}") |
| 59 | + |
| 60 | + # Create output directory |
| 61 | + self.output_dir.mkdir(parents=True, exist_ok=True) |
| 62 | + |
| 63 | + def input_file(self, host_file: InputPathType, resolve_parent: bool = False, mutable: bool = False) -> str: |
| 64 | + # For simplicity, just return the absolute path |
| 65 | + return str(pathlib.Path(host_file).absolute()) |
| 66 | + |
| 67 | + def output_file(self, local_file: str, optional: bool = False) -> OutputPathType: |
| 68 | + return self.output_dir / local_file |
| 69 | + |
| 70 | + def params(self, params: dict) -> dict: |
| 71 | + # Log parameters for debugging |
| 72 | + self.logger.info(f"Running {self.metadata.name} with params: {params}") |
| 73 | + return params |
| 74 | + |
| 75 | + def run(self, cargs: list[str], handle_stdout=None, handle_stderr=None) -> None: |
| 76 | + self.logger.info(f"Executing: {' '.join(cargs)}") |
| 77 | + |
| 78 | + # Run the command |
| 79 | + result = subprocess.run( |
| 80 | + cargs, |
| 81 | + cwd=self.output_dir, |
| 82 | + capture_output=True, |
| 83 | + text=True |
| 84 | + ) |
| 85 | + |
| 86 | + # Handle output |
| 87 | + if handle_stdout and result.stdout: |
| 88 | + for line in result.stdout.splitlines(): |
| 89 | + handle_stdout(line) |
| 90 | + if handle_stderr and result.stderr: |
| 91 | + for line in result.stderr.splitlines(): |
| 92 | + handle_stderr(line) |
| 93 | + |
| 94 | + if result.returncode != 0: |
| 95 | + raise RuntimeError(f"Command failed with return code {result.returncode}") |
| 96 | + |
| 97 | +class TimestampedRunner(Runner): |
| 98 | + def __init__(self, base_dir: str = "custom_outputs"): |
| 99 | + self.base_dir = pathlib.Path(base_dir) |
| 100 | + self.execution_counter = 0 |
| 101 | + |
| 102 | + def start_execution(self, metadata: Metadata) -> Execution: |
| 103 | + # Create timestamped directory |
| 104 | + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
| 105 | + output_dir = self.base_dir / f"{timestamp}_{self.execution_counter}_{metadata.name}" |
| 106 | + self.execution_counter += 1 |
| 107 | + |
| 108 | + return TimestampedExecution(output_dir, metadata) |
| 109 | + |
| 110 | +# Usage |
| 111 | +from styxdefs import set_global_runner |
| 112 | + |
| 113 | +custom_runner = TimestampedRunner(base_dir="my_custom_outputs") |
| 114 | +set_global_runner(custom_runner) |
| 115 | +``` |
| 116 | + |
| 117 | +## Learning from Official Runners |
| 118 | + |
| 119 | +The best way to understand custom runners is to look at how the official ones work: |
| 120 | + |
| 121 | +### LocalRunner Pattern |
| 122 | +The `LocalRunner` is the simplest - it just runs commands directly on the host system. Notice how it: |
| 123 | +- Creates unique output directories using a hash and counter |
| 124 | +- Uses `subprocess.Popen` with threading for real-time output handling |
| 125 | +- Handles file paths by converting them to absolute paths |
| 126 | + |
| 127 | +### DockerRunner Pattern |
| 128 | +The `DockerRunner` shows more complex file handling: |
| 129 | +- Maps host files to container paths using Docker mounts |
| 130 | +- Creates a run script inside the container |
| 131 | +- Handles the complexity of translating between host and container file systems |
| 132 | + |
| 133 | +## Advanced Patterns |
| 134 | + |
| 135 | +### Middleware Runners |
| 136 | +You can create runners that wrap other runners to add functionality: |
| 137 | + |
| 138 | +```python |
| 139 | +class LoggingRunner(Runner): |
| 140 | + def __init__(self, wrapped_runner: Runner): |
| 141 | + self.wrapped_runner = wrapped_runner |
| 142 | + self.logger = logging.getLogger("logging_runner") |
| 143 | + |
| 144 | + def start_execution(self, metadata: Metadata) -> Execution: |
| 145 | + self.logger.info(f"Starting execution of {metadata.name}") |
| 146 | + execution = self.wrapped_runner.start_execution(metadata) |
| 147 | + return LoggingExecution(execution, self.logger) |
| 148 | + |
| 149 | +class LoggingExecution(Execution): |
| 150 | + def __init__(self, wrapped_execution: Execution, logger): |
| 151 | + self.wrapped = wrapped_execution |
| 152 | + self.logger = logger |
| 153 | + |
| 154 | + def run(self, cargs: list[str], handle_stdout=None, handle_stderr=None) -> None: |
| 155 | + self.logger.info(f"Running command: {' '.join(cargs)}") |
| 156 | + start_time = datetime.now() |
| 157 | + |
| 158 | + try: |
| 159 | + self.wrapped.run(cargs, handle_stdout, handle_stderr) |
| 160 | + duration = datetime.now() - start_time |
| 161 | + self.logger.info(f"Command completed in {duration}") |
| 162 | + except Exception as e: |
| 163 | + self.logger.error(f"Command failed: {e}") |
| 164 | + raise |
| 165 | + |
| 166 | + # Forward other methods to wrapped execution |
| 167 | + def input_file(self, *args, **kwargs): return self.wrapped.input_file(*args, **kwargs) |
| 168 | + def output_file(self, *args, **kwargs): return self.wrapped.output_file(*args, **kwargs) |
| 169 | + def params(self, *args, **kwargs): return self.wrapped.params(*args, **kwargs) |
| 170 | +``` |
| 171 | + |
| 172 | +### Remote Execution |
| 173 | +For cluster or remote execution, you might implement SSH-based runners, SLURM job submission, or cloud-based execution. |
| 174 | + |
| 175 | +## Tips for Custom Runners |
| 176 | + |
| 177 | +**File Handling**: Think carefully about where files live and how paths get translated. Input files need to be accessible to your execution environment, and output files need to end up where the user expects them. |
| 178 | + |
| 179 | +**Error Handling**: Always check return codes and provide meaningful error messages. Consider implementing custom exception types like `StyxDockerError` for better debugging. |
| 180 | + |
| 181 | +**Logging**: Good logging makes debugging much easier. Use structured logging with appropriate levels. |
| 182 | + |
| 183 | +**Threading**: If you need real-time output handling (like showing progress), consider using threading like the official runners do. |
| 184 | + |
| 185 | +**Testing**: Test your runner with various tools and edge cases. The `DryRunner` pattern is useful for testing command generation without actual execution. |
| 186 | + |
| 187 | +> [!TIP] |
| 188 | +> Start simple! Create a basic working runner first, then add advanced features like custom file handling or remote execution once you have the basics working. |
| 189 | +
|
| 190 | +> [!TIP] |
| 191 | +> Look at the source code of official runners for inspiration. They handle many edge cases you might not think of initially. |
0 commit comments