Skip to content

Commit 2804e9e

Browse files
authored
Merge pull request #9 from rokroskar/slurm
add support for SLURM
2 parents 83b498f + 984e640 commit 2804e9e

10 files changed

Lines changed: 390 additions & 195 deletions

File tree

.gitignore

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
*pyc
2+
*~
3+
*.log
4+
_build
5+
build
6+
.ipynb_checkpoints
7+
dist
8+
*egg-info
9+
job

README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,16 +84,16 @@ Job <31463649> is being terminated
8484
### Python code
8585

8686
```python
87-
from sparkhpc.sparkjob import LSFSparkJob
87+
from sparkhpc import sparkjob
8888
import findspark
8989
findspark.init() # this sets up the paths required to find spark libraries
9090
import pyspark
9191

92-
sj = LSFSparkJob(ncores=10)
92+
sj = sparkjob.sparkjob(ncores=10)
9393

9494
sj.wait_to_start()
9595

96-
sc = pyspark.SparkContext(master=sj.master_url)
96+
sc = sj.start_spark()
9797

9898
sc.parallelize(...)
9999
```
@@ -125,7 +125,7 @@ Currently only LSF is supported. However, adding support for other schedulers is
125125

126126
To implement support for a new scheduler you should subclass `SparkCluster`. You must define the following *class* variables:
127127

128-
* `_peek_command` (command to get stdout of current job)
128+
* `_peek()` (function to get stdout of the current job)
129129
* `_submit_command` (command to submit a job to the scheduler)
130130
* `_job_regex` (regex to get the job ID from return string of submit command)
131131
* `_kill_command` (scheduler command to kill a job)

scripts/hpcnotebook

Lines changed: 47 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,37 @@ import shutil
1111
import click
1212
import pkg_resources
1313

14+
import logging
15+
16+
logging.basicConfig(level=logging.INFO)
17+
logger = logging.getLogger('hpcnotebook')
18+
19+
# try to figure out which scheduler we have
20+
def which(program):
21+
import os
22+
def is_exe(fpath):
23+
return os.path.isfile(fpath) and os.access(fpath, os.X_OK)
24+
25+
fpath, fname = os.path.split(program)
26+
if fpath:
27+
if is_exe(program):
28+
return program
29+
else:
30+
for path in os.environ["PATH"].split(os.pathsep):
31+
path = path.strip('"')
32+
exe_file = os.path.join(path, program)
33+
if is_exe(exe_file):
34+
return exe_file
35+
36+
return None
37+
38+
if which('bjobs') is not None:
39+
SCHEDULER = 'LSF'
40+
HOST_ENVIRON = 'LSB_HOSTS'
41+
elif which('squeue') is not None:
42+
SCHEDULER = 'SLURM'
43+
HOST_ENVIRON = 'SLURM_NODELIST'
44+
1445
#
1546
# Initial inspiration for this script from https://github.com/felixcheung/vagrant-projects
1647
#
@@ -43,15 +74,6 @@ home = expanduser("~")
4374
# setup the path to the jupyter notebook configuration
4475
jupyter_config_path = "{home}/.jupyter_notebook".format(home=home)
4576

46-
def which(name):
47-
"""helper function to find whether a command exists"""
48-
found = 0
49-
for path in os.getenv("PATH").split(os.path.pathsep):
50-
full_path = path + os.sep + name
51-
if os.path.exists(full_path):
52-
found = full_path
53-
return found
54-
5577
@click.group()
5678
@click.option('--port', default=8889, help='Port for the notebook server')
5779
@click.pass_context
@@ -75,13 +97,16 @@ def setup(ctx, force):
7597
# get a password
7698
from notebook.auth import passwd
7799

78-
print(bc.WARNING + '[hpcnotebook] '+bc.ENDC+'This script will create a Jupyter notebook profile for working remotely')
79-
print(bc.WARNING + '[hpcnotebook] '+bc.ENDC+'When it is finished, you can find the configuration in %s\n'%(bc.UNDERLINE + jupyter_config_path + bc.ENDC))
80-
print(bc.WARNING + '[hpcnotebook] '+bc.ENDC+'First, we need a *new* password for your Jupyter notebook\n')
100+
logger.info("""
101+
This script will create a Jupyter notebook profile for working remotely.
102+
When it is finished, you can find the configuration in {conf_path}.
103+
First, please enter a *new* password for your Jupyter notebook:
104+
""".format(conf_path=bc.UNDERLINE + jupyter_config_path + bc.ENDC))
105+
81106
new_pass = passwd()
82107

83-
print(bc.WARNING + '[hpcnotebook] '+bc.ENDC+'Creating an SSL certificate to enable a secure connection; the certificate will be in your ~/.ssh directory\n')
84-
108+
logger.info("Creating an SSL certificate to enable a secure connection; the certificate will be in your ~/.ssh directory")
109+
85110
# make sure the .ssh directory is there before continuing
86111
sshdir = '{home}/.ssh'.format(home=home)
87112
if not os.path.exists(sshdir):
@@ -95,7 +120,7 @@ def setup(ctx, force):
95120

96121
lines = out.split('\n')
97122
for l in lines :
98-
print(bc.OKGREEN + '[openssl] ' + bc.ENDC + l)
123+
logger.info(bc.OKGREEN + '[openssl] ' + bc.ENDC + l)
99124

100125
# write the notebook config
101126

@@ -105,10 +130,10 @@ def setup(ctx, force):
105130
f.write(notebook_config_template.format(
106131
password=new_pass, certfile=certfile, port=port))
107132

108-
print(bc.WARNING + '[hpcnotebook] '+ bc.BOLD + 'Notebook setup complete' + bc.ENDC)
133+
logger.info(bc.BOLD + 'Notebook setup complete' + bc.ENDC)
109134

110135
else:
111-
print(bc.FAIL + "The jupyter notebook already looks set up; if you want to force setup, use --force".format(dir=jupyter_config_path) + bc.ENDC)
136+
logger.error(bc.FAIL + "The jupyter notebook already looks set up; if you want to force setup, use --force".format(dir=jupyter_config_path) + bc.ENDC)
112137

113138
@cli.command()
114139
@click.pass_context
@@ -127,11 +152,11 @@ def launch(ctx):
127152
conf_port = int(re.findall('port = (\d+)', conf.read())[0])
128153

129154
if conf_port != port:
130-
print(bc.WARNING + "Overriding the port found in the existing configuration" + bc.ENDC)
155+
logger.warning(bc.WARNING + "Overriding the port found in the existing configuration" + bc.ENDC)
131156
argv.append('--port={port}'.format(port=port))
132157

133158
# determine if we're running on a compute node
134-
if 'LSB_HOSTS' in os.environ:
159+
if HOST_ENVIRON in os.environ:
135160
compute = True
136161
else:
137162
compute = False
@@ -143,7 +168,7 @@ def launch(ctx):
143168
else:
144169
ip = 'localhost'
145170

146-
print(bc.BOLD + "To access the notebook, inspect the output below for the port number, then point your browser to https://{ip}:<port_number>".format(ip=ip) + bc.ENDC)
171+
logger.info(bc.BOLD + "To access the notebook, inspect the output below for the port number, then point your browser to https://{ip}:<port_number>".format(ip=ip) + bc.ENDC)
147172
sys.stdout.flush()
148173
launch_new_instance()
149174

@@ -156,10 +181,8 @@ def launch(ctx):
156181
@click.pass_context
157182
def submit(ctx, ncores, walltime, memory, template, jobname):
158183
"""Submit a notebook job to the scheduler"""
159-
if which('bsub'):
160-
scheduler='LSF'
161-
else:
162-
raise RuntimeWarning('bsub not found and no other schedulers are supported')
184+
if SCHEDULER != 'LSF':
185+
raise RuntimeError('only the LSF scheduler is supported at the moment')
163186

164187
if template is None:
165188
template_file = templates[scheduler]

scripts/sparkcluster

Lines changed: 35 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
from __future__ import print_function
99
import click
10+
import sparkhpc
1011
from sparkhpc import sparkjob
1112
import subprocess
1213
import os
@@ -17,30 +18,52 @@ logger = logging.getLogger('sparkhpc')
1718

1819
home = os.path.expanduser('~')
1920

21+
SCHEDULER = sparkjob.get_scheduler()
22+
2023
@click.group()
21-
@click.option('--scheduler', type=click.Choice(['lsf', 'slurm']), default='lsf', help='Which scheduler to use')
24+
@click.option('--scheduler', type=click.Choice(['lsf', 'slurm']), default=SCHEDULER, help='Which scheduler to use')
2225
@click.pass_context
2326
def cli(ctx, scheduler):
24-
ctx.obj['SJ'] = sparkjob._sparkjob_factory(scheduler)
27+
ctx.obj['SJ'] = sparkjob.sparkjob
2528

2629

2730
@cli.command()
28-
@click.argument('ncores')
31+
@click.argument('ncores', type=int)
2932
@click.option('--walltime', default="00:30", help="Walltime in HH:MM format")
3033
@click.option('--jobname', default='sparkcluster', help='Name to use for the job')
3134
@click.option('--template', default=None, help='Job template path')
32-
@click.option('--memory', default='2000', envvar='SPARK_WORKER_MEMORY',
33-
help='Memory for each worker in MB')
35+
@click.option('--memory-per-executor', default=2000, envvar='SPARK_EXECUTOR_MEMORY',
36+
help='Memory to reserve for each executor (i.e. the JVM) in MB')
37+
@click.option('--memory-per-core', default=2000,
38+
help='Memory per core to request from scheduler in MB')
39+
@click.option('--cores-per-executor', default=1,
40+
help='Cores per executor')
3441
@click.option('--spark-home', default=os.path.join(home,'spark'), envvar='SPARK_HOME',
3542
help='Location of the Spark distribution')
3643
@click.option('--wait', default=False, is_flag=True, help='Wait until the job starts')
3744
@click.pass_context
38-
def start(ctx, ncores, walltime, jobname, template, memory, spark_home, wait):
45+
def start(ctx,
46+
ncores,
47+
walltime,
48+
jobname,
49+
template,
50+
memory_per_executor,
51+
memory_per_core,
52+
cores_per_executor,
53+
spark_home,
54+
wait):
3955
"""Start the spark cluster as a batch job"""
4056

4157
SJ = ctx.obj['SJ']
4258

43-
sj = SJ(ncores=ncores, walltime=walltime, jobname=jobname, template=template, memory=memory, spark_home=spark_home)
59+
sj = SJ(ncores=ncores,
60+
walltime=walltime,
61+
jobname=jobname,
62+
template=template,
63+
memory_per_core=memory_per_core,
64+
memory_per_executor=memory_per_executor,
65+
cores_per_executor=cores_per_executor,
66+
spark_home=spark_home)
4467

4568
if wait:
4669
logger.info(' Waiting for job to start - ctrl-c to stop')
@@ -53,13 +76,8 @@ def start(ctx, ncores, walltime, jobname, template, memory, spark_home, wait):
5376
@click.pass_context
5477
def info(ctx):
5578
"""Get info about currently running clusters"""
56-
SJ = ctx.obj['SJ']
57-
sjs = SJ.current_clusters()
79+
sparkhpc.show_clusters()
5880

59-
if len(sjs)>0:
60-
sjs[0].show_clusters()
61-
else:
62-
logger.info(' No spark clusters running')
6381

6482
@cli.command()
6583
@click.argument('clusterid')
@@ -81,12 +99,13 @@ def stop(ctx, clusterid):
8199

82100

83101
@cli.command()
84-
@click.option('--memory', default='2000', help='Memory for each worker in MB')
102+
@click.option('--memory', default='2000M', help='Memory for each executor using a Java memory string')
85103
@click.option('--timeout', default=30, help='Timeout for starting spark master')
104+
@click.option('--cores-per-executor', default=1, help='Number of cores per executor')
86105
@click.pass_context
87-
def launch(ctx, memory, timeout):
106+
def launch(ctx, memory, timeout, cores_per_executor):
88107
"""Launch the Spark master and workers within a current job context"""
89-
sparkjob.start_cluster(memory,timeout)
108+
sparkjob.start_cluster(memory, timeout=timeout, cores_per_executor=cores_per_executor)
90109

91110
if __name__ == "__main__":
92111
cli(obj={})

sparkhpc/__init__.py

Lines changed: 7 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -1,67 +1,14 @@
11
from __future__ import print_function
22
import os, sys
33
import logging
4+
import sparkjob
5+
import lsfsparkjob
6+
from lsfsparkjob import LSFSparkJob
7+
from slurmsparkjob import SLURMSparkJob
8+
49

510
logging.basicConfig(level=logging.INFO)
611
logger = logging.getLogger(__name__)
712

8-
from . import sparkjob
9-
10-
try :
11-
def start_spark(master = 'local[*]',
12-
spark_conf='./spark_conf',
13-
executor_memory=None,
14-
profiling=False,
15-
graphframes_package='graphframes:graphframes:0.3.0-spark2.0-s_2.11',
16-
extra_conf = None):
17-
"""Launch a SparkContext
18-
19-
Inputs
20-
------
21-
22-
master : URL to spark master in the form 'spark://<master>:<port>'
23-
24-
spark_conf : path to a spark configuration directory
25-
26-
executor_memory : executor memory in java memory string format, e.g. '4G'
27-
28-
profiling: whether to turn on python profiling or not
29-
30-
graphframes_package : which graphframes to load
31-
"""
32-
33-
os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages {graphframes_package} pyspark-shell"\
34-
.format(graphframes_package=graphframes_package)
35-
36-
os.environ['SPARK_CONF_DIR'] = os.path.realpath(spark_conf)
37-
38-
os.environ['PYSPARK_PYTHON'] = sys.executable
39-
40-
from pyspark import SparkContext, SparkConf
41-
42-
conf = SparkConf()
43-
44-
conf.set('spark.driver.maxResultSize', '0')
45-
46-
if executor_memory is not None:
47-
conf.set('spark.executor.memory', executor_memory)
48-
if profiling:
49-
conf.set('spark.python.profile', 'true')
50-
else:
51-
conf.set('spark.python.profile', 'false')
52-
53-
if extra_conf is not None:
54-
for k,v in extra_conf.iteritems():
55-
conf.set(k,v)
56-
57-
sc = SparkContext(master=master, conf=conf)
58-
59-
return sc
60-
61-
def get_sqc(sc):
62-
from pyspark.sql import SQLContext
63-
64-
return SQLContext(sc)
65-
66-
except ImportError:
67-
logger.warning('Could not import pyspark -- make sure SPARK_HOME is set')
13+
def show_clusters():
14+
sparkjob.sparkjob().show_clusters()

sparkhpc/lsfsparkjob.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import os
2+
import time
3+
from .sparkjob import SparkJob
4+
import re
5+
import subprocess
6+
import logging
7+
8+
logging.basicConfig(level=logging.INFO)
9+
logger = logging.getLogger('sparkhpc.lsfsparkjob')
10+
11+
class LSFSparkJob(SparkJob):
12+
"""Class for submitting spark jobs with the LSF scheduler"""
13+
_submit_command = 'bsub < %s'
14+
_job_regex = 'Job <(\d+)>'
15+
_kill_command = 'bkill'
16+
_get_current_jobs = 'bjobs -o "job_name stat jobid"'
17+
18+
def _peek(self):
19+
return subprocess.check_output(["bpeek", str(self.jobid)])

0 commit comments

Comments
 (0)