diff --git a/requirements.txt b/requirements.txt index e0a21c4..2bd1074 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,3 @@ git+https://github.com/hdmf-dev/hdmf.git@0efea00ff1d10c4021c8145e917d92566f7edb4c seaborn==0.11.0 -git+https://github.com/ajtritt/pytorch-lightning.git@99d2503373fe1b966cf7014c4ce7e7183766d48a -torch==1.6.0 +git+https://github.com/ajtritt/pytorch-lightning.git@fb30942d2c47a95531e063ed35a22f8fba25be12 diff --git a/src/exabiome/nn/loader.py b/src/exabiome/nn/loader.py index 50c6bd5..ad7a42c 100644 --- a/src/exabiome/nn/loader.py +++ b/src/exabiome/nn/loader.py @@ -1,3 +1,4 @@ +import os import pytorch_lightning as pl import torch.nn.functional as F import torch @@ -94,6 +95,9 @@ def dataset_stats(argv=None): def read_dataset(path): + for root, dirs, files in os.walk("/mnt/bb/ajtritt/"): + for filename in files: + print(filename) hdmfio = get_hdf5io(path, 'r') difile = hdmfio.read() dataset = SeqDataset(difile) @@ -392,6 +396,49 @@ def get_loader(dataset, distances=False, **kwargs): return DataLoader(dataset, collate_fn=collater, **kwargs) +<<<<<<< HEAD +class DIDataModule(pl.LightningDataModule): + + def __init__(self, hparams, inference=False): + self.hparams = hparams + self._inference = inference + + def train_dataloader(self): + self._check_loaders() + return self.loaders['train'] + + def val_dataloader(self): + self._check_loaders() + return self.loaders['validate'] + + def test_dataloader(self): + self._check_loaders() + return self.loaders['test'] + + + + def _check_loaders(self): + """ + Load dataset if it has not been loaded yet + """ + dataset, io = process_dataset(self.hparams, inference=self._inference) + if self.hparams.load: + dataset.load() + + kwargs = dict(random_state=self.hparams.seed, + batch_size=self.hparams.batch_size, + distances=self.hparams.manifold, + downsample=self.hparams.downsample) + kwargs.update(self.hparams.loader_kwargs) + if self._inference: + kwargs['distances'] = False + 
kwargs.pop('num_workers', None) + kwargs.pop('multiprocessing_context', None) + tr, te, va = train_test_loaders(dataset, **kwargs) + self.loaders = {'train': tr, 'test': te, 'validate': va} + self.dataset = dataset + +======= class DeepIndexDataModule(pl.LightningDataModule): def __init__(self, hparams, inference=False): @@ -419,3 +466,4 @@ def val_dataloader(self): def test_dataloader(self): return self.loaders['test'] +>>>>>>> master diff --git a/src/exabiome/nn/summarize.py b/src/exabiome/nn/summarize.py index f25ba09..6c72b65 100644 --- a/src/exabiome/nn/summarize.py +++ b/src/exabiome/nn/summarize.py @@ -31,6 +31,9 @@ def read_outputs(path): if 'viz_emb' in f: ret['viz_emb'] = f['viz_emb'][:] ret['labels'] = f['labels'][:] + + # we won't have these three if we are looking + # at non-representatives if 'train' in f: ret['train_mask'] = f['train'][:] if 'test' in f: @@ -38,6 +41,7 @@ def read_outputs(path): ret['outputs'] = f['outputs'][:] if 'validate' in f: ret['validate_mask'] = f['validate'][:] + ret['orig_lens'] = f['orig_lens'][:] if 'seq_ids' in f: ret['seq_ids'] = f['seq_ids'][:] @@ -66,73 +70,78 @@ def plot_results(path, tvt=True, pred=True, fig_height=7, logger=None, name=None labels = path['labels'] outputs = path['outputs'] + viz_emb = None if 'viz_emb' in path: logger.info('found viz_emb') viz_emb = path['viz_emb'] + # else: + # logger.info('calculating UMAP embeddings for visualization') + # from umap import UMAP + # umap = UMAP(n_components=2) + # viz_emb = umap.fit_transform(outputs) else: - logger.info('calculating UMAP embeddings for visualization') - from umap import UMAP - umap = UMAP(n_components=2) - viz_emb = umap.fit_transform(outputs) + n_plots = 1 color_labels = getattr(pred, 'classes_', None) if color_labels is None: color_labels = labels class_pal = get_color_markers(len(np.unique(color_labels))) - colors = np.array([class_pal[i] for i in color_labels]) # set up figure fig_height = 7 plt.figure(figsize=(n_plots*fig_height, 
fig_height)) - logger.info('plotting embeddings with species labels') - # plot embeddings - ax = plt.subplot(1, n_plots, plot_count) - plot_seq_emb(viz_emb, labels, ax, pal=class_pal) - if name is not None: - plt.title(name) - plot_count += 1 - - # plot train/validation/testing data - train_mask = None - test_mask = None - validate_mask = None - if tvt: - logger.info('plotting embeddings train/validation/test labels') - train_mask = path['train_mask'] - test_mask = path['test_mask'] - validate_mask = path['validate_mask'] - pal = ['gray', 'red', 'yellow'] - plt.subplot(1, n_plots, plot_count) - dsubs = ['train', 'validation', 'test'] # data subsets - dsub_handles = list() - for (mask, dsub, col) in zip([train_mask, validate_mask, test_mask], dsubs, pal): - plt.scatter(viz_emb[mask, 0], viz_emb[mask, 1], s=0.1, c=[col], label=dsub) - dsub_handles.append(Circle(0, 0, color=col)) - plt.legend(dsub_handles, dsubs) + if viz_emb is not None: + logger.info('plotting embeddings with species labels') + # plot embeddings + ax = plt.subplot(1, n_plots, plot_count) + plot_seq_emb(viz_emb, labels, ax, pal=class_pal) + if name is not None: + plt.title(name) plot_count += 1 + # plot train/validation/testing data + train_mask = None + test_mask = None + validate_mask = None + if tvt: + logger.info('plotting embeddings train/validation/test labels') + train_mask = path['train_mask'] + test_mask = path['test_mask'] + validate_mask = path['validate_mask'] + pal = ['gray', 'red', 'yellow'] + plt.subplot(1, n_plots, plot_count) + dsubs = ['train', 'validation', 'test'] # data subsets + dsub_handles = list() + for (mask, dsub, col) in zip([train_mask, validate_mask, test_mask], dsubs, pal): + plt.scatter(viz_emb[mask, 0], viz_emb[mask, 1], s=0.1, c=[col], label=dsub) + dsub_handles.append(Circle(0, 0, color=col)) + plt.legend(dsub_handles, dsubs) + plot_count += 1 + # run some predictions and plot report if pred is not False: - if pred is None or pred is True: - logger.info('No classifier given, 
using RandomForestClassifier(n_estimators=30)') - pred = RandomForestClassifier(n_estimators=30) - elif not (hasattr(pred, 'fit') and hasattr(pred, 'predict')): - raise ValueError("argument 'pred' must be a classifier with an SKLearn interface") - - X_test = outputs + y_pred = pred y_test = labels - if not hasattr(pred, 'classes_'): - train_mask = path['train_mask'] - test_mask = path['test_mask'] - X_train = outputs[train_mask] - y_train = labels[train_mask] - logger.info(f'training classifier {pred}') - pred.fit(X_train, y_train) - X_test = outputs[test_mask] - y_test = labels[test_mask] - logger.info(f'getting predictions') - y_pred = pred.predict(X_test) + if not isinstance(pred, (np.ndarray, list)): + if pred is None or pred is True: + logger.info('No classifier given, using RandomForestClassifier(n_estimators=30)') + pred = RandomForestClassifier(n_estimators=30) + elif not (hasattr(pred, 'fit') and hasattr(pred, 'predict')): + raise ValueError("argument 'pred' must be a classifier with an SKLearn interface") + + X_test = outputs + if not hasattr(pred, 'classes_'): + train_mask = path['train_mask'] + test_mask = path['test_mask'] + X_train = outputs[train_mask] + y_train = labels[train_mask] + logger.info(f'training classifier {pred}') + pred.fit(X_train, y_train) + X_test = outputs[test_mask] + y_test = labels[test_mask] + logger.info(f'getting predictions') + y_pred = pred.predict(X_test) logger.info(f'plotting classification report') # plot classification report @@ -156,15 +165,15 @@ def aggregated_chunk_analysis(path, clf, fig_height=7): viz_emb = None if 'viz_emb' in path: viz_emb = path['viz_emb'] - else: - viz_emb = UMAP(n_components=2).fit_transform(X) uniq_seqs = np.unique(seq_ids) X_mean = np.zeros((uniq_seqs.shape[0], outputs.shape[1])) X_median = np.zeros((uniq_seqs.shape[0], outputs.shape[1])) y = np.zeros(uniq_seqs.shape[0], dtype=int) seq_len = np.zeros(uniq_seqs.shape[0], dtype=int) - seq_viz = np.zeros((uniq_seqs.shape[0], 2)) + seq_viz = 
None + if viz_emb is not None: + seq_viz = np.zeros((uniq_seqs.shape[0], 2)) for seq_i, seq in enumerate(uniq_seqs): seq_mask = seq_ids == seq @@ -174,17 +183,31 @@ def aggregated_chunk_analysis(path, clf, fig_height=7): y[seq_i] = uniq_labels[0] X_mean[seq_i] = outputs[seq_mask].mean(axis=0) X_median[seq_i] = np.median(outputs[seq_mask], axis=0) - seq_viz[seq_i] = viz_emb[seq_mask].mean(axis=0) + if seq_viz is not None: + seq_viz[seq_i] = viz_emb[seq_mask].mean(axis=0) seq_len[seq_i] = olens[seq_mask].sum() seq_len = np.log10(seq_len) - color_labels = getattr(clf, 'classes_', None) - if color_labels is None: - color_labels = labels - class_pal = get_color_markers(len(np.unique(color_labels))) + fig, axes = None, None + figsize_factor = 7 + class_pal = None + if isinstance(clf, (list, np.ndarray)): + nrows = 2 + ncols = 1 + fig, axes = plt.subplots(nrows=nrows, ncols=ncols, figsize=(nrows*figsize_factor, ncols*figsize_factor)) + axes = np.expand_dims(axes, axis=1) + all_preds = np.argmax(outputs, axis=1) + class_pal = get_color_markers(outputs.shape[1]) + else: + color_labels = getattr(clf, 'classes_', None) + if color_labels is None: + color_labels = labels + class_pal = get_color_markers(len(np.unique(color_labels))) - fig, axes = plt.subplots(nrows=3, ncols=3, sharey='row', figsize=(21, 21)) + nrows = 3 if seq_viz is not None else 2 + ncols = 3 + fig, axes = plt.subplots(nrows=nrows, ncols=ncols, sharey='row', figsize=(nrows*figsize_factor, ncols*figsize_factor)) # classifier from MEAN of outputs output_mean_preds = clf.predict(X_mean) @@ -194,9 +217,10 @@ def aggregated_chunk_analysis(path, clf, fig_height=7): output_median_preds = clf.predict(X_median) make_plots(y, output_median_preds, axes[:,1], class_pal, seq_len, 'Median classification', seq_viz) - # classifier from voting with chunk predictions - all_preds = clf.predict(outputs) - vote_preds = np.zeros_like(output_mean_preds) + # classifier from voting with chunk predictions + all_preds = 
clf.predict(outputs) + + vote_preds = np.zeros(X_mean.shape[0], dtype=int) for seq_i, seq in enumerate(uniq_seqs): seq_mask = seq_ids == seq vote_preds[seq_i] = stats.mode(all_preds[seq_mask])[0][0] @@ -387,6 +411,9 @@ def summarize(argv=None): parser.add_argument('-A', '--aggregate_chunks', action='store_true', default=False, help='aggregate chunks within sequences and perform analysis') parser.add_argument('-o', '--outdir', type=str, default=None, help='the output directory for figures') + type_group = parser.add_argument_group('Problem type').add_mutually_exclusive_group() + type_group.add_argument('-C', '--classify', action='store_true', help='run a classification problem', default=False) + type_group.add_argument('-M', '--manifold', action='store_true', help='run a manifold learning problem', default=False) args = parser.parse_args(args=argv) if os.path.isdir(args.input): @@ -405,23 +432,38 @@ def summarize(argv=None): fig_path = os.path.join(outdir, 'summary.png') logger = parse_logger('') - plt.figure(figsize=(21, 7)) - pretrained = False - if args.classifier is not None: - with open(args.classifier, 'rb') as f: - pred = pickle.load(f) - pretrained = True - else: - pred = RandomForestClassifier(n_estimators=30) outputs = read_outputs(args.input) - pred = plot_results(outputs, pred=pred, name='/'.join(args.input.split('/',)[-2:]), logger=logger) + if args.classify: + plt.figure(figsize=(7, 7)) + labels = outputs['labels'] + model_outputs = outputs['outputs'] + if 'test_mask' in outputs: + mask = outputs['test_mask'] + labels = labels[mask] + model_outputs = model_outputs[mask] + + pred = np.argmax(model_outputs, axis=1) + class_pal = get_color_markers(model_outputs.shape[1]) + colors = np.array([class_pal[i] for i in labels]) + ax = plt.gca() + plot_clf_report(labels, pred, ax=ax, pal=class_pal) + else: + plt.figure(figsize=(21, 7)) + pretrained = False + if args.classifier is not None: + with open(args.classifier, 'rb') as f: + pred = pickle.load(f) + 
pretrained = True + else: + pred = RandomForestClassifier(n_estimators=30) + pred = plot_results(outputs, pred=pred, name='/'.join(args.input.split('/',)[-2:]), logger=logger) + if not pretrained: + clf_path = os.path.join(outdir, 'summary.rf.pkl') + logger.info(f'saving classifier to {clf_path}') + with open(clf_path, 'wb') as f: + pickle.dump(pred, f) logger.info(f'saving figure to {fig_path}') plt.savefig(fig_path, dpi=100) - if not pretrained: - clf_path = os.path.join(outdir, 'summary.rf.pkl') - logger.info(f'saving classifier to {clf_path}') - with open(clf_path, 'wb') as f: - pickle.dump(pred, f) if args.aggregate_chunks: logger.info(f'running summary by aggregating chunks within sequences') diff --git a/src/exabiome/nn/train.py b/src/exabiome/nn/train.py index 8cf960e..c6feddc 100644 --- a/src/exabiome/nn/train.py +++ b/src/exabiome/nn/train.py @@ -9,6 +9,12 @@ from .utils import process_gpus, process_model, process_output from .loader import add_dataset_arguments, DeepIndexDataModule from hdmf.utils import docval +from pytorch_lightning import Trainer, seed_everything +from pytorch_lightning.loggers import TensorBoardLogger +from pytorch_lightning.callbacks import ModelCheckpoint +import pytorch_lightning.cluster_environments as cenv +from pytorch_lightning.accelerators.cpu_accelerator import CPUAccelerator + import argparse import logging @@ -121,7 +127,6 @@ def process_args(args=None, return_io=False): args.loader_kwargs = dict() if args.lsf: args.loader_kwargs['num_workers'] = 6 - args.loader_kwargs['multiprocessing_context'] = 'spawn' # classify by default if args.manifold == args.classify == False: @@ -168,17 +173,20 @@ def process_args(args=None, return_io=False): targs['num_nodes'] = args.num_nodes targs['accelerator'] = 'ddp' if targs['gpus'] != 1 or targs['num_nodes'] > 1: - # env = None - # if args.lsf: - # env = cenv.LSFEnvironment() - # elif args.slurm: - # env = cenv.SLURMEnvironment() - # else: - # print("If running multi-node or multi-gpu, 
you must specify resource manager, i.e. --lsf or --slurm", - # file=sys.stderr) - # sys.exit(1) - # targs.setdefault('plugins', list()).append(env) - pass + env = None + if args.lsf: + env = cenv.LSFEnvironment() + elif args.slurm: + env = cenv.SLURMEnvironment() + else: + print("If running multi-node or multi-gpu, you must specify resource manager, i.e. --lsf or --slurm", + file=sys.stderr) + sys.exit(1) + targs.setdefault('plugins', list()).append(env) + if targs['gpus'] is not None: + targs['accelerator'] = 'ddp' + else: + targs['accelerator'] = CPUAccelerator(trainer=None, cluster_environment=env) del args.gpus if args.debug: @@ -332,8 +340,11 @@ def cuda_sum(argv=None): print('torch.cuda.is_available:', torch.cuda.is_available()) print('torch.cuda.device_count:', torch.cuda.device_count()) -def print_dataloader(dl): - print(dl.dataset.index[0], dl.dataset.index[-1]) +def print_dataloaders(**loaders): + msg = list() + for k, v in loaders.items(): + msg.append("%s=(%s, %s)" % (k, v.dataset.index[0], v.dataset.index[-1])) + print(" ".join(msg), file=sys.stderr) def overall_metric(model, loader, metric): val = 0.0 diff --git a/src/exabiome/run/cori.py b/src/exabiome/run/cori.py index d54cbe5..2cd36f9 100644 --- a/src/exabiome/run/cori.py +++ b/src/exabiome/run/cori.py @@ -49,6 +49,5 @@ def __init__(self, **kwargs): n_gpus = self.gpus - def write_run(self, f, command, command_options, options): print(f'srun -u {command}', file=f) diff --git a/src/exabiome/run/job.py b/src/exabiome/run/job.py index 14516e0..8afe036 100644 --- a/src/exabiome/run/job.py +++ b/src/exabiome/run/job.py @@ -161,9 +161,9 @@ def write(self, f, options=None): print(file=f) for k, v in self.env_vars.items(): if isinstance(v, str): - print(f'{k}="{v}"', file=f) + print(f'export {k}="{v}"', file=f) else: - print(f'{k}={v}', file=f) + print(f'export {k}={v}', file=f) print(file=f) for c in self.commands: #if isinstance(c, dict): diff --git a/src/exabiome/run/run_job.py 
b/src/exabiome/run/run_job.py index 9324ed6..42895bf 100644 --- a/src/exabiome/run/run_job.py +++ b/src/exabiome/run/run_job.py @@ -54,6 +54,7 @@ def run_train(argv=None): rsc_grp.add_argument('-N', '--jobname', help="the name of the job", default=None) rsc_grp.add_argument('-q', '--queue', help="the queue to submit to", default=None) rsc_grp.add_argument('-P', '--project', help="the project/account to submit under", default=None) + rsc_grp.add_argument('-a', '--arch', help="the architecture to use, e.g., gpu or haswell (cori only)", default='gpu') system_grp = parser.add_argument_group('Compute system') grp = system_grp.add_mutually_exclusive_group() @@ -180,7 +181,8 @@ def run_train(argv=None): if args.nodes > 1: job.set_env_var('OMP_NUM_THREADS', 1) - job.set_env_var('NCCL_DEBUG', 'INFO') + job.set_env_var('NCCL_SOCKET_IFNAME', 'ib0') + job.set_env_var('NCCL_DEBUG', 'WARN') job.set_env_var('OPTIONS', options) job.set_env_var('OUTDIR', f'{expdir}/train.$JOB') @@ -202,7 +204,7 @@ def run_train(argv=None): if args.summit and job.use_bb: job.add_command('echo "$INPUT to $BB_INPUT"') - job.add_command('cp $INPUT $BB_INPUT', run='jsrun -n 1') + job.add_command('cp $INPUT $BB_INPUT', run=f'jsrun -n {args.nodes}') job.add_command('ls /mnt/bb/$USER', run='jsrun -n 1') job.add_command('ls $BB_INPUT', run='jsrun -n 1') @@ -218,6 +220,7 @@ def run_train(argv=None): # when using regular DDP, jsrun should be called with one resource per node (-r) and # one rank per GPU (-a) to work with PyTorch Lightning jsrun = f'jsrun -g {args.gpus} -n {args.nodes} -a {args.gpus} -r 1 -c 42' + #jsrun = f'jsrun -g {args.gpus} -n {args.nodes} -a 1 -r 1 -c 42' job.add_command('$CMD > $LOG 2>&1', run=jsrun) else: job.add_command('$CMD > $LOG 2>&1', run='srun')