Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions joshua/joshua.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def get_username():
def format_ensemble(e, props):
return " %-50s %s" % (
e,
" ".join("{}={}".format(k, v) for k, v in sorted(props.items())),
" ".join(f"{k}={v}" for k, v in sorted(props.items())),
)


Expand Down Expand Up @@ -75,7 +75,7 @@ def list_active_ensembles(
for props in joshua_model.show_in_progress(e):
print(
"\t{}".format(
" ".join("{}={}".format(k, v) for k, v in sorted(props.items()))
" ".join(f"{k}={v}" for k, v in sorted(props.items()))
)
)

Expand Down Expand Up @@ -116,11 +116,11 @@ def start_ensemble(
properties["timeout"] = timeout

if fail_fast > 0 and not no_fail_fast:
print("Note: Ensemble will complete after {} failed results.".format(fail_fast))
print(f"Note: Ensemble will complete after {fail_fast} failed results.")
properties["fail_fast"] = fail_fast

if max_runs > 0 and not no_max_runs:
print("Note: Ensemble will complete after {} runs.".format(max_runs))
print(f"Note: Ensemble will complete after {max_runs} runs.")
properties["max_runs"] = max_runs

if command:
Expand Down Expand Up @@ -304,7 +304,7 @@ def _delete_helper(to_delete, yes=False, dryrun=False, sanity=False):

if not yes and not dryrun:
response = input("Do you want to delete these ensembles [y/n]? ")
if response.strip().lower() not in set(["y", "yes"]):
if response.strip().lower() not in {"y", "yes"}:
print("Negative response received. Not performing deletion.")
return

Expand Down Expand Up @@ -381,10 +381,10 @@ def download_ensemble(ensemble, out=None, force=False, sanity=False, **args):

if out is None:
out_file = os.path.abspath(
os.path.join(os.getcwd(), "{}.tar.gz".format(ensemble))
os.path.join(os.getcwd(), f"{ensemble}.tar.gz")
)
elif os.path.isdir(out):
out_file = os.path.abspath(os.path.join(out, "{}.tar.gz".format(ensemble)))
out_file = os.path.abspath(os.path.join(out, f"{ensemble}.tar.gz"))
else:
out_file = os.path.abspath(out)
if not os.path.isdir(os.path.dirname(out_file)):
Expand All @@ -401,15 +401,15 @@ def download_ensemble(ensemble, out=None, force=False, sanity=False, **args):
return

if not force and os.path.isfile(out_file):
print("File {} already exists. Refusing to overwrite file.".format(out_file))
print(f"File {out_file} already exists. Refusing to overwrite file.")
return

# Treat slash characters as subdirectories
ensemble_dir = os.path.dirname(ensemble)
if ensemble_dir:
os.mkdir(ensemble_dir)

print("Downloading ensemble {} into {}...".format(ensemble, out_file))
print(f"Downloading ensemble {ensemble} into {out_file}...")
with open(out_file, "wb") as fout:
joshua_model.get_ensemble_data(ensemble_id=ensemble, outfile=fout)
print("Download completed")
Expand Down
20 changes: 10 additions & 10 deletions joshua/joshua_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def __repr__(self):


# This is used to handle waiting for a given amount of time.
class TimeoutFuture(object):
class TimeoutFuture:
def __init__(self, timeout):
self.cb_list = []
self.timer = threading.Timer(timeout, self._do_on_ready)
Expand Down Expand Up @@ -263,7 +263,7 @@ def tar_artifacts(ensemble, seed, sources, dest, work_dir=None):
)
try:
# Create a temporary directory in the destination where we will store the results.
out_name = "joshua-run-{0}-{1}".format(ensemble, seed)
out_name = f"joshua-run-{ensemble}-{seed}"
tmpdir = os.path.join(work_dir, out_name)
os.makedirs(tmpdir)

Expand Down Expand Up @@ -413,7 +413,7 @@ def _cancelled(self):
def run(self, command, cwd, env):
cmd_path = os.path.join(cwd, command[0])
if not os.path.exists(cmd_path):
log("{} doesn't exist".format(cmd_path))
log(f"{cmd_path} doesn't exist")
return
process = subprocess.Popen(
command,
Expand Down Expand Up @@ -528,7 +528,7 @@ def _run_ensemble(
k8s_namespace_file = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
if env.get('HOSTNAME', False) and os.path.isfile(k8s_namespace_file):
pod_name = env['HOSTNAME']
with open(k8s_namespace_file, 'r') as f:
with open(k8s_namespace_file) as f:
namespace = f.read()

# Get the cluster config
Expand All @@ -554,7 +554,7 @@ def _run_ensemble(
# Set environment variable to use the created temporary directory as its temporary directory.
env["TMP"] = os.path.join(where, "tmp")

log("{} {} {}".format(ensemble, seed, command))
log(f"{ensemble} {seed} {command}")

# Run the test and log output
process = subprocess.Popen(
Expand All @@ -577,7 +577,7 @@ def _run_ensemble(
try:
output, _ = process.communicate(timeout=1)
retcode = process.poll()
log("exit code: {}".format(retcode))
log(f"exit code: {retcode}")
# output = output.decode('utf-8')

break
Expand Down Expand Up @@ -642,7 +642,7 @@ def _run_ensemble(
try:
i = 0
while os.path.exists(to_write):
to_write = os.path.join(where, "tmp", "console-{0}.log".format(i))
to_write = os.path.join(where, "tmp", f"console-{i}.log")
i += 1

with open(to_write, "wb") as fout:
Expand Down Expand Up @@ -808,7 +808,7 @@ def agent(
while True:
# Break if the stop file is defined and present
if stop_file and os.path.exists(stop_file):
log("Exiting due to existing stopfile: {}".format(stop_file))
log(f"Exiting due to existing stopfile: {stop_file}")
break
# Break if requested
if stopAgent():
Expand Down Expand Up @@ -885,7 +885,7 @@ def agent(
# Throw away local state for ensembles that are no longer active
local_ensemble_dirs = set(os.listdir(ensemble_dir(basepath=work_dir)))
for e in (local_ensemble_dirs - set(ensembles)) - set(sanity_ensembles):
log("removing {} {}".format(e, ensemble_dir(e, basepath=work_dir)))
log(f"removing {e} {ensemble_dir(e, basepath=work_dir)}")
shutil.rmtree(
ensemble_dir(e, basepath=work_dir), True
) # SOMEDAY: this sometimes throws errors, but we don't know why and it isn't that important
Expand All @@ -906,7 +906,7 @@ def agent(
try:
watch.wait_for_any(watch, sanity_watch, TimeoutFuture(1.0))
except Exception as e:
log("watch error: {}".format(e))
log(f"watch error: {e}")
watch = None
time.sleep(1.0)

Expand Down
22 changes: 11 additions & 11 deletions joshua/joshua_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ def is_message(text):

def unwrap_message(text):
root = ET.fromstring(text)
return root.getchildren()[0].attrib
return next(iter(root)).attrib


def load_datetime(string):
Expand Down Expand Up @@ -224,7 +224,7 @@ def identify_existing_ensembles(tr, ensembles):


@transactional
def list_and_watch_active_ensembles(tr) -> Tuple[List[str], fdb.Future]:
def list_and_watch_active_ensembles(tr) -> tuple[list[str], fdb.Future]:
return _list_and_watch_ensembles(tr, dir_active, dir_active_changes)


Expand Down Expand Up @@ -258,7 +258,7 @@ def _unpack_property(ensemble, key, value, into):
into[t[1]] = struct.unpack("<Q", value)[0]


def _list_ensembles(tr, dir) -> List[Tuple[str, Dict]]:
def _list_ensembles(tr, dir) -> list[tuple[str, dict]]:
prop_reads = []
for k, v in tr[dir.range()]:
(ensemble,) = dir.unpack(k)
Expand All @@ -281,17 +281,17 @@ def _list_ensembles(tr, dir) -> List[Tuple[str, Dict]]:


@transactional
def list_active_ensembles(tr) -> List[Tuple[str, Dict]]:
def list_active_ensembles(tr) -> list[tuple[str, dict]]:
return _list_ensembles(tr, dir_active)


@transactional
def list_sanity_ensembles(tr) -> List[Tuple[str, Dict]]:
def list_sanity_ensembles(tr) -> list[tuple[str, dict]]:
return _list_ensembles(tr, dir_sanity)


def list_all_ensembles() -> List[Tuple[str, Dict]]:
ensembles: List[Tuple[str, Dict]] = []
def list_all_ensembles() -> list[tuple[str, dict]]:
ensembles: list[tuple[str, dict]] = []
r = dir_all_ensembles.range()
start = r.start
tr = db.create_transaction()
Expand Down Expand Up @@ -405,7 +405,7 @@ def _create_ensemble(tr, ensemble_id, properties, sanity=False):
dir, changes = get_dir_changes(sanity)

if tr[dir_all_ensembles[ensemble_id]] != None:
print("{} already inserted".format(ensemble_id))
print(f"{ensemble_id} already inserted")
return # Already inserted
tr[dir_all_ensembles[ensemble_id]] = b""
for k, v in properties.items():
Expand Down Expand Up @@ -458,7 +458,7 @@ def stop_user_ensembles(username, sanity=False):

def get_active_ensembles(
stopped, sanity=False, username=None
) -> List[Tuple[str, Dict]]:
) -> list[tuple[str, dict]]:
if stopped:
ensemble_list = list_all_ensembles()
elif sanity:
Expand Down Expand Up @@ -655,7 +655,7 @@ def _get_snap_counter(tr: fdb.Transaction, ensemble_id: str, counter: str) -> in

def _get_seeds_and_heartbeats(
ensemble_id: str, tr: fdb.Transaction
) -> List[Tuple[int, float]]:
) -> list[tuple[int, float]]:
result = []
for k, v in tr.snapshot[dir_ensemble_incomplete[ensemble_id]["heartbeat"].range()]:
(seed,) = dir_ensemble_incomplete[ensemble_id]["heartbeat"].unpack(k)
Expand Down Expand Up @@ -718,7 +718,7 @@ def should_run_ensemble(tr: fdb.Transaction, ensemble_id: str) -> bool:


@transactional
def show_in_progress(tr: fdb.Transaction, ensemble_id: str) -> List[Tuple[int, Dict]]:
def show_in_progress(tr: fdb.Transaction, ensemble_id: str) -> list[tuple[int, dict]]:
"""
Returns a list of properties for in progress tests
"""
Expand Down
12 changes: 6 additions & 6 deletions joshua/process_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def get_environment(pid):
var_strs,
)
)
except IOError:
except OSError:
# This is not our process, so we can't open the file.
return dict()

Expand Down Expand Up @@ -238,7 +238,7 @@ def test_check_alive(self):

def test_mark_env(self):
env = mark_environment(dict())
self.assertEquals(os.getpid(), int(env[VAR_NAME]))
self.assertEqual(os.getpid(), int(env[VAR_NAME]))

def test_get_all_pids(self):
if sys.platform != "linux2":
Expand Down Expand Up @@ -268,9 +268,9 @@ def test_get_environment(self):
# Make sure the environment for this process is the same
# as we know it to be.
env = get_environment(str(os.getpid()))
self.assertEquals(env, os.environ)
self.assertEqual(env, os.environ)
env = get_environment(os.getpid())
self.assertEquals(env, os.environ)
self.assertEqual(env, os.environ)

def test_retrieve_children(self):
if sys.platform != "linux2":
Expand All @@ -285,7 +285,7 @@ def test_retrieve_children(self):
stderr=subprocess.PIPE,
)
pids = retrieve_children()
self.assertEquals(len(pids), 10)
self.assertEqual(len(pids), 10)

def test_kill_all_children(self):
if sys.platform != "linux2":
Expand All @@ -300,7 +300,7 @@ def test_kill_all_children(self):
stderr=subprocess.PIPE,
)
self.assertTrue(kill_all_children())
self.assertEquals(len(retrieve_children()), 0)
self.assertEqual(len(retrieve_children()), 0)

def test_wait_for_death(self):
process = subprocess.Popen(
Expand Down
6 changes: 3 additions & 3 deletions joshua_done_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from typing import List

@fdb.transactional
def write_ensemble_data(tr, path: List[str]):
def write_ensemble_data(tr, path: list[str]):
root_dir = fdb.directory.create_or_open(tr, tuple(path))
num = os.getenv('JOSHUA_SEED', None)
has_joshua_seed = num is not None
Expand All @@ -28,8 +28,8 @@ def write_ensemble_data(tr, path: List[str]):
parser.add_argument('ensemble_dir')
parser.add_argument('cluster_file')
args = parser.parse_args()
dirPath: List[str] = args.ensemble_dir.split(',')
dirPath: list[str] = args.ensemble_dir.split(',')
print('dirPath = ({})'.format(",".join(dirPath)))
print('clusterFile = {}'.format(args.cluster_file))
print(f'clusterFile = {args.cluster_file}')
db = fdb.open(args.cluster_file if args.cluster_file != 'None' else None)
write_ensemble_data(db, dirPath)
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
[bdist_wheel]
python-tag = py38
python-tag = py39
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
+ module.platforms
+ [
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: Implementation :: CPython",
],
)
6 changes: 3 additions & 3 deletions test_joshua_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def with_script(tmp_path, script_contents):
return factory

def add_bash_script(self, name, contents):
script = "#!/bin/bash\n{}".format(contents).encode("utf-8")
script = f"#!/bin/bash\n{contents}".encode("utf-8")
entry = tarfile.TarInfo(name)
entry.mode = 0o755
entry.size = len(script)
Expand Down Expand Up @@ -119,9 +119,9 @@ def fdb_cluster():
port = getFreePort()
cluster_file = os.path.join(tmp_dir, "fdb.cluster")
with open(cluster_file, "w") as f:
f.write("abdcefg:[email protected]:{}".format(port))
f.write(f"abdcefg:[email protected]:{port}")
proc = subprocess.Popen(
["fdbserver", "-p", "auto:{}".format(port), "-C", cluster_file], cwd=tmp_dir
["fdbserver", "-p", f"auto:{port}", "-C", cluster_file], cwd=tmp_dir
)

subprocess.check_output(
Expand Down
2 changes: 1 addition & 1 deletion webapp/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
load_dotenv(os.path.join(basedir, '.env'))


class Config(object):
class Config:
SECRET_KEY = os.environ.get('SECRET_KEY') or 'you-will-never-guess'
SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL') or \
'sqlite:///' + os.path.join(basedir, 'db.sqlite')
Expand Down
10 changes: 5 additions & 5 deletions webapp/joshua_webapp/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ def upload_ensemble():
try:
properties = schema.load(request_data)
except Exception as err:
app.logger.info('api_upload: Validation error: {}'.format(err))
app.logger.info(f'api_upload: Validation error: {err}')
return {
"message": 'api_upload: Post field validation error: {}'.format(err)
"message": f'api_upload: Post field validation error: {err}'
}, 422
fileobj = request_files['file']
filename = secure_filename(fileobj.filename)
Expand All @@ -102,7 +102,7 @@ def upload_ensemble():
if not tarfile.is_tarfile(saved_file):
os.remove(saved_file)
app.logger.info(
'api_upload: not a valid tar file: {}'.format(saved_file))
f'api_upload: not a valid tar file: {saved_file}')
return {"error": 'api_upload: not a valid tar file'}, 400

# convert to non-unicode string for username
Expand All @@ -128,7 +128,7 @@ def stop_ensemble(ensemble):
properties = joshua_model.get_ensemble_properties(ensemble)
sanity = properties[
'sanity'] if properties and 'sanity' in properties else True
app.logger.info('Stop ensemble {} {}'.format(ensemble, sanity))
app.logger.info(f'Stop ensemble {ensemble} {sanity}')
joshua_model.stop_ensemble(ensemble, sanity=sanity)
return jsonify('OK'), 200

Expand All @@ -139,6 +139,6 @@ def resume_ensemble(ensemble):
properties = joshua_model.get_ensemble_properties(ensemble)
sanity = properties[
'sanity'] if properties and 'sanity' in properties else True
app.logger.info('Resume ensemble {} {}'.format(ensemble, sanity))
app.logger.info(f'Resume ensemble {ensemble} {sanity}')
joshua_model.resume_ensemble(ensemble, sanity=sanity)
return jsonify('OK'), 200
Loading