Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions payu/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,6 @@ def submit_job(script, config, vars=None):
sched_name = config.get('scheduler', DEFAULT_SCHEDULER_CONFIG)
sched_type = scheduler_index[sched_name]
sched = sched_type()
cmd = sched.submit(script, config, vars)
print(cmd)

subprocess.check_call(shlex.split(cmd))
job = sched.submit(script, config, vars)
print(job.id)
return job
3 changes: 3 additions & 0 deletions payu/schedulers/payu-submit.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/bin/bash
# Script to run payu command on a scheduled job, e.g. path/to/python path/to/payu-run
{{python_exe}} {{payu_exe}}
57 changes: 26 additions & 31 deletions payu/schedulers/pbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,36 +13,33 @@
import subprocess
from typing import Any, Dict, Optional

from tenacity import retry, stop_after_delay
from hpcpy import PBSClient

import payu.envmod as envmod
from payu.fsops import check_exe_path
from payu.manifest import Manifest
from payu.schedulers.scheduler import Scheduler

from tenacity import retry, stop_after_delay

from payu.schedulers.scheduler import Scheduler, JOB_SCRIPT_TEMPLATE

# TODO: This is a stub acting as a minimal port to a Scheduler class.
class PBS(Scheduler):
# TODO: __init__

def submit(self, pbs_script, pbs_config, pbs_vars=None, python_exe=None):
"""Prepare a correct PBS command string"""
def submit(self, pbs_script, pbs_config, pbs_vars=None, python_exe=None,
dry_run=False):
"""Submit a job using HPCpy PBS client"""

pbs_env_init()
# TODO: Is pbs_env_init() still required?
# pbs_env_init()

# Initialisation
if pbs_vars is None:
pbs_vars = {}

# Necessary for testing
if python_exe is None:
python_exe = sys.executable

pbs_flags = []

pbs_queue = pbs_config.get('queue', 'normal')
pbs_flags.append('-q {queue}'.format(queue=pbs_queue))

pbs_project = pbs_config.get('project', os.environ['PROJECT'])
pbs_flags.append('-P {project}'.format(project=pbs_project))

Expand Down Expand Up @@ -77,12 +74,6 @@ def submit(self, pbs_script, pbs_config, pbs_vars=None, python_exe=None):
else:
pbs_flags.append('-j {join}'.format(join=pbs_join))

# Append environment variables to qsub command
# TODO: Support full export of environment variables: `qsub -V`
pbs_vstring = ','.join('{0}={1}'.format(k, v)
for k, v in pbs_vars.items())
pbs_flags.append('-v ' + pbs_vstring)

storages = set()
storage_config = pbs_config.get('storage', {})
mounts = set(['/scratch', '/g/data'])
Expand Down Expand Up @@ -120,15 +111,13 @@ def submit(self, pbs_script, pbs_config, pbs_vars=None, python_exe=None):
storages.update(find_mounts(extra_search_paths, mounts))
storages.update(find_mounts(get_manifest_paths(), mounts))

# Add storage flags. Note that these are sorted to get predictable
# behaviour for testing
pbs_flags_extend = '+'.join(sorted(storages))
if pbs_flags_extend:
pbs_flags.append("-l storage={}".format(pbs_flags_extend))
# Sort the storages for testing
storages = sorted(list(storages))

# TODO: Is this still needed?
# Set up environment modules here for PBS.
envmod.setup()
envmod.module('load', 'pbs')
# envmod.setup()
# envmod.module('load', 'pbs')

# Check for custom container launcher script environment variable
launcher_script = os.environ.get('ENV_LAUNCHER_SCRIPT_PATH')
Expand All @@ -141,14 +130,20 @@ def submit(self, pbs_script, pbs_config, pbs_vars=None, python_exe=None):
# so the python executable is accessible in the container
python_exe = f'{launcher_script} {python_exe}'

# Construct job submission command
cmd = 'qsub {flags} -- {python} {script}'.format(
flags=' '.join(pbs_flags),
python=python_exe,
script=pbs_script
client = PBSClient()
job = client.submit(
dry_run=dry_run,
directives=pbs_flags,
queue=pbs_config.get("queue", "normal"),
variables=pbs_vars,
storage=storages,
job_script=JOB_SCRIPT_TEMPLATE,
render=True,
python_exe=python_exe,
payu_exe=pbs_script,
)
return job

return cmd

def get_job_id(self, short: bool = True) -> Optional[str]:
"""Get PBS job ID
Expand Down
4 changes: 3 additions & 1 deletion payu/schedulers/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
# TODO: This class is currently just a stub. I would hope that it will be
# expanded to provide greater functionality in the future.


from pathlib import Path
from typing import Any, Dict, Optional


JOB_SCRIPT_TEMPLATE = Path(__file__).parent / "payu-submit.sh"

class Scheduler(object):
"""Abstract scheduler class."""

Expand Down
24 changes: 14 additions & 10 deletions payu/schedulers/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,16 @@
import shlex
import subprocess

from payu.fsops import check_exe_path
from payu.schedulers.scheduler import Scheduler
from hpcpy import SlurmClient

from payu.fsops import check_exe_path
from payu.schedulers.scheduler import Scheduler, JOB_SCRIPT_TEMPLATE

class Slurm(Scheduler):
# TODO: __init__

def submit(self, pbs_script, pbs_config, pbs_vars=None, python_exe=None):
def submit(self, pbs_script, pbs_config, pbs_vars=None, python_exe=None,
storages=None, dry_run=False):
"""Prepare a correct PBS command string"""

if python_exe is None:
Expand All @@ -38,11 +40,13 @@ def submit(self, pbs_script, pbs_config, pbs_vars=None, python_exe=None):
pbs_flags.append('--qos=debug')
pbs_flags.append('--cluster=c4')

# Construct job submission command
cmd = 'sbatch {flags} --wrap="{python} {script}"'.format(
flags=' '.join(pbs_flags),
python=python_exe,
script=pbs_script
client = SlurmClient()
job = client.submit(
dry_run=dry_run,
directives=pbs_flags,
job_script=JOB_SCRIPT_TEMPLATE,
render=True,
python_exe=python_exe,
payu_exe=pbs_script,
)

return cmd
return job
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ dependencies = [
"ruamel.yaml >=0.18.5",
"packaging",
"netCDF4",
"hpcpy"
]

[project.optional-dependencies]
Expand Down
27 changes: 17 additions & 10 deletions test/test_pbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ def test_run():
config['modules'] = {}
config['modules']['use'] = ['/f/data/mm01', '/f/data/mm02/test/modules']

cmd = sched.submit(payu_cmd, config, pbs_vars, python_exe)
cmd = sched.submit(payu_cmd, config, pbs_vars, python_exe, dry_run=True)

print(cmd)

Expand All @@ -187,7 +187,7 @@ def test_run():
parser.add_argument('-P', type=str, required=True)
parser.add_argument('-N', type=str, required=True)
parser.add_argument('-v', metavar='KEY-VALUE',
nargs='+', required=True)
nargs=1, required=True) # Assuming one key-value pair
parser.add_argument('-j', type=str, required=True)
parser.add_argument('-l', metavar='KEY=VALUE',
nargs='+', action='append', required=True)
Expand Down Expand Up @@ -227,14 +227,19 @@ def test_run():

env = {}
for env_var in args.v:
k, v = env_var.split('=')
env[k] = v
if '=' in env_var:
k, v = env_var.split('=')
env[k] = v

assert('PAYU_PATH' in env)
assert(env['PAYU_PATH'] == str(payu_path))

assert(args.remaining[-2].endswith('python'))
assert(args.remaining[-1].endswith(payu_cmd))
script_path = args.remaining[-1]
with open(script_path, 'r') as f:
script_content = f.readlines()
assert script_content[0] == '#!/bin/bash\n'
assert script_content[-1].split()[-2].endswith('python')
assert script_content[-1].split()[-1].endswith(payu_cmd)


@patch("payu.schedulers.pbs.pbs_env_init", return_value=True)
Expand Down Expand Up @@ -272,10 +277,12 @@ def test_submit_launcher_script_setting(

# Generate the qsub command
pbs_cmd = pbs.PBS().submit("payu-run", config,
python_exe="/path/to/python")

_, cmd = pbs_cmd.split("--")
assert cmd.strip() == expected_cmd.format(tmp_path=tmp_path)
python_exe="/path/to/python", dry_run=True)
script_path = pbs_cmd.split()[-1]
with open(script_path, 'r') as f:
script_content = f.readlines()
assert script_content[0] == '#!/bin/bash\n'
assert script_content[-1] == expected_cmd.format(tmp_path=tmp_path)


def test_tenacity():
Expand Down
Loading