From 238af28ff55f8f63aa9a7fa23224f23257c6d1ca Mon Sep 17 00:00:00 2001 From: Andrew Tritt Date: Mon, 14 Dec 2020 15:09:24 -0500 Subject: [PATCH 01/11] adapt to work with PL 1.x --- src/exabiome/nn/loader.py | 48 +++++++++++++++++++++++++++++++++++ src/exabiome/nn/models/lit.py | 6 ++--- src/exabiome/nn/train.py | 34 +++++++++++++------------ src/exabiome/run/job.py | 4 +-- src/exabiome/run/run_job.py | 6 +++-- 5 files changed, 74 insertions(+), 24 deletions(-) diff --git a/src/exabiome/nn/loader.py b/src/exabiome/nn/loader.py index 477e9e2..4acf9e8 100644 --- a/src/exabiome/nn/loader.py +++ b/src/exabiome/nn/loader.py @@ -1,5 +1,8 @@ +import os + import torch.nn.functional as F import torch +import pytorch_lightning as pl import numpy as np from torch.utils.data import DataLoader, Dataset, SubsetRandomSampler from sklearn.model_selection import train_test_split @@ -19,6 +22,9 @@ def check_window(window, step): def read_dataset(path): + for root, dirs, files in os.walk("/mnt/bb/ajtritt/"): + for filename in files: + print(rank, '-', filename) hdmfio = get_hdf5io(path, 'r') difile = hdmfio.read() dataset = SeqDataset(difile) @@ -315,3 +321,45 @@ def get_loader(dataset, distances=False, **kwargs): collater = DistanceCollater(dataset.difile.distances.data[:]) return DataLoader(dataset, collate_fn=collater, **kwargs) + +class DIDataModule(pl.LightningDataModule): + + def __init__(self, hparams, inference=False): + self.hparams = hparams + self.inference = inference + + def train_dataloader(self): + self._check_loaders() + return self.loaders['train'] + + def val_dataloader(self): + self._check_loaders() + return self.loaders['validate'] + + def test_dataloader(self): + self._check_loaders() + return self.loaders['test'] + + + + def _check_loaders(self): + """ + Load dataset if it has not been loaded yet + """ + dataset, io = process_dataset(self.hparams, inference=self._inference) + if self.hparams.load: + dataset.load() + + kwargs = dict(random_state=self.hparams.seed, + batch_size=self.hparams.batch_size, + distances=self.hparams.manifold, + downsample=self.hparams.downsample) + kwargs.update(self.hparams.loader_kwargs) + if self._inference: + kwargs['distances'] = False + kwargs.pop('num_workers', None) + kwargs.pop('multiprocessing_context', None) + tr, te, va = train_test_loaders(dataset, **kwargs) + self.loaders = {'train': tr, 'test': te, 'validate': va} + self.dataset = dataset + diff --git a/src/exabiome/nn/models/lit.py b/src/exabiome/nn/models/lit.py index baaceff..db5d608 100644 --- a/src/exabiome/nn/models/lit.py +++ b/src/exabiome/nn/models/lit.py @@ -93,9 +93,6 @@ def training_step(self, batch, batch_idx): loss = self._loss(output, target) return {'loss': loss} - def training_epoch_end(self, outputs): - return {'log': outputs[0]} - # VALIDATION def val_dataloader(self): self._check_loaders() @@ -108,7 +105,8 @@ def validation_step(self, batch, batch_idx): def validation_epoch_end(self, outputs): val_loss_mean = torch.stack([x['val_loss'] for x in outputs]).mean() - return {'log': {'val_loss': val_loss_mean}} + #return {'log': {'val_loss': val_loss_mean}} + self.log('val_loss', val_loss_mean) # TEST def test_dataloader(self): diff --git a/src/exabiome/nn/train.py b/src/exabiome/nn/train.py index 2378d17..719af14 100644 --- a/src/exabiome/nn/train.py +++ b/src/exabiome/nn/train.py @@ -8,6 +8,12 @@ from ..utils import parse_seed, check_argv, parse_logger, check_directory from .utils import process_gpus, process_model, process_output from hdmf.utils import docval +from pytorch_lightning import Trainer, seed_everything +from pytorch_lightning.loggers import TensorBoardLogger +from pytorch_lightning.callbacks import ModelCheckpoint +import pytorch_lightning.cluster_environments as cenv + + import argparse import logging @@ -121,7 +127,6 @@ def process_args(args=None, return_io=False): args.loader_kwargs = dict() if args.lsf: args.loader_kwargs['num_workers'] = 6 - args.loader_kwargs['multiprocessing_context'] = 'spawn' model = process_model(args) @@ -182,19 +187,13 @@ def process_args(args=None, return_io=False): return tuple(ret) -from pytorch_lightning import Trainer, seed_everything -from pytorch_lightning.loggers import TensorBoardLogger -from pytorch_lightning.callbacks import ModelCheckpoint - - def run_lightning(argv=None): '''Run training with PyTorch Lightning''' - print(argv) model, args, addl_targs = process_args(parse_args(argv=argv)) outbase, output = process_output(args) check_directory(outbase) - print(args) + print("processed_args: ", args, file=sys.stderr) # save arguments with open(output('args.pkl'), 'wb') as f: @@ -206,21 +205,21 @@ def run_lightning(argv=None): # dependent on the dataset, such as final number of outputs targs = dict( - checkpoint_callback=ModelCheckpoint(filepath=output("seed=%d-{epoch:02d}-{val_loss:.2f}" % args.seed), save_weights_only=False, save_last=True, save_top_k=1), + checkpoint_callback=ModelCheckpoint(filepath=output("seed=%d-{epoch:02d}-{val_loss:.2f}" % args.seed), save_weights_only=False, save_last=True, save_top_k=None), #logger = TensorBoardLogger(save_dir=os.path.join(args.output, 'tb_logs'), name=args.experiment), logger = TensorBoardLogger(save_dir=os.path.join(args.output, 'tb_logs')), - row_log_interval=10, - log_save_interval=100 + log_every_n_steps=100 ) targs.update(addl_targs) + print('Trainer args:', targs, file=sys.stderr) trainer = Trainer(**targs) if args.debug: - print_dataloader(model.test_dataloader()) - print_dataloader(model.train_dataloader()) - print_dataloader(model.val_dataloader()) + print_dataloaders(test=model.test_dataloader(), + train=model.train_dataloader(), + val=model.val_dataloader()) s = datetime.now() trainer.fit(model) @@ -272,8 +271,11 @@ def cuda_sum(argv=None): print('torch.cuda.is_available:', torch.cuda.is_available()) print('torch.cuda.device_count:', torch.cuda.device_count()) -def print_dataloader(dl): - print(dl.dataset.index[0], dl.dataset.index[-1]) +def print_dataloaders(**loaders): + msg = list() + for k, v in loaders.items(): + msg.append("%s=(%s, %s)" % (k, v.dataset.index[0], v.dataset.index[-1])) + print(" ".join(msg), file=sys.stderr) def overall_metric(model, loader, metric): val = 0.0 diff --git a/src/exabiome/run/job.py b/src/exabiome/run/job.py index 14516e0..8afe036 100644 --- a/src/exabiome/run/job.py +++ b/src/exabiome/run/job.py @@ -161,9 +161,9 @@ def write(self, f, options=None): print(file=f) for k, v in self.env_vars.items(): if isinstance(v, str): - print(f'{k}="{v}"', file=f) + print(f'export {k}="{v}"', file=f) else: - print(f'{k}={v}', file=f) + print(f'export {k}={v}', file=f) print(file=f) for c in self.commands: #if isinstance(c, dict): diff --git a/src/exabiome/run/run_job.py b/src/exabiome/run/run_job.py index 6d27c69..3432fcb 100644 --- a/src/exabiome/run/run_job.py +++ b/src/exabiome/run/run_job.py @@ -146,7 +146,8 @@ def run_train(argv=None): if args.nodes > 1: job.set_env_var('OMP_NUM_THREADS', 1) - job.set_env_var('NCCL_DEBUG', 'INFO') + job.set_env_var('NCCL_SOCKET_IFNAME', 'ib0') + job.set_env_var('NCCL_DEBUG', 'WARN') job.set_env_var('OPTIONS', options) job.set_env_var('OUTDIR', f'{expdir}/train.$JOB') @@ -168,7 +169,7 @@ def run_train(argv=None): if args.summit and job.use_bb: job.add_command('echo "$INPUT to $BB_INPUT"') - job.add_command('cp $INPUT $BB_INPUT', run='jsrun -n 1') + job.add_command(f'cp $INPUT $BB_INPUT', run='jsrun -n {args.nodes}') job.add_command('ls /mnt/bb/$USER', run='jsrun -n 1') job.add_command('ls $BB_INPUT', run='jsrun -n 1') @@ -186,6 +187,7 @@ def run_train(argv=None): # when using regular DDP, jsrun should be called with one resource per node (-r) and # one rank per GPU (-a) to work with PyTorch Lightning jsrun = f'jsrun -g {args.gpus} -n {args.nodes} -a {args.gpus} -r 1 -c 42' + #jsrun = f'jsrun -g {args.gpus} -n {args.nodes} -a 1 -r 1 -c 42' job.add_command('$CMD > $LOG 2>&1', run=jsrun) else: job.add_command('$CMD > $LOG 2>&1', run='srun') From 27bb2e50e761cc2fd7f4270dd30bacb6c9e2bbfc Mon Sep 17 00:00:00 2001 From: Ryan Ly Date: Wed, 10 Feb 2021 18:19:06 -0800 Subject: [PATCH 02/11] Expand $CSCRATCH env variable on cori --- src/exabiome/run/run_job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/exabiome/run/run_job.py b/src/exabiome/run/run_job.py index 933a1c0..68f8d3f 100644 --- a/src/exabiome/run/run_job.py +++ b/src/exabiome/run/run_job.py @@ -27,7 +27,7 @@ def check_cori(args): if args.nodes is None: args.nodes = 1 if args.outdir is None: - args.outdir = os.path.abspath("$CSCRATCH/exabiome/deep-index") + args.outdir = os.path.abspath(os.path.expandvars("$CSCRATCH/exabiome/deep-index")) def run_train(argv=None): From 785a61defa3651c4d2d0815d9bc575a3bbdcd06f Mon Sep 17 00:00:00 2001 From: Ryan Ly Date: Wed, 10 Feb 2021 22:19:13 -0800 Subject: [PATCH 03/11] Add architecture attribute to SlurmJob --- src/exabiome/run/cori.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/exabiome/run/cori.py b/src/exabiome/run/cori.py index bb525b1..b80afef 100644 --- a/src/exabiome/run/cori.py +++ b/src/exabiome/run/cori.py @@ -14,6 +14,7 @@ class SlurmJob(AbstractJob): job_var = 'SLURM_JOB_ID' job_fmt_var = 'j' job_id_re = 'Submitted batch job (\d+)' + architecture = 'haswell' debug_queue = 'debug' From aa6541a2dd392263e01c3b601ddd2740fb7540d5 Mon Sep 17 00:00:00 2001 From: Ryan Ly Date: Wed, 10 Feb 2021 22:25:34 -0800 Subject: [PATCH 04/11] Update set architecture on SlurmJob --- src/exabiome/run/cori.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/exabiome/run/cori.py b/src/exabiome/run/cori.py index b80afef..15759dd 100644 --- a/src/exabiome/run/cori.py +++ b/src/exabiome/run/cori.py @@ -14,11 +14,10 @@ class SlurmJob(AbstractJob): job_var = 'SLURM_JOB_ID' job_fmt_var = 'j' job_id_re = 'Submitted batch job (\d+)' - architecture = 'haswell' debug_queue = 'debug' - def __init__(self, queue='batch', project='m2865', time='1:00:00', nodes=1, jobname=None, output=None, error=None): + def __init__(self, queue='batch', project='m2865', time='1:00:00', nodes=1, jobname=None, output=None, error=None, architecture='gpu'): super().__init__() self.queue = queue self.project = project @@ -29,7 +28,7 @@ def __init__(self, queue='batch', project='m2865', time='1:00:00', nodes=1, jobn self.output = f'{self.jobname}.%J' self.error = f'{self.jobname}.%J' - self.add_addl_jobflag('C', 'gpu') + self.add_addl_jobflag('C', architecture) def write_run(self, f, command, command_options, options): print(f'srun -u {command}', file=f) From d068cbf8d549c67d1c65eb32eab0a9703178869b Mon Sep 17 00:00:00 2001 From: Ryan Ly Date: Wed, 10 Feb 2021 22:29:19 -0800 Subject: [PATCH 05/11] Update run_job.py --- src/exabiome/run/run_job.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/exabiome/run/run_job.py b/src/exabiome/run/run_job.py index 68f8d3f..771d413 100644 --- a/src/exabiome/run/run_job.py +++ b/src/exabiome/run/run_job.py @@ -48,6 +48,7 @@ def run_train(argv=None): rsc_grp.add_argument('-N', '--jobname', help="the name of the job", default=None) rsc_grp.add_argument('-q', '--queue', help="the queue to submit to", default=None) rsc_grp.add_argument('-P', '--project', help="the project/account to submit under", default=None) + rsc_grp.add_argument('-a', '--arch', help="the architecture to use, e.g., gpu or haswell (cori only)", default='gpu') system_grp = parser.add_argument_group('Compute system') grp = system_grp.add_mutually_exclusive_group() @@ -88,7 +89,7 @@ def run_train(argv=None): job.set_use_bb(True) else: check_cori(args) - job = SlurmJob() + job = SlurmJob(args.arch) job.nodes = args.nodes job.time = args.time From d322f6e81e7bad62359504029c8330d2980b6bd2 Mon Sep 17 00:00:00 2001 From: Ryan Ly Date: Wed, 10 Feb 2021 22:30:02 -0800 Subject: [PATCH 06/11] Rename variable --- src/exabiome/run/cori.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/exabiome/run/cori.py b/src/exabiome/run/cori.py index 15759dd..ca2835d 100644 --- a/src/exabiome/run/cori.py +++ b/src/exabiome/run/cori.py @@ -17,7 +17,7 @@ class SlurmJob(AbstractJob): debug_queue = 'debug' - def __init__(self, queue='batch', project='m2865', time='1:00:00', nodes=1, jobname=None, output=None, error=None, architecture='gpu'): + def __init__(self, queue='batch', project='m2865', time='1:00:00', nodes=1, jobname=None, output=None, error=None, arch='gpu'): super().__init__() self.queue = queue self.project = project @@ -28,7 +28,7 @@ def __init__(self, queue='batch', project='m2865', time='1:00:00', nodes=1, jobn self.output = f'{self.jobname}.%J' self.error = f'{self.jobname}.%J' - self.add_addl_jobflag('C', architecture) + self.add_addl_jobflag('C', arch) def write_run(self, f, command, command_options, options): print(f'srun -u {command}', file=f) From 2d51ba0ea30af3963f1daac92c8fb4da4dd09a66 Mon Sep 17 00:00:00 2001 From: Ryan Ly Date: Wed, 10 Feb 2021 22:32:11 -0800 Subject: [PATCH 07/11] Update run_job.py --- src/exabiome/run/run_job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/exabiome/run/run_job.py b/src/exabiome/run/run_job.py index 771d413..3ff86cd 100644 --- a/src/exabiome/run/run_job.py +++ b/src/exabiome/run/run_job.py @@ -89,7 +89,7 @@ def run_train(argv=None): job.set_use_bb(True) else: check_cori(args) - job = SlurmJob(args.arch) + job = SlurmJob(arch=args.arch) job.nodes = args.nodes job.time = args.time From 6006f286d9e340559edd513b64f39c86635d9fba Mon Sep 17 00:00:00 2001 From: Ryan Ly Date: Wed, 10 Feb 2021 22:51:42 -0800 Subject: [PATCH 08/11] Set conda env for cori jobs --- src/exabiome/run/run_job.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/exabiome/run/run_job.py b/src/exabiome/run/run_job.py index 3ff86cd..8c48fc4 100644 --- a/src/exabiome/run/run_job.py +++ b/src/exabiome/run/run_job.py @@ -83,7 +83,6 @@ def run_train(argv=None): if args.summit: check_summit(args) job = LSFJob() - job.set_conda_env(args.conda_env) job.add_modules('open-ce') if not args.load: job.set_use_bb(True) @@ -91,6 +90,8 @@ def run_train(argv=None): check_cori(args) job = SlurmJob(arch=args.arch) + job.set_conda_env(args.conda_env) + job.nodes = args.nodes job.time = args.time job.gpus = args.gpus From 45d6da9710111aaaeef6769397df6b9feb520f2a Mon Sep 17 00:00:00 2001 From: Andrew Tritt Date: Thu, 11 Feb 2021 16:00:54 -0500 Subject: [PATCH 09/11] update PL commit --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 3b9c67e..06997dd 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,3 @@ hdmf==2.2.0 seaborn==0.11.0 -git+https://github.com/ajtritt/pytorch-lightning.git@99d2503373fe1b966cf7014c4ce7e7183766d48a +git+https://github.com/ajtritt/pytorch-lightning.git@fb30942d2c47a95531e063ed35a22f8fba25be12 From 907d361ba90f9e39718f02890a3733e9d2594266 Mon Sep 17 00:00:00 2001 From: Andrew Tritt Date: Thu, 11 Feb 2021 16:02:35 -0500 Subject: [PATCH 10/11] update summarize to work for classification results --- src/exabiome/nn/summarize.py | 217 +++++++++++++++++++++-------------- 1 file changed, 130 insertions(+), 87 deletions(-) diff --git a/src/exabiome/nn/summarize.py b/src/exabiome/nn/summarize.py index bc704b7..503a0f0 100644 --- a/src/exabiome/nn/summarize.py +++ b/src/exabiome/nn/summarize.py @@ -31,6 +31,9 @@ def read_outputs(path): if 'viz_emb' in f: ret['viz_emb'] = f['viz_emb'][:] ret['labels'] = f['labels'][:] + + # we won't have these three if we are looking + # at non-representatives if 'train' in f: ret['train_mask'] = f['train'][:] if 'test' in f: @@ -38,6 +41,7 @@ def read_outputs(path): ret['outputs'] = f['outputs'][:] if 'validate' in f: ret['validate_mask'] = f['validate'][:] + ret['orig_lens'] = f['orig_lens'][:] if 'seq_ids' in f: ret['seq_ids'] = f['seq_ids'][:] @@ -66,73 +70,78 @@ def plot_results(path, tvt=True, pred=True, fig_height=7, logger=None, name=None labels = path['labels'] outputs = path['outputs'] + viz_emb = None if 'viz_emb' in path: logger.info('found viz_emb') viz_emb = path['viz_emb'] + # else: + # logger.info('calculating UMAP embeddings for visualization') + # from umap import UMAP + # umap = UMAP(n_components=2) + # viz_emb = umap.fit_transform(outputs) else: - logger.info('calculating UMAP embeddings for visualization') - from umap import UMAP - umap = UMAP(n_components=2) - viz_emb = umap.fit_transform(outputs) + n_plots = 1 color_labels = getattr(pred, 'classes_', None) if color_labels is None: color_labels = labels class_pal = get_color_markers(len(np.unique(color_labels))) - colors = np.array([class_pal[i] for i in color_labels]) # set up figure fig_height = 7 plt.figure(figsize=(n_plots*fig_height, fig_height)) - logger.info('plotting embeddings with species labels') - # plot embeddings - ax = plt.subplot(1, n_plots, plot_count) - plot_seq_emb(viz_emb, labels, ax, pal=class_pal) - if name is not None: - plt.title(name) - plot_count += 1 - - # plot train/validation/testing data - train_mask = None - test_mask = None - validate_mask = None - if tvt: - logger.info('plotting embeddings train/validation/test labels') - train_mask = path['train_mask'] - test_mask = path['test_mask'] - validate_mask = path['validate_mask'] - pal = ['gray', 'red', 'yellow'] - plt.subplot(1, n_plots, plot_count) - dsubs = ['train', 'validation', 'test'] # data subsets - dsub_handles = list() - for (mask, dsub, col) in zip([train_mask, validate_mask, test_mask], dsubs, pal): - plt.scatter(viz_emb[mask, 0], viz_emb[mask, 1], s=0.1, c=[col], label=dsub) - dsub_handles.append(Circle(0, 0, color=col)) - plt.legend(dsub_handles, dsubs) + if viz_emb: + logger.info('plotting embeddings with species labels') + # plot embeddings + ax = plt.subplot(1, n_plots, plot_count) + plot_seq_emb(viz_emb, labels, ax, pal=class_pal) + if name is not None: + plt.title(name) plot_count += 1 + # plot train/validation/testing data + train_mask = None + test_mask = None + validate_mask = None + if tvt: + logger.info('plotting embeddings train/validation/test labels') + train_mask = path['train_mask'] + test_mask = path['test_mask'] + validate_mask = path['validate_mask'] + pal = ['gray', 'red', 'yellow'] + plt.subplot(1, n_plots, plot_count) + dsubs = ['train', 'validation', 'test'] # data subsets + dsub_handles = list() + for (mask, dsub, col) in zip([train_mask, validate_mask, test_mask], dsubs, pal): + plt.scatter(viz_emb[mask, 0], viz_emb[mask, 1], s=0.1, c=[col], label=dsub) + dsub_handles.append(Circle(0, 0, color=col)) + plt.legend(dsub_handles, dsubs) + plot_count += 1 + # run some predictions and plot report if pred is not False: - if pred is None or pred is True: - logger.info('No classifier given, using RandomForestClassifier(n_estimators=30)') - pred = RandomForestClassifier(n_estimators=30) - elif not (hasattr(pred, 'fit') and hasattr(pred, 'predict')): - raise ValueError("argument 'pred' must be a classifier with an SKLearn interface") - - X_test = outputs + y_pred = pred y_test = labels - if not hasattr(pred, 'classes_'): - train_mask = path['train_mask'] - test_mask = path['test_mask'] - X_train = outputs[train_mask] - y_train = labels[train_mask] - logger.info(f'training classifier {pred}') - pred.fit(X_train, y_train) - X_test = outputs[test_mask] - y_test = labels[test_mask] - logger.info(f'getting predictions') - y_pred = pred.predict(X_test) + if not isinstance(pred, (np.ndarray, list)): + if pred is None or pred is True: + logger.info('No classifier given, using RandomForestClassifier(n_estimators=30)') + pred = RandomForestClassifier(n_estimators=30) + elif not (hasattr(pred, 'fit') and hasattr(pred, 'predict')): + raise ValueError("argument 'pred' must be a classifier with an SKLearn interface") + + X_test = outputs + if not hasattr(pred, 'classes_'): + train_mask = path['train_mask'] + test_mask = path['test_mask'] + X_train = outputs[train_mask] + y_train = labels[train_mask] + logger.info(f'training classifier {pred}') + pred.fit(X_train, y_train) + X_test = outputs[test_mask] + y_test = labels[test_mask] + logger.info(f'getting predictions') + y_pred = pred.predict(X_test) logger.info(f'plotting classification report') # plot classification report @@ -156,15 +165,15 @@ def aggregated_chunk_analysis(path, clf, fig_height=7): viz_emb = None if 'viz_emb' in path: viz_emb = path['viz_emb'] - else: - viz_emb = UMAP(n_components=2).fit_transform(X) uniq_seqs = np.unique(seq_ids) X_mean = np.zeros((uniq_seqs.shape[0], outputs.shape[1])) X_median = np.zeros((uniq_seqs.shape[0], outputs.shape[1])) y = np.zeros(uniq_seqs.shape[0], dtype=int) seq_len = np.zeros(uniq_seqs.shape[0], dtype=int) - seq_viz = np.zeros((uniq_seqs.shape[0], 2)) + seq_viz = None + if viz_emb is not None: + seq_viz = np.zeros((uniq_seqs.shape[0], 2)) for seq_i, seq in enumerate(uniq_seqs): seq_mask = seq_ids == seq @@ -174,45 +183,61 @@ def aggregated_chunk_analysis(path, clf, fig_height=7): y[seq_i] = uniq_labels[0] X_mean[seq_i] = outputs[seq_mask].mean(axis=0) X_median[seq_i] = np.median(outputs[seq_mask], axis=0) - seq_viz[seq_i] = viz_emb[seq_mask].mean(axis=0) + if seq_viz is not None: + seq_viz[seq_i] = viz_emb[seq_mask].mean(axis=0) seq_len[seq_i] = olens[seq_mask].sum() seq_len = np.log10(seq_len) - color_labels = getattr(clf, 'classes_', None) - if color_labels is None: - color_labels = labels - class_pal = get_color_markers(len(np.unique(color_labels))) + fig, axes = None, None + figsize_factor = 7 + class_pal = None + if isinstance(clf, (list, np.ndarray)): + nrows = 2 + ncols = 1 + fig, axes = plt.subplots(nrows=nrows, ncols=ncols, figsize=(nrows*figsize_factor, ncols*figsize_factor)) + axes = np.expand_dims(axes, axis=1) + all_preds = np.argmax(outputs, axis=1) + class_pal = get_color_markers(outputs.shape[1]) + else: + color_labels = getattr(clf, 'classes_', None) + if color_labels is None: + color_labels = labels + class_pal = get_color_markers(len(np.unique(color_labels))) - fig, axes = plt.subplots(nrows=3, ncols=3, sharey='row', figsize=(21, 21)) + nrows = 3 if seq_viz is not None else 2 + ncols = 3 + fig, axes = plt.subplots(nrows=nrows, ncols=ncols, sharey='row', figsize=(nrows*figsize_factor, ncols*figsize_factor)) - # classifier from MEAN of outputs - output_mean_preds = clf.predict(X_mean) - make_plots(seq_viz, y, output_mean_preds, axes[:,0], class_pal, seq_len, 'Mean classification') + # classifier from MEAN of outputs + output_mean_preds = clf.predict(X_mean) + make_plots(seq_viz, y, output_mean_preds, axes[:,0], class_pal, seq_len, 'Mean classification') - # classifier from MEDIAN of outputs - output_median_preds = clf.predict(X_median) - make_plots(seq_viz, y, output_median_preds, axes[:,1], class_pal, seq_len, 'Median classification') + # classifier from MEDIAN of outputs + output_median_preds = clf.predict(X_median) + make_plots(seq_viz, y, output_median_preds, axes[:,1], class_pal, seq_len, 'Median classification') - # classifier from voting with chunk predictions - all_preds = clf.predict(outputs) - vote_preds = np.zeros_like(output_mean_preds) + # classifier from voting with chunk predictions + all_preds = clf.predict(outputs) + + vote_preds = np.zeros(X_mean.shape[0], dtype=int) for seq_i, seq in enumerate(uniq_seqs): seq_mask = seq_ids == seq vote_preds[seq_i] = stats.mode(all_preds[seq_mask])[0][0] - make_plots(seq_viz, y, vote_preds, axes[:,2], class_pal, seq_len, 'Vote classification') + make_plots(seq_viz, y, vote_preds, axes[:,-1], class_pal, seq_len, 'Vote classification') plt.tight_layout() def make_plots(seq_viz, true, pred, axes, pal, seq_len, title): - plot_seq_emb(seq_viz, pred, axes[0], pal=pal) - axes[0].set_title(title) - axes[0].set_xlabel('Mean of first UMAP dimension') - axes[0].set_ylabel('Mean of second UMAP dimension') - plot_clf_report(true, pred, axes[1], pal=pal) - plot_acc_of_len(true, pred, seq_len, axes[2]) + if seq_viz is not None: + plot_seq_emb(seq_viz, pred, axes[-3], pal=pal) + axes[0].set_title(title) + axes[0].set_xlabel('Mean of first UMAP dimension') + axes[0].set_ylabel('Mean of second UMAP dimension') + plot_clf_report(true, pred, axes[-2], pal=pal) + plot_acc_of_len(true, pred, seq_len, axes[-1]) def plot_acc_of_len(y_true, y_pred, seq_len, ax): @@ -269,6 +294,9 @@ def summarize(argv=None): parser.add_argument('-A', '--aggregate_chunks', action='store_true', default=False, help='aggregate chunks within sequences and perform analysis') parser.add_argument('-o', '--outdir', type=str, default=None, help='the output directory for figures') + type_group = parser.add_argument_group('Problem type').add_mutually_exclusive_group() + type_group.add_argument('-C', '--classify', action='store_true', help='run a classification problem', default=False) + type_group.add_argument('-M', '--manifold', action='store_true', help='run a manifold learning problem', default=False) args = parser.parse_args(args=argv) if os.path.isdir(args.input): @@ -287,23 +315,38 @@ def summarize(argv=None): fig_path = os.path.join(outdir, 'summary.png') logger = parse_logger('') - plt.figure(figsize=(21, 7)) - pretrained = False - if args.classifier is not None: - with open(args.classifier, 'rb') as f: - pred = pickle.load(f) - pretrained = True - else: - pred = RandomForestClassifier(n_estimators=30) outputs = read_outputs(args.input) - pred = plot_results(outputs, pred=pred, name='/'.join(args.input.split('/',)[-2:]), logger=logger) + if args.classify: + plt.figure(figsize=(7, 7)) + labels = outputs['labels'] + model_outputs = outputs['outputs'] + if 'test_mask' in outputs: + mask = outputs['test_mask'] + labels = labels[mask] + model_outputs = model_outputs[mask] + + pred = np.argmax(model_outputs, axis=1) + class_pal = get_color_markers(model_outputs.shape[1]) + colors = np.array([class_pal[i] for i in labels]) + ax = plt.gca() + plot_clf_report(labels, pred, ax=ax, pal=class_pal) + else: + plt.figure(figsize=(21, 7)) + pretrained = False + if args.classifier is not None: + with open(args.classifier, 'rb') as f: + pred = pickle.load(f) + pretrained = True + else: + pred = RandomForestClassifier(n_estimators=30) + pred = plot_results(outputs, pred=pred, name='/'.join(args.input.split('/',)[-2:]), logger=logger) + if not pretrained: + clf_path = os.path.join(outdir, 'summary.rf.pkl') + logger.info(f'saving classifier to {clf_path}') + with open(clf_path, 'wb') as f: + pickle.dump(pred, f) logger.info(f'saving figure to {fig_path}') plt.savefig(fig_path, dpi=100) - if not pretrained: - clf_path = os.path.join(outdir, 'summary.rf.pkl') - logger.info(f'saving classifier to {clf_path}') - with open(clf_path, 'wb') as f: - pickle.dump(pred, f) if args.aggregate_chunks: logger.info(f'running summary by aggregating chunks within sequences') From 242a731a2cd1585166230396412691b2b364fba6 Mon Sep 17 00:00:00 2001 From: Ryan Ly Date: Thu, 11 Feb 2021 14:21:30 -0800 Subject: [PATCH 11/11] add support for cpu accelerator --- src/exabiome/nn/train.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/exabiome/nn/train.py b/src/exabiome/nn/train.py index 719af14..2a25646 100644 --- a/src/exabiome/nn/train.py +++ b/src/exabiome/nn/train.py @@ -12,7 +12,7 @@ from pytorch_lightning.loggers import TensorBoardLogger from pytorch_lightning.callbacks import ModelCheckpoint import pytorch_lightning.cluster_environments as cenv - +from pytorch_lightning.accelerators.cpu_accelerator import CPUAccelerator import argparse @@ -152,7 +152,6 @@ def process_args(args=None, return_io=False): targs['gpus'] = process_gpus(args.gpus) targs['num_nodes'] = args.num_nodes if targs['gpus'] != 1 or targs['num_nodes'] > 1: - targs['accelerator'] = 'ddp' env = None if args.lsf: env = cenv.LSFEnvironment() @@ -163,6 +162,10 @@ def process_args(args=None, return_io=False): file=sys.stderr) sys.exit(1) targs.setdefault('plugins', list()).append(env) + if targs['gpus'] is not None: + targs['accelerator'] = 'ddp' + else: + targs['accelerator'] = CPUAccelerator(trainer=None, cluster_environment=env) del args.gpus if args.debug: