diff --git a/payu/cli.py b/payu/cli.py index c2997219..b035c9d5 100644 --- a/payu/cli.py +++ b/payu/cli.py @@ -167,7 +167,6 @@ def submit_job(script, config, vars=None): sched_name = config.get('scheduler', DEFAULT_SCHEDULER_CONFIG) sched_type = scheduler_index[sched_name] sched = sched_type() - cmd = sched.submit(script, config, vars) - print(cmd) - - subprocess.check_call(shlex.split(cmd)) + job = sched.submit(script, config, vars) + print(job.id) + return job diff --git a/payu/schedulers/payu-submit.sh b/payu/schedulers/payu-submit.sh new file mode 100644 index 00000000..e7adae55 --- /dev/null +++ b/payu/schedulers/payu-submit.sh @@ -0,0 +1,3 @@ +#!/bin/bash +# Script to run payu command on a scheduled job, e.g. path/to/python path/to/payu-run +{{python_exe}} {{payu_exe}} \ No newline at end of file diff --git a/payu/schedulers/pbs.py b/payu/schedulers/pbs.py index adcee9b5..d0133b2e 100644 --- a/payu/schedulers/pbs.py +++ b/payu/schedulers/pbs.py @@ -13,36 +13,33 @@ import subprocess from typing import Any, Dict, Optional +from tenacity import retry, stop_after_delay +from hpcpy import PBSClient + import payu.envmod as envmod from payu.fsops import check_exe_path from payu.manifest import Manifest -from payu.schedulers.scheduler import Scheduler - -from tenacity import retry, stop_after_delay - +from payu.schedulers.scheduler import Scheduler, JOB_SCRIPT_TEMPLATE # TODO: This is a stub acting as a minimal port to a Scheduler class. class PBS(Scheduler): # TODO: __init__ - def submit(self, pbs_script, pbs_config, pbs_vars=None, python_exe=None): - """Prepare a correct PBS command string""" + def submit(self, pbs_script, pbs_config, pbs_vars=None, python_exe=None, + dry_run=False): + """Submit a job using HPCpy PBS client""" - pbs_env_init() + # TODO: Is pbs_env_init() still required? + # pbs_env_init() # Initialisation if pbs_vars is None: pbs_vars = {} - # Necessary for testing if python_exe is None: python_exe = sys.executable pbs_flags = [] - - pbs_queue = pbs_config.get('queue', 'normal') - pbs_flags.append('-q {queue}'.format(queue=pbs_queue)) - pbs_project = pbs_config.get('project', os.environ['PROJECT']) pbs_flags.append('-P {project}'.format(project=pbs_project)) @@ -77,12 +74,6 @@ def submit(self, pbs_script, pbs_config, pbs_vars=None, python_exe=None): else: pbs_flags.append('-j {join}'.format(join=pbs_join)) - # Append environment variables to qsub command - # TODO: Support full export of environment variables: `qsub -V` - pbs_vstring = ','.join('{0}={1}'.format(k, v) - for k, v in pbs_vars.items()) - pbs_flags.append('-v ' + pbs_vstring) - storages = set() storage_config = pbs_config.get('storage', {}) mounts = set(['/scratch', '/g/data']) @@ -120,15 +111,13 @@ def submit(self, pbs_script, pbs_config, pbs_vars=None, python_exe=None): storages.update(find_mounts(extra_search_paths, mounts)) storages.update(find_mounts(get_manifest_paths(), mounts)) - # Add storage flags. Note that these are sorted to get predictable - # behaviour for testing - pbs_flags_extend = '+'.join(sorted(storages)) - if pbs_flags_extend: - pbs_flags.append("-l storage={}".format(pbs_flags_extend)) + # Sort the storages for testing + storages = sorted(list(storages)) + # TODO: Is this still needed? # Set up environment modules here for PBS. - envmod.setup() - envmod.module('load', 'pbs') + # envmod.setup() + # envmod.module('load', 'pbs') # Check for custom container launcher script environment variable launcher_script = os.environ.get('ENV_LAUNCHER_SCRIPT_PATH') @@ -141,14 +130,20 @@ def submit(self, pbs_script, pbs_config, pbs_vars=None, python_exe=None): # so the python executable is accessible in the container python_exe = f'{launcher_script} {python_exe}' - # Construct job submission command - cmd = 'qsub {flags} -- {python} {script}'.format( - flags=' '.join(pbs_flags), - python=python_exe, - script=pbs_script + client = PBSClient() + job = client.submit( + dry_run=dry_run, + directives=pbs_flags, + queue=pbs_config.get("queue", "normal"), + variables=pbs_vars, + storage=storages, + job_script=JOB_SCRIPT_TEMPLATE, + render=True, + python_exe=python_exe, + payu_exe=pbs_script, ) + return job - return cmd def get_job_id(self, short: bool = True) -> Optional[str]: """Get PBS job ID diff --git a/payu/schedulers/scheduler.py b/payu/schedulers/scheduler.py index 83f0b205..82d86e24 100644 --- a/payu/schedulers/scheduler.py +++ b/payu/schedulers/scheduler.py @@ -7,10 +7,12 @@ # TODO: This class is currently just a stub. I would hope that it will be # expanded to provide greater functionality in the future. - +from pathlib import Path from typing import Any, Dict, Optional +JOB_SCRIPT_TEMPLATE = Path(__file__).parent / "payu-submit.sh" + class Scheduler(object): """Abstract scheduler class.""" diff --git a/payu/schedulers/slurm.py b/payu/schedulers/slurm.py index 608744ed..f64777ce 100644 --- a/payu/schedulers/slurm.py +++ b/payu/schedulers/slurm.py @@ -11,14 +11,16 @@ import shlex import subprocess -from payu.fsops import check_exe_path -from payu.schedulers.scheduler import Scheduler +from hpcpy import SlurmClient +from payu.fsops import check_exe_path +from payu.schedulers.scheduler import Scheduler, JOB_SCRIPT_TEMPLATE class Slurm(Scheduler): # TODO: __init__ - def submit(self, pbs_script, pbs_config, pbs_vars=None, python_exe=None): + def submit(self, pbs_script, pbs_config, pbs_vars=None, python_exe=None, + storages=None, dry_run=False): """Prepare a correct PBS command string""" if python_exe is None: @@ -38,11 +40,13 @@ def submit(self, pbs_script, pbs_config, pbs_vars=None, python_exe=None): pbs_flags.append('--qos=debug') pbs_flags.append('--cluster=c4') - # Construct job submission command - cmd = 'sbatch {flags} --wrap="{python} {script}"'.format( - flags=' '.join(pbs_flags), - python=python_exe, - script=pbs_script + client = SlurmClient() + job = client.submit( + dry_run=dry_run, + directives=pbs_flags, + job_script=JOB_SCRIPT_TEMPLATE, + render=True, + python_exe=python_exe, + payu_exe=pbs_script, ) - - return cmd + return job diff --git a/pyproject.toml b/pyproject.toml index 346a05c2..e20d97be 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,6 +33,7 @@ dependencies = [ "ruamel.yaml >=0.18.5", "packaging", "netCDF4", + "hpcpy" ] [project.optional-dependencies] diff --git a/test/test_pbs.py b/test/test_pbs.py index 2637b20e..2048f069 100644 --- a/test/test_pbs.py +++ b/test/test_pbs.py @@ -175,7 +175,7 @@ def test_run(): config['modules'] = {} config['modules']['use'] = ['/f/data/mm01', '/f/data/mm02/test/modules'] - cmd = sched.submit(payu_cmd, config, pbs_vars, python_exe) + cmd = sched.submit(payu_cmd, config, pbs_vars, python_exe, dry_run=True) print(cmd) @@ -187,7 +187,7 @@ def test_run(): parser.add_argument('-P', type=str, required=True) parser.add_argument('-N', type=str, required=True) parser.add_argument('-v', metavar='KEY-VALUE', - nargs='+', required=True) + nargs=1, required=True) # Assuming one key-value pair parser.add_argument('-j', type=str, required=True) parser.add_argument('-l', metavar='KEY=VALUE', nargs='+', action='append', required=True) @@ -227,14 +227,19 @@ def test_run(): env = {} for env_var in args.v: - k, v = env_var.split('=') - env[k] = v + if '=' in env_var: + k, v = env_var.split('=') + env[k] = v assert('PAYU_PATH' in env) assert(env['PAYU_PATH'] == str(payu_path)) - assert(args.remaining[-2].endswith('python')) - assert(args.remaining[-1].endswith(payu_cmd)) + script_path = args.remaining[-1] + with open(script_path, 'r') as f: + script_content = f.readlines() + assert script_content[0] == '#!/bin/bash\n' + assert script_content[-1].split()[-2].endswith('python') + assert script_content[-1].split()[-1].endswith(payu_cmd) @patch("payu.schedulers.pbs.pbs_env_init", return_value=True) @@ -272,10 +277,12 @@ def test_submit_launcher_script_setting( # Generate the qsub command pbs_cmd = pbs.PBS().submit("payu-run", config, - python_exe="/path/to/python") - - _, cmd = pbs_cmd.split("--") - assert cmd.strip() == expected_cmd.format(tmp_path=tmp_path) + python_exe="/path/to/python", dry_run=True) + script_path = pbs_cmd.split()[-1] + with open(script_path, 'r') as f: + script_content = f.readlines() + assert script_content[0] == '#!/bin/bash\n' + assert script_content[-1] == expected_cmd.format(tmp_path=tmp_path) def test_tenacity():