Skip to content

Add an argument to ipcluster plugin to specify the number of engines #547

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 35 additions & 16 deletions starcluster/plugins/ipcluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def _start_engines(node, user, n_engines=None, kill_existing=False):
n_engines = node.num_processors
node.ssh.switch_user(user)
if kill_existing:
node.ssh.execute("pkill -f ipengineapp", ignore_exit_status=True)
node.ssh.execute("pkill -f IPython.parallel.engine", ignore_exit_status=True)
node.ssh.execute("ipcluster engines --n=%i --daemonize" % n_engines)
node.ssh.switch_user('root')

Expand All @@ -91,7 +91,7 @@ class IPCluster(DefaultClusterSetup):

"""
def __init__(self, enable_notebook=False, notebook_passwd=None,
notebook_directory=None, packer=None, log_level='INFO'):
notebook_directory=None, packer=None, master_engines=None, node_engines=None, log_level='INFO'):
super(IPCluster, self).__init__()
if isinstance(enable_notebook, basestring):
self.enable_notebook = enable_notebook.lower().strip() == 'true'
Expand All @@ -100,6 +100,8 @@ def __init__(self, enable_notebook=False, notebook_passwd=None,
self.notebook_passwd = notebook_passwd or utils.generate_passwd(16)
self.notebook_directory = notebook_directory
self.log_level = log_level
self.master_engines = master_engines
self.node_engines = node_engines
if packer not in (None, 'json', 'pickle', 'msgpack'):
log.error("Unsupported packer: %s", packer)
self.packer = None
Expand Down Expand Up @@ -163,7 +165,11 @@ def _write_config(self, master, user, profile_dir):
f.close()

def _start_cluster(self, master, profile_dir):
n_engines = max(1, master.num_processors - 1)
if self.master_engines is None:
n_engines = max(1, master.num_processors - 1)
else:
n_engines = int(self.master_engines)
print "Setting master engines to '%s'" % self.master_engines
log.info("Starting the IPython controller and %i engines on master"
% n_engines)
# cleanup existing connection files, to prevent their use
Expand Down Expand Up @@ -215,7 +221,7 @@ def _start_cluster(self, master, profile_dir):
self._authorize_port(master, (1000, 65535), "IPython controller")
return local_json, n_engines

def _start_notebook(self, master, user, profile_dir):
def _start_notebook(self, master, user, profile_dir, time_to_dead=30.0):
log.info("Setting up IPython web notebook for user: %s" % user)
user_cert = posixpath.join(profile_dir, '%s.pem' % user)
ssl_cert = posixpath.join(profile_dir, '%s.pem' % user)
Expand All @@ -242,6 +248,7 @@ def _start_notebook(self, master, user, profile_dir):
"c.NotebookApp.open_browser = False",
"c.NotebookApp.password = u'%s'" % sha1pass,
"c.NotebookApp.port = %d" % notebook_port,
"c.NotebookApp.time_to_dead = %d" % time_to_dead,
]))
f.close()
if self.notebook_directory is not None:
Expand Down Expand Up @@ -288,12 +295,16 @@ def run(self, nodes, master, user, user_shell, volumes):
cfile, n_engines_master = self._start_cluster(master, profile_dir)
# Start engines on each of the non-master nodes
non_master_nodes = [node for node in nodes if not node.is_master()]
n_engines_non_master = 0
for node in non_master_nodes:
if self.node_engines is None:
n_engines = node.num_processors
else:
n_engines = int(self.node_engines)
self.pool.simple_job(
_start_engines, (node, user, node.num_processors),
_start_engines, (node, user, n_engines),
jobid=node.alias)
n_engines_non_master = sum(node.num_processors
for node in non_master_nodes)
n_engines_non_master += n_engines
if len(non_master_nodes) > 0:
log.info("Adding %d engines on %d nodes",
n_engines_non_master, len(non_master_nodes))
Expand All @@ -310,9 +321,12 @@ def run(self, nodes, master, user, user_shell, volumes):

def on_add_node(self, node, nodes, master, user, user_shell, volumes):
    """Start IPython engines on a freshly added cluster node.

    Uses the plugin-level ``node_engines`` setting when it was given in
    the plugin config; otherwise falls back to one engine per processor
    on the node. The engine count is passed explicitly to
    ``_start_engines`` (the pre-PR code dropped it and let the helper
    default to ``node.num_processors``).
    """
    self._check_ipython_installed(node)
    if self.node_engines is None:
        # Default: one engine per CPU on the new node.
        n_engines = node.num_processors
    else:
        # Config values arrive as strings; coerce to int once here.
        n_engines = int(self.node_engines)
    log.info("Adding %d engines on %s", n_engines, node.alias)
    _start_engines(node, user, n_engines)

def on_remove_node(self, node, nodes, master, user, user_shell, volumes):
    """Shrinking the IPython cluster is not supported by this plugin.

    Raises:
        NotImplementedError: always; engines on a departing node are
        not torn down here.
    """
    raise NotImplementedError("on_remove_node method not implemented")
Expand All @@ -332,7 +346,7 @@ def run(self, nodes, master, user, user_shell, volumes):
master.ssh.execute("ipcluster stop", ignore_exit_status=True)
time.sleep(2)
log.info("Stopping IPython controller on %s", master.alias)
master.ssh.execute("pkill -f ipcontrollerapp",
master.ssh.execute("pkill -f IPython.parallel.controller",
ignore_exit_status=True)
master.ssh.execute("pkill -f 'ipython notebook'",
ignore_exit_status=True)
Expand All @@ -344,7 +358,7 @@ def run(self, nodes, master, user, user_shell, volumes):

def _stop_engines(self, node, user):
    """Kill every IPython engine process running on *node* as *user*.

    The pkill pattern matches the engine's module path
    (``IPython.parallel.engine``); the older ``ipengineapp`` process
    name no longer matches how modern IPython launches its engines.
    Exit status is ignored because pkill returns non-zero when no
    matching process exists, which is a normal situation here.
    """
    node.ssh.switch_user(user)
    node.ssh.execute("pkill -f IPython.parallel.engine",
                     ignore_exit_status=True)
    # Restore the root session so later plugin steps run privileged.
    node.ssh.switch_user('root')

def on_add_node(self, node, nodes, master, user, user_shell, volumes):
Expand All @@ -354,7 +368,7 @@ def on_remove_node(self, node, nodes, master, user, user_shell, volumes):
raise NotImplementedError("on_remove_node method not implemented")


class IPClusterRestartEngines(DefaultClusterSetup):
class IPClusterRestartEngines(IPCluster):
"""Plugin to kill and restart all engines of an IPython cluster

This plugin can be useful to hard-reset the all the engines, for instance
Expand All @@ -364,14 +378,19 @@ class IPClusterRestartEngines(DefaultClusterSetup):
This plugin is meant to be run manually with:

starcluster runplugin plugin_conf_name cluster_name

"""
def run(self, nodes, master, user, user_shell, volumes):
n_total = 0
for node in nodes:
n_engines = node.num_processors
if node.is_master() and n_engines > 2:
n_engines -= 1
if node.is_master() and (self.master_engines is not None):
n_engines = int(self.master_engines)
elif self.node_engines is not None:
n_engines = int(self.node_engines)
elif node.is_master():
# and n_engines > 2: # XXX I'm not sure I understand this logic yet.
n_engines = node.num_processors - 1
else:
n_engines = node.num_processors
self.pool.simple_job(
_start_engines, (node, user, n_engines, True),
jobid=node.alias)
Expand Down