diff --git a/src/toil_lib/spark.py b/src/toil_lib/spark.py index f58829d..700af74 100644 --- a/src/toil_lib/spark.py +++ b/src/toil_lib/spark.py @@ -27,6 +27,8 @@ def spawn_spark_cluster(job, numWorkers, + sparkMasterContainer="quay.io/ucsc_cgl/apache-spark-master:1.5.2", + sparkWorkerContainer="quay.io/ucsc_cgl/apache-spark-worker:1.5.2", cores=None, memory=None, disk=None, @@ -34,13 +36,17 @@ def spawn_spark_cluster(job, ''' :param numWorkers: The number of worker nodes to have in the cluster. \ Must be greater than or equal to 1. + :param sparkMasterContainer: The Docker image to run for the Spark master. + :param sparkWorkerContainer: The Docker image to run for the Spark worker. :param cores: Optional parameter to set the number of cores per node. \ If not provided, we use the number of cores on the node that launches \ the service. :param memory: Optional parameter to set the memory requested per node. :param disk: Optional parameter to set the disk requested per node. - :type leaderMemory: int or string convertable by bd2k.util.humanize.human2bytes to an int :type numWorkers: int + :type sparkMasterContainer: str + :type sparkWorkerContainer: str + :type leaderMemory: int or string convertable by bd2k.util.humanize.human2bytes to an int :type cores: int :type memory: int or string convertable by bd2k.util.humanize.human2bytes to an int :type disk: int or string convertable by bd2k.util.humanize.human2bytes to an int @@ -49,13 +55,15 @@ def spawn_spark_cluster(job, if numWorkers < 1: raise ValueError("Must have more than one worker. %d given." 
% numWorkers) - leaderService = SparkService(cores=cores, + leaderService = SparkService(sparkMasterContainer, + cores=cores, memory=memory, disk=disk, overrideLeaderIP=overrideLeaderIP) leaderIP = job.addService(leaderService) for i in range(numWorkers): job.addService(WorkerService(leaderIP, + sparkWorkerContainer, cores=cores, disk=disk, memory=memory), @@ -98,16 +106,19 @@ class SparkService(Job.Service): """ def __init__(self, + sparkContainer, memory=None, disk=None, cores=None, overrideLeaderIP=None): """ + :param sparkContainer: The Docker container name to run for Spark. :param memory: The amount of memory to be requested for the Spark leader. Optional. :param disk: The amount of disk to be requested for the Spark leader. Optional. :param cores: Optional parameter to set the number of cores per node. \ If not provided, we use the number of cores on the node that launches \ the service. + :type sparkContainer: str :type memory: int or string convertable by bd2k.util.humanize.human2bytes to an int :type disk: int or string convertable by bd2k.util.humanize.human2bytes to an int :type cores: int @@ -117,6 +128,7 @@ def __init__(self, cores = multiprocessing.cpu_count() self.hostname = overrideLeaderIP + self.sparkContainer = sparkContainer Job.Service.__init__(self, memory=memory, cores=cores, disk=disk) @@ -135,9 +147,10 @@ def start(self, job): self.sparkContainerID = dockerCheckOutput(job=job, defer=STOP, workDir=os.getcwd(), - tool="quay.io/ucsc_cgl/apache-spark-master:1.5.2", + tool=self.sparkContainer, dockerParameters=["--net=host", "-d", + "-v", "/var/run/docker.sock:/var/run/docker.sock", "-v", "/mnt/ephemeral/:/ephemeral/:rw", "-e", "SPARK_MASTER_IP=" + self.hostname, "-e", "SPARK_LOCAL_DIRS=/ephemeral/spark/local", @@ -188,19 +201,24 @@ class WorkerService(Job.Service): Should not be called outside of `SparkService`. 
""" - def __init__(self, masterIP, memory=None, cores=None, disk=None): + def __init__(self, masterIP, sparkContainer, memory=None, cores=None, disk=None): """ + :param masterIP: The IP of the Spark master. + :param sparkContainer: The container to run for Spark. :param memory: The memory requirement for each node in the cluster. Optional. :param disk: The disk requirement for each node in the cluster. Optional. :param cores: Optional parameter to set the number of cores per node. \ If not provided, we use the number of cores on the node that launches \ the service. + :type masterIP: str + :type sparkContainer: str :type memory: int or string convertable by bd2k.util.humanize.human2bytes to an int :type disk: int or string convertable by bd2k.util.humanize.human2bytes to an int :type cores: int """ self.masterIP = masterIP + self.sparkContainer = sparkContainer if cores is None: cores = multiprocessing.cpu_count() @@ -219,9 +237,10 @@ def start(self, job): self.sparkContainerID = dockerCheckOutput(job=job, defer=STOP, workDir=os.getcwd(), - tool="quay.io/ucsc_cgl/apache-spark-worker:1.5.2", + tool=self.sparkContainer, dockerParameters=["--net=host", "-d", + "-v", "/var/run/docker.sock:/var/run/docker.sock", "-v", "/mnt/ephemeral/:/ephemeral/:rw", "-e", "\"SPARK_MASTER_IP=" + self.masterIP + ":" + _SPARK_MASTER_PORT + "\"", diff --git a/src/toil_lib/tools/__init__.py b/src/toil_lib/tools/__init__.py index a3a192a..2e33c18 100644 --- a/src/toil_lib/tools/__init__.py +++ b/src/toil_lib/tools/__init__.py @@ -2,6 +2,17 @@ import subprocess +def log_runtime(job, start, end, cmd): + + elapsed_time = end - start + + hours = int(elapsed_time) / (60 * 60) + minutes = int(elapsed_time - (60 * 60 * hours)) / 60 + seconds = int(elapsed_time - (60 * 60 * hours) - (60 * minutes)) % 60 + + job.fileStore.logToMaster("%s ran in %dh%dm%ds" % (cmd, hours, minutes, seconds)) + + def get_mean_insert_size(work_dir, bam_name): """Function taken from MC3 Pipeline""" cmd = "docker run 
--log-driver=none --rm -v {}:/data quay.io/ucsc_cgl/samtools " \ diff --git a/src/toil_lib/tools/aligners.py b/src/toil_lib/tools/aligners.py index 2223392..54668ba 100644 --- a/src/toil_lib/tools/aligners.py +++ b/src/toil_lib/tools/aligners.py @@ -1,8 +1,11 @@ import os import subprocess +import time from toil.lib.docker import dockerCall +from toil_lib import require +from toil_lib.tools import log_runtime from toil_lib.urls import download_url @@ -82,7 +85,11 @@ def run_star(job, r1_id, r2_id, star_index_url, wiggle=False, sort=True): return transcriptome_id, aligned_id, log_id, sj_id -def run_bwakit(job, config, sort=True, trim=False, mark_secondary=False): +def run_bwakit(job, config, + sort=True, + trim=False, + mark_secondary=False, + benchmarking=False): """ Runs BWA-Kit to align single or paired-end fastq files or realign SAM/BAM files. @@ -118,6 +125,8 @@ def run_bwakit(job, config, sort=True, trim=False, mark_secondary=False): :param bool sort: If True, sorts the BAM :param bool trim: If True, performs adapter trimming :param bool mark_secondary: If True, mark shorter split reads as secondary + :param boolean benchmarking: If true, returns the runtime along with the + FileStoreID. 
:return: FileStoreID of BAM :rtype: str """ @@ -180,9 +189,165 @@ def run_bwakit(job, config, sort=True, trim=False, mark_secondary=False): for sample in samples: parameters.append('/data/{}'.format(sample)) + start_time = time.time() dockerCall(job=job, tool='quay.io/ucsc_cgl/bwakit:0.7.12--c85ccff267d5021b75bb1c9ccf5f4b79f91835cc', parameters=parameters, workDir=work_dir) + end_time = time.time() + log_runtime(job, start_time, end_time, 'bwakit') # Either write file to local output directory or upload to S3 cloud storage job.fileStore.logToMaster('Aligned sample: {}'.format(config.uuid)) - return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'aligned.aln.bam')) + bam_id = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'aligned.aln.bam')) + if benchmarking: + return (bam_id, (end_time - start_time)) + else: + return bam_id + + +def run_bowtie2(job, + read1, + index_ids, + read2=None, + benchmarking=False): + ''' + Runs bowtie2 for alignment. + + :param JobFunctionWrappingJob job: passed automatically by Toil. + :param str read1: The FileStoreID of the first-of-pair file. + :param str name1: The FileStoreID of the NAME.1.bt2 index file. + :param str name2: The FileStoreID of the NAME.2.bt2 index file. + :param str name3: The FileStoreID of the NAME.3.bt2 index file. + :param str name4: The FileStoreID of the NAME.4.bt2 index file. + :param str rev1: The FileStoreID of the NAME.rev.1.bt2 index file. + :param str rev2: The FileStoreID of the NAME.rev.2.bt2 index file. + :param str ref: The reference FASTA FileStoreID. + :param str read2: The (optional) FileStoreID of the first-of-pair file. + :param boolean benchmarking: If true, returns the runtime along with the + FileStoreID. 
+ ''' + work_dir = job.fileStore.getLocalTempDir() + file_ids = [read1] + file_ids.extend(index_ids) + file_names = ['read1.fq', + 'ref.1.bt2', 'ref.2.bt2', 'ref.3.bt2', 'ref.4.bt2', + 'ref.rev.1.bt2', 'ref.rev.2.bt2'] + + parameters = ['-x', '/data/ref', + '-1', '/data/read1.fq', + '-S', '/data/sample.sam', + '-t', str(job.cores)] + + if read2: + file_ids.append(read2) + file_names.append('read2.fq') + parameters.extend(['-2', '/data/read2.fq']) + for file_store_id, name in zip(file_ids, file_names): + job.fileStore.readGlobalFile(file_store_id, os.path.join(work_dir, name)) + + start_time = time.time() + dockerCall(job=job, + workDir=work_dir, + parameters=parameters, + tool='quay.io/ucsc_cgl/bowtie2') + end_time = time.time() + log_runtime(job, start_time, end_time, 'bowtie2') + + sam_id = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'sample.sam')) + if benchmarking: + return (sam_id, (end_time - start_time)) + else: + return sam_id + + +def run_snap(job, + read1, + index_ids, + read2=None, + sort=False, + mark_duplicates=False, + benchmarking=False): + ''' + Runs SNAP for alignment. + + If both first- and second-of-pair reads are passed, runs in paired mode. + Else runs in single mode. + + :param JobFunctionWrappingJob job: passed automatically by Toil. + :param str read1: The FileStoreID of the first-of-pair file. + :param str genome: The FileStoreID of the SNAP Genome index. + :param str genome_index: The FileStoreID of the SNAP GenomeIndex index. + :param str genome_hash: The FileStoreID of the SNAP GenomeIndexHash index. + :param str overflow: The FileStoreID of the SNAP OverflowTable index. + :param str ref: The reference FASTA FileStoreID. + :param str read2: The (optional) FileStoreID of the first-of-pair file. + :param boolean sort: If true, sorts the reads. Defaults to false. If enabled, + this function will also return the BAM Index. + :param boolean mark_duplicates: If true, marks reads as duplicates. Defaults + to false.
This option is only valid if sort is True. + :param boolean benchmarking: If true, returns the runtime along with the + FileStoreID. + ''' + work_dir = job.fileStore.getLocalTempDir() + file_ids = [read1] + file_ids.extend(index_ids) + file_names = ['read1.fq', + 'Genome', + 'GenomeIndex', + 'GenomeIndexHash', + 'OverflowTable'] + + if read2: + file_ids.append(read2) + file_names.append('read2.fq') + + parameters = ['paired', + '/data/', + '/data/read1.fq', + '/data/read2.fq', + '-o', '-bam', '/data/sample.bam', + '-t', str(job.cores)] + else: + + parameters = ['single', + '/data/', + '/data/read1.fq', + '-o', '-bam', '/data/sample.bam', + '-t', str(job.cores)] + + if sort: + + parameters.append('-so') + + if not mark_duplicates: + parameters.extend(['-S', 'd']) + + else: + + require(not mark_duplicates, + 'Cannot run duplicate marking if sort is not enabled.') + + for file_store_id, name in zip(file_ids, file_names): + job.fileStore.readGlobalFile(file_store_id, os.path.join(work_dir, name)) + + start_time = time.time() + dockerCall(job=job, + workDir=work_dir, + parameters=parameters, + tool='quay.io/ucsc_cgl/snap') + end_time = time.time() + log_runtime(job, start_time, end_time, 'snap (sort={}, dm={})'.format(sort, + mark_duplicates)) + + bam_id = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'sample.bam')) + if not sort: + if benchmarking: + return (bam_id, (end_time - start_time)) + else: + return bam_id + else: + bai_id = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'sample.bam.bai')) + if benchmarking: + return (bam_id, bai_id, (end_time - start_time)) + else: + return (bam_id, bai_id) + diff --git a/src/toil_lib/tools/indexing.py b/src/toil_lib/tools/indexing.py index 540d31d..2cee05a 100644 --- a/src/toil_lib/tools/indexing.py +++ b/src/toil_lib/tools/indexing.py @@ -12,7 +12,6 @@ def run_bwa_index(job, ref_id): :return: FileStoreIDs for BWA index files :rtype: tuple(str, str, str, str, str) """ - job.fileStore.logToMaster('Created BWA index files')
work_dir = job.fileStore.getLocalTempDir() job.fileStore.readGlobalFile(ref_id, os.path.join(work_dir, 'ref.fa')) command = ['index', '/data/ref.fa'] @@ -21,7 +20,8 @@ def run_bwa_index(job, ref_id): ids = {} for output in ['ref.fa.amb', 'ref.fa.ann', 'ref.fa.bwt', 'ref.fa.pac', 'ref.fa.sa']: ids[output.split('.')[-1]] = (job.fileStore.writeGlobalFile(os.path.join(work_dir, output))) - return ids['amb'], ids['ann'], ids['bwt'], ids['pac'], ids['sa'] + job.fileStore.logToMaster('Created BWA index files') + return ids def run_samtools_faidx(job, ref_id): @@ -40,3 +40,45 @@ def run_samtools_faidx(job, ref_id): dockerCall(job=job, workDir=work_dir, parameters=command, tool='quay.io/ucsc_cgl/samtools:0.1.19--dd5ac549b95eb3e5d166a5e310417ef13651994e') return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'ref.fasta.fai')) + + +def run_bowtie2_index(job, + ref_id): + ''' + ''' + work_dir = job.fileStore.getLocalTempDir() + job.fileStore.readGlobalFile(ref_id, os.path.join(work_dir, 'ref.fa')) + command = ['/data/ref.fa', '/data/ref'] + docker_parameters = ['--rm', + '--log-driver', 'none', + '-v', '{}:/data'.format(work_dir), + '--entrypoint=/opt/bowtie2/bowtie2-2.3.2/bowtie2-build'] + dockerCall(job=job, + workDir=work_dir, + parameters=command, + dockerParameters=docker_parameters, + tool='quay.io/ucsc_cgl/bowtie2') + ids = {} + for output in ['ref.1.bt2', 'ref.2.bt2', 'ref.3.bt2', 'ref.4.bt2', + 'ref.rev.1.bt2', 'ref.rev.2.bt2']: + ids[output] = (job.fileStore.writeGlobalFile(os.path.join(work_dir, output))) + job.fileStore.logToMaster('Created bowtie2 index') + return ids + + +def run_snap_index(job, + ref_id): + ''' + ''' + work_dir = job.fileStore.getLocalTempDir() + job.fileStore.readGlobalFile(ref_id, os.path.join(work_dir, 'ref.fa')) + command = ['index', '/data/ref.fa', '/data/'] + dockerCall(job=job, + workDir=work_dir, + parameters=command, + tool='quay.io/ucsc_cgl/snap') + ids = {} + for output in ['Genome', 'GenomeIndex', 'GenomeIndexHash', 
'OverflowTable']: + ids[output] = (job.fileStore.writeGlobalFile(os.path.join(work_dir, output))) + job.fileStore.logToMaster('Created SNAP index') + return ids diff --git a/src/toil_lib/tools/preprocessing.py b/src/toil_lib/tools/preprocessing.py index e16cd69..7528e5e 100644 --- a/src/toil_lib/tools/preprocessing.py +++ b/src/toil_lib/tools/preprocessing.py @@ -5,7 +5,7 @@ from toil.lib.docker import dockerCall from toil_lib import require - +from toil_lib.tools import log_runtime def run_cutadapt(job, r1_id, r2_id, fwd_3pr_adapter, rev_3pr_adapter): """ @@ -66,12 +66,14 @@ def run_samtools_faidx(job, ref_id): return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'ref.fasta.fai')) -def run_samtools_index(job, bam): +def run_samtools_index(job, bam, benchmarking=False): """ Runs SAMtools index to create a BAM index file :param JobFunctionWrappingJob job: passed automatically by Toil :param str bam: FileStoreID of the BAM file + :param boolean benchmarking: If true, returns the runtime along with the + FileStoreID. :return: FileStoreID for BAM index file :rtype: str """ @@ -79,19 +81,28 @@ def run_samtools_index(job, bam): job.fileStore.readGlobalFile(bam, os.path.join(work_dir, 'sample.bam')) # Call: index the bam parameters = ['index', '/data/sample.bam'] + start_time = time.time() dockerCall(job=job, workDir=work_dir, parameters=parameters, tool='quay.io/ucsc_cgl/samtools:0.1.19--dd5ac549b95eb3e5d166a5e310417ef13651994e') - # Write to fileStore - return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'sample.bam.bai')) + end_time = time.time() + log_runtime(job, start_time, end_time, 'samtools index') + # Write to fileStore + bai = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'sample.bam.bai')) + if benchmarking: + return (bai, (end_time - start_time)) + else: + return bai -def run_samtools_view(job, bam): +def run_samtools_view(job, bam, benchmarking=False): """ Runs SAMtools view to create a SAM file. 
:param JobFunctionWrappingJob job: passed automatically by Toil :param str bam: FileStoreID of the BAM file + :param boolean benchmarking: If true, returns the runtime along with the + FileStoreID. :return: FileStoreID for SAM file :rtype: str """ @@ -102,29 +113,28 @@ def run_samtools_view(job, bam): '-h', '-o', '/data/sample.sam', '/data/sample.bam'] + start_time = time.time() dockerCall(job=job, workDir=work_dir, parameters=parameters, tool='quay.io/ucsc_cgl/samtools:0.1.19--dd5ac549b95eb3e5d166a5e310417ef13651994e') - # Write to fileStore - return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'sample.sam')) - - -def _log_runtime(job, start, end, cmd): - - elapsed_time = end - start - - hours = int(elapsed_time) / (60 * 60) - minutes = int(elapsed_time - (60 * 60 * hours)) / 60 - seconds = int(elapsed_time - (60 * 60 * hours) - (60 * minutes)) % 60 + end_time = time.time() + log_runtime(job, start_time, end_time, 'samtools view') - job.fileStore.logToMaster("%s ran in %dh%dm%ds" % (cmd, hours, minutes, seconds)) + # Write to fileStore + sam_id = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'sample.sam')) + if benchmarking: + return (sam_id, (end_time - start_time)) + else: + return sam_id -def run_samtools_sort(job, bam): +def run_samtools_sort(job, bam, benchmarking=False): """ Sorts BAM file using SAMtools sort :param JobFunctionWrappingJob job: passed automatically by Toil :param str bam: FileStoreID for BAM file + :param boolean benchmarking: If true, returns the runtime along with the + FileStoreID. 
:return: FileStoreID for sorted BAM file :rtype: str """ @@ -140,16 +150,23 @@ def run_samtools_sort(job, bam): parameters=command, tool='quay.io/ucsc_cgl/samtools:1.3--256539928ea162949d8a65ca5c79a72ef557ce7c') end_time = time.time() - _log_runtime(job, start_time, end_time, "samtools sort") - return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'output.bam')) + log_runtime(job, start_time, end_time, "samtools sort") + bam_id = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'output.bam')) + if benchmarking: + return (bam_id, (end_time - start_time)) + else: + return bam_id + -def run_samtools_rmdup(job, bam): +def run_samtools_rmdup(job, bam, benchmarking=False): """ Mark reads as PCR duplicates using SAMtools rmdup :param JobFunctionWrappingJob job: passed automatically by Toil :param str bam: FileStoreID for BAM file + :param boolean benchmarking: If true, returns the runtime along with the + FileStoreID. :return: FileStoreID for sorted BAM file :rtype: str """ @@ -164,16 +181,25 @@ def run_samtools_rmdup(job, bam): parameters=command, tool='quay.io/ucsc_cgl/samtools:1.3--256539928ea162949d8a65ca5c79a72ef557ce7c') end_time = time.time() - _log_runtime(job, start_time, end_time, "samtools rmdup") - return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'output.bam')) + log_runtime(job, start_time, end_time, "samtools rmdup") + + bam_id = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'output.bam')) + if benchmarking: + return (bam_id, (end_time - start_time)) + else: + return bam_id -def run_sambamba_index(job, bam, sort_by_name=False): +def run_sambamba_index(job, bam, + sort_by_name=False, + benchmarking=False): """ Indexes a BAM file using Sambamba. :param JobFunctionWrappingJob job: passed automatically by Toil :param str bam: FileStoreID for BAM file + :param boolean benchmarking: If true, returns the runtime along with the + FileStoreID. 
:return: FileStoreID for sorted BAM file :rtype: str @@ -190,15 +216,59 @@ def run_sambamba_index(job, bam, sort_by_name=False): parameters=command, tool='quay.io/biocontainers/sambamba:0.6.6--0') end_time = time.time() - _log_runtime(job, start_time, end_time, "sambamba index") + log_runtime(job, start_time, end_time, "sambamba index") - return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'input.bam.bai')) + + bai_id = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'input.bam.bai')) + if benchmarking: + return (bai_id, (end_time - start_time)) + else: + return bai_id + +def run_sambamba_view(job, sam, + benchmarking=False): + """ + Converts a SAM file to BAM using Sambamba. -def run_sambamba_sort(job, bam, sort_by_name=False): + :param JobFunctionWrappingJob job: passed automatically by Toil + :param str sam: FileStoreID for SAM file + :param boolean benchmarking: If true, returns the runtime along with the + FileStoreID. + :return: FileStoreID for BAM file + :rtype: str + """ + work_dir = job.fileStore.getLocalTempDir() + job.fileStore.readGlobalFile(sam, os.path.join(work_dir, 'input.sam')) + command = ['/usr/local/bin/sambamba', + 'view', + '-t', str(int(job.cores)), + '-S', '/data/input.sam', + '-f', 'bam', + '-o', '/data/input.bam'] + + start_time = time.time() + dockerCall(job=job, workDir=work_dir, + parameters=command, + tool='quay.io/biocontainers/sambamba:0.6.6--0') + end_time = time.time() + log_runtime(job, start_time, end_time, "sambamba view") + + bam_id = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'input.bam')) + if benchmarking: + return (bam_id, (end_time - start_time)) + else: + return bam_id + + +def run_sambamba_sort(job, bam, + sort_by_name=False, + benchmarking=False): """ Sorts BAM file using Sambamba sort :param JobFunctionWrappingJob job: passed automatically by Toil :param str bam: FileStoreID for BAM file + :param boolean benchmarking: If true, returns the runtime along with the + FileStoreID. :param boolean sort_by_name: If true, sorts by read name instead of coordinate.
:return: FileStoreID for sorted BAM file :rtype: str @@ -220,16 +290,23 @@ def run_sambamba_sort(job, bam, sort_by_name=False): parameters=command, tool='quay.io/biocontainers/sambamba:0.6.6--0') end_time = time.time() - _log_runtime(job, start_time, end_time, "sambamba sort") - return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'output.bam')) + log_runtime(job, start_time, end_time, "sambamba sort") + + bam_id = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'output.bam')) + if benchmarking: + return (bam_id, (end_time - start_time)) + else: + return bam_id -def run_sambamba_markdup(job, bam): +def run_sambamba_markdup(job, bam, benchmarking=False): """ Marks reads as PCR duplicates using Sambamba :param JobFunctionWrappingJob job: passed automatically by Toil :param str bam: FileStoreID for BAM file + :param boolean benchmarking: If true, returns the runtime along with the + FileStoreID. :return: FileStoreID for sorted BAM file :rtype: str """ @@ -246,16 +323,23 @@ def run_sambamba_markdup(job, bam): parameters=command, tool='quay.io/biocontainers/sambamba:0.6.6--0') end_time = time.time() - _log_runtime(job, start_time, end_time, "sambamba mkdup") - return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'output.bam')) + log_runtime(job, start_time, end_time, "sambamba mkdup") + + bam_id = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'output.bam')) + if benchmarking: + return (bam_id, (end_time - start_time)) + else: + return bam_id -def run_samblaster(job, sam): +def run_samblaster(job, sam, benchmarking=False): """ Marks reads as PCR duplicates using SAMBLASTER :param JobFunctionWrappingJob job: passed automatically by Toil :param str sam: FileStoreID for SAM file + :param boolean benchmarking: If true, returns the runtime along with the + FileStoreID.
:return: FileStoreID for deduped SAM file :rtype: str """ @@ -271,8 +355,13 @@ def run_samblaster(job, sam): parameters=command, tool='quay.io/biocontainers/samblaster:0.1.24--0') end_time = time.time() - _log_runtime(job, start_time, end_time, "SAMBLASTER") - return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'output.sam')) + log_runtime(job, start_time, end_time, "SAMBLASTER") + + sam_id = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'output.sam')) + if benchmarking: + return (sam_id, (end_time - start_time)) + else: + return sam_id def run_picard_create_sequence_dictionary(job, ref_id): @@ -294,13 +383,17 @@ def run_picard_create_sequence_dictionary(job, ref_id): return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'ref.dict')) -def picard_mark_duplicates(job, bam, bai, validation_stringency='LENIENT'): +def picard_mark_duplicates(job, bam, bai, + validation_stringency='LENIENT', + benchmarking=False): """ Runs Picard MarkDuplicates on a BAM file. Requires that the BAM file be coordinate sorted. :param JobFunctionWrappingJob job: passed automatically by Toil :param str bam: FileStoreID for BAM file :param str bai: FileStoreID for BAM index file + :param boolean benchmarking: If true, returns the runtime along with the + FileStoreID. 
:param str validation_stringency: BAM file validation stringency, default is LENIENT :return: FileStoreIDs for BAM and BAI files :rtype: tuple @@ -333,19 +426,27 @@ def picard_mark_duplicates(job, bam, bai, validation_stringency='LENIENT'): tool='quay.io/ucsc_cgl/picardtools:1.95--dd5ac549b95eb3e5d166a5e310417ef13651994e', dockerParameters=docker_parameters) end_time = time.time() - _log_runtime(job, start_time, end_time, "Picard MarkDuplicates") + log_runtime(job, start_time, end_time, "Picard MarkDuplicates") bam = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'mkdups.bam')) bai = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'mkdups.bai')) - return bam, bai + + if benchmarking: + return (bam, bai, (end_time - start_time)) + else: + return (bam, bai) -def run_picard_sort(job, bam, sort_by_name=False): +def run_picard_sort(job, bam, + sort_by_name=False, + benchmarking=False): """ Sorts BAM file using Picard SortSam :param JobFunctionWrappingJob job: passed automatically by Toil :param str bam: FileStoreID for BAM file + :param boolean benchmarking: If true, returns the runtime along with the + FileStoreID. :param boolean sort_by_name: If true, sorts by read name instead of coordinate. 
:return: FileStoreID for sorted BAM file :rtype: str @@ -374,8 +475,13 @@ def run_picard_sort(job, bam, sort_by_name=False): tool='quay.io/ucsc_cgl/picardtools:1.95--dd5ac549b95eb3e5d166a5e310417ef13651994e', dockerParameters=docker_parameters) end_time = time.time() - _log_runtime(job, start_time, end_time, "Picard SortSam") - return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'output.bam')) + log_runtime(job, start_time, end_time, "Picard SortSam") + + bam_id = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'output.bam')) + if benchmarking: + return (bam_id, (end_time - start_time)) + else: + return bam_id def run_gatk_preprocessing(job, bam, bai, ref, ref_dict, fai, g1k, mills, dbsnp, realign=False, unsafe=False): @@ -518,7 +624,9 @@ def run_gatk_preprocessing(job, bam, bai, ref, ref_dict, fai, g1k, mills, dbsnp, return recalibrate_reads.rv(0), recalibrate_reads.rv(1) -def run_realigner_target_creator(job, bam, bai, ref, ref_dict, fai, g1k, mills, unsafe=False): +def run_realigner_target_creator(job, bam, bai, ref, ref_dict, fai, g1k, mills, + unsafe=False, + benchmarking=False): """ Creates intervals file needed for INDEL realignment @@ -530,6 +638,8 @@ def run_realigner_target_creator(job, bam, bai, ref, ref_dict, fai, g1k, mills, :param str fai: FileStoreID for reference fasta index file :param str g1k: FileStoreID for 1000 Genomes VCF file :param str mills: FileStoreID for Mills VCF file + :param boolean benchmarking: If true, returns the runtime along with the + FileStoreID. 
:param bool unsafe: If True, runs GATK in UNSAFE mode: "-U ALLOW_SEQ_DICT_INCOMPATIBILITY" :return: FileStoreID for realignment intervals file :rtype: str @@ -558,7 +668,7 @@ def run_realigner_target_creator(job, bam, bai, ref, ref_dict, fai, g1k, mills, '-o', '/data/sample.intervals'] end_time = time.time() - _log_runtime(job, start_time, end_time, "GATK3 RealignerTargetCreator") + log_runtime(job, start_time, end_time, "GATK3 RealignerTargetCreator") if unsafe: parameters.extend(['-U', 'ALLOW_SEQ_DICT_INCOMPATIBILITY']) @@ -575,13 +685,19 @@ def run_realigner_target_creator(job, bam, bai, ref, ref_dict, fai, g1k, mills, parameters=parameters, dockerParameters=docker_parameters) end_time = time.time() - _log_runtime(job, start_time, end_time, "GATK3 RTC") + log_runtime(job, start_time, end_time, "GATK3 RTC") # Write to fileStore - return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'sample.intervals')) + targets_id = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'sample.intervals')) + if benchmarking: + return (targets_id, (end_time - start_time)) + else: + return targets_id -def run_indel_realignment(job, intervals, bam, bai, ref, ref_dict, fai, g1k, mills, unsafe=False): +def run_indel_realignment(job, intervals, bam, bai, ref, ref_dict, fai, g1k, mills, + unsafe=False, + benchmarking=False): """ Realigns BAM file at realignment target intervals @@ -595,6 +711,8 @@ def run_indel_realignment(job, intervals, bam, bai, ref, ref_dict, fai, g1k, mil :param str g1k: FileStoreID for 1000 Genomes VCF file :param str mills: FileStoreID for Mills VCF file :param bool unsafe: If True, runs GATK in UNSAFE mode: "-U ALLOW_SEQ_DICT_INCOMPATIBILITY" + :param boolean benchmarking: If true, returns the runtime along with the + FileStoreID. 
:return: FileStoreIDs for realigned BAM and BAI files :rtype: tuple(str, str) """ @@ -639,14 +757,20 @@ def run_indel_realignment(job, intervals, bam, bai, ref, ref_dict, fai, g1k, mil parameters=parameters, dockerParameters=docker_parameters) end_time = time.time() - _log_runtime(job, start_time, end_time, "GATK3 IndelRealigner") + log_runtime(job, start_time, end_time, "GATK3 IndelRealigner") indel_bam = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'output.bam')) indel_bai = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'output.bai')) - return indel_bam, indel_bai + + if benchmarking: + return (indel_bam, indel_bai, (end_time - start_time)) + else: + return (indel_bam, indel_bai) -def run_base_recalibration(job, bam, bai, ref, ref_dict, fai, dbsnp, mills, unsafe=False): +def run_base_recalibration(job, bam, bai, ref, ref_dict, fai, dbsnp, mills, + unsafe=False, + benchmarking=False): """ Creates recalibration table for Base Quality Score Recalibration @@ -659,6 +783,8 @@ def run_base_recalibration(job, bam, bai, ref, ref_dict, fai, dbsnp, mills, unsa :param str dbsnp: FileStoreID for dbSNP VCF file :param str mills: FileStoreID for Mills VCF file :param bool unsafe: If True, runs GATK in UNSAFE mode: "-U ALLOW_SEQ_DICT_INCOMPATIBILITY" + :param boolean benchmarking: If true, returns the runtime along with the + FileStoreID. 
:return: FileStoreID for the recalibration table file :rtype: str """ @@ -699,9 +825,13 @@ def run_base_recalibration(job, bam, bai, ref, ref_dict, fai, dbsnp, mills, unsa parameters=parameters, dockerParameters=docker_parameters) end_time = time.time() - _log_runtime(job, start_time, end_time, "GATK3 BaseRecalibrator") + log_runtime(job, start_time, end_time, "GATK3 BaseRecalibrator") - return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'recal_data.table')) + table_id = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'recal_data.table')) + if benchmarking: + return (table_id, (end_time - start_time)) + else: + return table_id def apply_bqsr_recalibration(job, table, bam, bai, ref, ref_dict, fai, unsafe=False): @@ -716,6 +846,8 @@ def apply_bqsr_recalibration(job, table, bam, bai, ref, ref_dict, fai, unsafe=Fa :param str ref_dict: FileStoreID for reference genome sequence dictionary file :param str fai: FileStoreID for reference genome fasta index file :param bool unsafe: If True, runs GATK in UNSAFE mode: "-U ALLOW_SEQ_DICT_INCOMPATIBILITY" + :param boolean benchmarking: If true, returns the runtime along with the + FileStoreID. 
:return: FileStoreIDs for recalibrated BAM and BAI files :rtype: tuple(str, str) """ @@ -738,7 +870,7 @@ def apply_bqsr_recalibration(job, table, bam, bai, ref, ref_dict, fai, unsafe=Fa '-BQSR', '/data/recal.table', '-o', '/data/bqsr.bam'] end_time = time.time() - _log_runtime(job, start_time, end_time, "GATK3 BQSR PrintReads") + log_runtime(job, start_time, end_time, "GATK3 BQSR PrintReads") if unsafe: parameters.extend(['-U', 'ALLOW_SEQ_DICT_INCOMPATIBILITY']) @@ -754,8 +886,12 @@ def apply_bqsr_recalibration(job, table, bam, bai, ref, ref_dict, fai, unsafe=Fa parameters=parameters, dockerParameters=docker_parameters) end_time = time.time() - _log_runtime(job, start_time, end_time, "GATK3 BQSR PrintReads") + log_runtime(job, start_time, end_time, "GATK3 BQSR PrintReads") output_bam = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'bqsr.bam')) output_bai = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'bqsr.bai')) - return output_bam, output_bai + + if benchmarking: + return (output_bam, output_bai, (end_time - start_time)) + else: + return (output_bam, output_bai) diff --git a/src/toil_lib/tools/spark_tools.py b/src/toil_lib/tools/spark_tools.py index 4ff4e5c..c773c48 100644 --- a/src/toil_lib/tools/spark_tools.py +++ b/src/toil_lib/tools/spark_tools.py @@ -8,10 +8,12 @@ import os.path from subprocess import check_call +import time from toil.lib.docker import dockerCall from toil_lib import require +from toil_lib.tools import log_runtime SPARK_MASTER_PORT = "7077" HDFS_MASTER_PORT = "8020" @@ -95,7 +97,12 @@ def _make_parameters(master_ip, default_parameters, memory, arguments, override_ return parameters -def call_conductor(job, master_ip, src, dst, memory=None, override_parameters=None): +def call_conductor(job, + master_ip, + src, + dst, + container="quay.io/ucsc_cgl/conductor", + memory=None, override_parameters=None): """ Invokes the Conductor container to copy files between S3 and HDFS and vice versa. 
Find Conductor at https://github.com/BD2KGenomics/conductor. @@ -103,13 +110,15 @@ def call_conductor(job, master_ip, src, dst, memory=None, override_parameters=No :param toil.Job.job job: The Toil Job calling this function :param masterIP: The Spark leader IP address. :param src: URL of file to copy. - :param src: URL of location to copy file to. + :param dst: URL of location to copy file to. + :param container: The container name to run. :param memory: Gigabytes of memory to provision for Spark driver/worker. :param override_parameters: Parameters passed by the user, that override our defaults. :type masterIP: MasterAddress :type src: string :type dst: string + :type container: string :type memory: int or None :type override_parameters: list of string or None """ @@ -118,7 +127,7 @@ def call_conductor(job, master_ip, src, dst, memory=None, override_parameters=No docker_parameters = ['--log-driver', 'none', master_ip.docker_parameters(["--net=host"])] dockerCall(job=job, - tool="quay.io/ucsc_cgl/conductor", + tool=container, parameters=_make_parameters(master_ip, [], # no conductor specific spark configuration memory, @@ -128,16 +137,19 @@ def call_conductor(job, master_ip, src, dst, memory=None, override_parameters=No def call_adam(job, master_ip, arguments, + container="quay.io/ucsc_cgl/adam:0.22.0--7add8b306862902b2bdd28a991e4e8dbc5292504", memory=None, override_parameters=None, run_local=False, - native_adam_path=None): + native_adam_path=None, + benchmarking=False): """ Invokes the ADAM container. Find ADAM at https://github.com/bigdatagenomics/adam. :param toil.Job.job job: The Toil Job calling this function :param masterIP: The Spark leader IP address. :param arguments: Arguments to pass to ADAM. + :param container: The container name to run. :param memory: Gigabytes of memory to provision for Spark driver/worker. :param override_parameters: Parameters passed by the user, that override our defaults. :param native_adam_path: Path to ADAM executable. 
If not provided, Docker is used. @@ -146,6 +158,7 @@ def call_adam(job, master_ip, arguments, :type masterIP: MasterAddress :type arguments: list of string + :type container: string :type memory: int or None :type override_parameters: list of string or None :type native_adam_path: string or None @@ -178,16 +191,127 @@ def call_adam(job, master_ip, arguments, # are we running adam via docker, or do we have a native path? if native_adam_path is None: docker_parameters = ['--log-driver', 'none', master_ip.docker_parameters(["--net=host"])] + start_time = time.time() dockerCall(job=job, - tool="quay.io/ucsc_cgl/adam:962-ehf--6e7085f8cac4b9a927dc9fb06b48007957256b80", - dockerParameters=docker_parameters, - parameters=_make_parameters(master_ip, - default_params, - memory, - arguments, - override_parameters)) + tool=container, + dockerParameters=docker_parameters, + parameters=_make_parameters(master_ip, + default_params, + memory, + arguments, + override_parameters)) + end_time = time.time() + log_runtime(job, start_time, end_time, 'adam') + + if benchmarking: + return (end_time - start_time) + else: check_call([os.path.join(native_adam_path, "bin/adam-submit")] + default_params + arguments) + +def call_avocado(job, master_ip, arguments, + container="quay.io/ucsc_cgl/avocado:fb20657172d2ce38e5dcd5542b0915db4de7eaa0--036b9354dbd46e62c4d326b4308c4786fc966d6a", + memory=None, + override_parameters=None, + run_local=False, + benchmarking=False): + """ + Invokes the Avocado container. Find Avocado at https://github.com/bigdatagenomics/avocado. + + :param toil.Job.job job: The Toil Job calling this function + :param masterIP: The Spark leader IP address. + :param arguments: Arguments to pass to Avocado. + :param container: The container name to run. + :param memory: Gigabytes of memory to provision for Spark driver/worker. + :param override_parameters: Parameters passed by the user, that override our defaults. 
+ :param run_local: If true, runs Spark with the --master local[*] setting, which uses + all cores on the local machine. The master_ip will be disregarded. + + :type masterIP: MasterAddress + :type arguments: list of string + :type container: string + :type memory: int or None + :type override_parameters: list of string or None + :type run_local: boolean + """ + if run_local: + master = ["--master", "local[*]"] + else: + master = ["--master", + ("spark://%s:%s" % (master_ip, SPARK_MASTER_PORT)), + "--conf", ("spark.hadoop.fs.default.name=hdfs://%s:%s" % (master_ip, HDFS_MASTER_PORT)),] + + default_params = (master + [ + # set max result size to unlimited, see #177 + "--conf", "spark.driver.maxResultSize=0", + "--conf", "spark.kryoserializer.buffer.max=2047m" + ]) + + docker_parameters = ['--log-driver', 'none', master_ip.docker_parameters(["--net=host"])] + start_time = time.time() + dockerCall(job=job, + tool=container, + dockerParameters=docker_parameters, + parameters=_make_parameters(master_ip, + default_params, + memory, + arguments, + override_parameters)) + end_time = time.time() + log_runtime(job, start_time, end_time, 'avocado') + + if benchmarking: + return (end_time - start_time) + + + def call_cannoli(job, master_ip, arguments, + container="quay.io/ucsc_cgl/cannoli:0a9321a382fdfad1411cb308a0de1566bf4c8bb4--036b9354dbd46e62c4d326b4308c4786fc966d6a", + memory=None, + override_parameters=None, + run_local=False, + benchmarking=False): + """ + Invokes the Cannoli container. Find Cannoli at https://github.com/bigdatagenomics/cannoli. + + :param toil.Job.job job: The Toil Job calling this function + :param masterIP: The Spark leader IP address. + :param arguments: Arguments to pass to Cannoli. + :param container: The container name to run. + :param memory: Gigabytes of memory to provision for Spark driver/worker. + :param override_parameters: Parameters passed by the user, that override our defaults.
+ :param run_local: If true, runs Spark with the --master local[*] setting, which uses + all cores on the local machine. The master_ip will be disregarded. + + :type masterIP: MasterAddress + :type arguments: list of string + :type container: string + :type memory: int or None + :type override_parameters: list of string or None + :type run_local: boolean + """ + if run_local: + master = ["--master", "local[*]"] + else: + master = ["--master", + ("spark://%s:%s" % (master_ip, SPARK_MASTER_PORT)), + "--conf", ("spark.hadoop.fs.default.name=hdfs://%s:%s" % (master_ip, HDFS_MASTER_PORT)),] + + docker_parameters = ['--log-driver', 'none', master_ip.docker_parameters(["--net=host"])] + start_time = time.time() + dockerCall(job=job, + tool=container, + dockerParameters=docker_parameters, + parameters=_make_parameters(master_ip, + master, + memory, + arguments, + override_parameters)) + end_time = time.time() + log_runtime(job, start_time, end_time, 'cannoli') + + if benchmarking: + return (end_time - start_time) + diff --git a/src/toil_lib/tools/variant_callers.py b/src/toil_lib/tools/variant_callers.py new file mode 100644 index 0000000..b594ace --- /dev/null +++ b/src/toil_lib/tools/variant_callers.py @@ -0,0 +1,430 @@ +import logging +import os +import time + +from toil.lib.docker import dockerCall + +from toil_lib.tools import log_runtime + +_log = logging.getLogger(__name__) + +def run_freebayes(job, ref, ref_fai, bam, bai, chunksize=None): + ''' + Calls FreeBayes to call variants. + + If job.cores > 1 and chunksize is not None, then runs FreeBayes + parallel mode. If not, then runs single threaded FreeBayes. + + :param JobFunctionWrappingJob job: passed automatically by Toil. + :param str ref: The reference FASTA FileStoreID. + :param str ref_fai: The reference FASTA index FileStoreID. + :param str bam: The FileStoreID of the BAM to call. + :param str bai: The FileStoreID of the BAM index. + :param int chunksize: The size of chunks to split into and call in parallel.
+ Defaults to None. + :param boolean benchmarking: If true, returns the runtime along with the + FileStoreID. + ''' + work_dir = job.fileStore.getLocalTempDir() + file_ids = [ref, ref_fai, bam, bai] + file_names = ['ref.fa', 'ref.fa.fai', 'sample.bam', 'sample.bam.bai'] + for file_store_id, name in zip(file_ids, file_names): + job.fileStore.readGlobalFile(file_store_id, os.path.join(work_dir, name)) + + output_vcf = os.path.join(work_dir, 'sample.vcf') + + if job.cores > 1 and chunksize: + _log.info('Running FreeBayes parallel with %dbp chunks and %d cores', + chunksize, + job.cores) + + docker_parameters = ['--rm', + '--log-driver', 'none', + '-v', '{}:/data'.format(work_dir), + '--entrypoint=/opt/cgl-docker-lib/freebayes/scripts/fasta_generate_regions.py'] + start_time = time.time() + dockerCall(job, + workDir=work_dir, + dockerParameters=docker_parameters, + parameters=['/data/ref.fa.fai', str(chunksize)], + tool='quay.io/ucsc_cgl/freebayes', + outfile=open(os.path.join(work_dir, 'regions'), 'w')) + end_time = time.time() + log_runtime(job, start_time, end_time, 'fasta_generate_regions') + + docker_parameters = ['--rm', + '--log-driver', 'none', + '-v', '{}:/data'.format(work_dir), + '--entrypoint=/opt/cgl-docker-lib/freebayes/scripts/freebayes-parallel'] + start_time = time.time() + dockerCall(job=job, + workDir=work_dir, + dockerParameters=docker_parameters, + parameters=['/data/regions', + str(job.cores), + '-f', '/data/ref.fa', + '/data/sample.bam'], + tool='quay.io/ucsc_cgl/freebayes', + outfile=open(output_vcf, 'w')) + end_time = time.time() + log_runtime(job, start_time, end_time, 'FreeBayes Parallel') + + else: + _log.info('Running FreeBayes single threaded.') + + start_time = time.time() + dockerCall(job=job, + workDir=work_dir, + parameters=['-f', '/data/ref.fa', + '/data/sample.bam'], + tool='quay.io/ucsc_cgl/freebayes', + outfile=open(output_vcf, 'w')) + end_time = time.time() + log_runtime(job, start_time, end_time, 'FreeBayes') + + return
job.fileStore.writeGlobalFile(output_vcf) + + +def run_platypus(job, ref, ref_fai, bam, bai, assemble=False, benchmarking=False): + ''' + Runs Platypus to call variants. + + :param JobFunctionWrappingJob job: passed automatically by Toil. + :param str ref: The reference FASTA FileStoreID. + :param str ref_fai: The reference FASTA index FileStoreID. + :param str bam: The FileStoreID of the BAM to call. + :param str bai: The FileStoreID of the BAM index. + :param boolean assemble: If true, runs Platypus in assembler mode. + :param boolean benchmarking: If true, returns the runtime along with the + FileStoreID. + ''' + work_dir = job.fileStore.getLocalTempDir() + file_ids = [ref, ref_fai, bam, bai] + file_names = ['ref.fa', 'ref.fa.fai', 'sample.bam', 'sample.bam.bai'] + for file_store_id, name in zip(file_ids, file_names): + job.fileStore.readGlobalFile(file_store_id, os.path.join(work_dir, name)) + + parameters = ['callVariants', + '--refFile', '/data/ref.fa', + '--outputFile', '/data/sample.vcf', + '--bamFiles', '/data/sample.bam'] + + if job.cores > 1: + parameters.extend(['--nCPU', str(job.cores)]) + + if assemble: + parameters.append('--assemble') + + start_time = time.time() + dockerCall(job=job, + workDir=work_dir, + parameters=parameters, + tool='quay.io/ucsc_cgl/platypus') + end_time = time.time() + log_runtime(job, start_time, end_time, 'Platypus, assemble={}'.format(assemble)) + + vcf_id = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'sample.vcf')) + if benchmarking: + return (vcf_id, (end_time - start_time)) + else: + return vcf_id + + +def run_16gt(job, ref, genome_index, bam, dbsnp, sample_name, benchmarking=False): + ''' + Generates the snapshot file and calls variants using 16GT. + + :param JobFunctionWrappingJob job: passed automatically by Toil. + :param str ref: The reference FASTA FileStoreID. + :param str genome_index: The FileStoreID of the SOAP3-dp genome index. + :param str bam: The FileStoreID of the BAM to call.
+ :param str dbsnp: The FileStoreID of the dbSNP VCF for filtration. + :param str sample_name: The name of the sample being called. + :param boolean benchmarking: If true, returns the runtime along with the + FileStoreID. + ''' + work_dir = job.fileStore.getLocalTempDir() + file_ids = [ref, genome_index, bam, dbsnp] + file_names = ['ref.fa', 'ref.fa.index', 'sample.bam', 'dbsnp.vcf'] + for file_store_id, name in zip(file_ids, file_names): + job.fileStore.readGlobalFile(file_store_id, os.path.join(work_dir, name)) + + start_time = time.time() + dockerCall(job=job, + workDir=work_dir, + parameters=['/opt/cgl-docker-lib/16GT/bam2snapshot', + '-i', '/data/ref.fa.index', + '-b', '/data/sample.bam', + '-o', '/data/sample'], + tool='quay.io/ucsc_cgl/16gt') + end_time = time.time() + log_runtime(job, start_time, end_time, '16gt bam2snapshot') + snapshot_time = (end_time - start_time) + + start_time = time.time() + dockerCall(job=job, + workDir=work_dir, + parameters=['/opt/cgl-docker-lib/16GT/snapshotSnpcaller', + '-i', '/data/ref.fa.index', + '-o', '/data/sample'], + tool='quay.io/ucsc_cgl/16gt') + end_time = time.time() + log_runtime(job, start_time, end_time, '16gt snapshotSnpcaller') + snp_caller_time = (end_time - start_time) + + start_time = time.time() + dockerCall(job=job, + workDir=work_dir, + parameters=['perl', '/opt/cgl-docker-lib/16GT/txt2vcf.pl', + '/data/sample', + sample_name, + '/data/ref.fa'], + tool='quay.io/ucsc_cgl/16gt', + outfile=open(os.path.join(work_dir, 'sample.vcf'), 'w')) + end_time = time.time() + log_runtime(job, start_time, end_time, '16gt txt2vcf') + text_vcf_time = (end_time - start_time) + + start_time = time.time() + dockerCall(job=job, + workDir=work_dir, + parameters=['perl', '/opt/cgl-docker-lib/16GT/filterVCF.pl', + '/data/sample.vcf', + '/data/dbsnp.vcf'], + tool='quay.io/ucsc_cgl/16gt', + outfile=open(os.path.join(work_dir, 'sample.filtered.vcf'), 'w')) + end_time = time.time() + log_runtime(job, start_time, end_time, '16gt filterVCF') +
filter_vcf_time = (end_time - start_time) + + vcf_id = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'sample.filtered.vcf')) + if benchmarking: + return (vcf_id, snapshot_time, snp_caller_time, text_vcf_time, filter_vcf_time) + else: + return vcf_id + + +def run_strelka(job, ref, ref_fai, bam, bai, + candidate_indels=None, + benchmarking=False): + ''' + Runs Strelka's germline single sample caller. + + :param JobFunctionWrappingJob job: passed automatically by Toil. + :param str ref: The reference FASTA FileStoreID. + :param str ref_fai: The reference FASTA index FileStoreID. + :param str bam: The FileStoreID of the BAM to call. + :param str bai: The FileStoreID of the BAM index. + :param str candidate_indels: The optional FileStoreID of the candidate + indel GZIPed VCF. + :param boolean benchmarking: If true, returns the runtime along with the + FileStoreID. + ''' + generate_parameters = ['/opt/strelka/bin/configureStrelkaGermlineWorkflow.py', + '--bam', '/data/sample.bam', + '--referenceFasta', '/data/ref.fa', + '--runDir', '/data/'] + work_dir = job.fileStore.getLocalTempDir() + file_ids = [ref, ref_fai, bam, bai] + file_names = ['ref.fa', 'ref.fa.fai', 'sample.bam', 'sample.bam.bai'] + + if candidate_indels: + _log.info('Candidate indels from Manta were provided for Strelka.') + file_ids.append(candidate_indels) + file_names.append('candidateSmallIndels.vcf.gz') + generate_parameters.extend(['--indelCandidates', '/data/candidateSmallIndels.vcf.gz']) + else: + _log.info('No candidate indels provided.') + + for file_store_id, name in zip(file_ids, file_names): + job.fileStore.readGlobalFile(file_store_id, os.path.join(work_dir, name)) + + start_time = time.time() + dockerCall(job=job, + workDir=work_dir, + parameters=generate_parameters, + tool='quay.io/ucsc_cgl/strelka') + end_time = time.time() + log_runtime(job, start_time, end_time, 'Configuring Strelka') + + start_time = time.time() + dockerCall(job=job, + workDir=work_dir, + 
parameters=['/data/runWorkflow.py', + '-m', 'local', + '-j', str(job.cores)], + tool='quay.io/ucsc_cgl/strelka') + end_time = time.time() + log_runtime(job, start_time, end_time, 'Strelka') + + vcf_id = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'variants.vcf.gz')) + if benchmarking: + return (vcf_id, (end_time - start_time)) + else: + return vcf_id + + +def run_manta(job, ref, ref_fai, bam, bai, benchmarking=False): + ''' + Runs Manta's germline single sample caller. + + :param JobFunctionWrappingJob job: passed automatically by Toil. + :param str ref: The reference FASTA FileStoreID. + :param str ref_fai: The reference FASTA index FileStoreID. + :param str bam: The FileStoreID of the BAM to call. + :param str bai: The FileStoreID of the BAM index. + :param boolean benchmarking: If true, returns the runtime along with the + FileStoreID. + ''' + work_dir = job.fileStore.getLocalTempDir() + file_ids = [ref, ref_fai, bam, bai] + file_names = ['ref.fa', 'ref.fa.fai', 'sample.bam', 'sample.bam.bai'] + for file_store_id, name in zip(file_ids, file_names): + job.fileStore.readGlobalFile(file_store_id, os.path.join(work_dir, name)) + + start_time = time.time() + dockerCall(job=job, + workDir=work_dir, + parameters=['/opt/manta/bin/configManta.py', + '--normalBam', '/data/sample.bam', + '--referenceFasta', '/data/ref.fa', + '--runDir', '/data/'], + tool='quay.io/ucsc_cgl/manta') + end_time = time.time() + log_runtime(job, start_time, end_time, 'Configuring Manta') + + start_time = time.time() + dockerCall(job=job, + workDir=work_dir, + parameters=['/data/runWorkflow.py', + '-m', 'local', + '-j', str(job.cores)], + tool='quay.io/ucsc_cgl/manta') + end_time = time.time() + log_runtime(job, start_time, end_time, 'Manta') + + sv_id = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'diploidSV.vcf.gz')) + indel_id = job.fileStore.writeGlobalFile(os.path.join(work_dir, + 'candidateSmallIndels.vcf.gz')) + if benchmarking: + return (sv_id, indel_id, (end_time - 
start_time)) + else: + return (sv_id, indel_id) + + +def run_samtools_mpileup(job, ref, ref_fai, bam, bai, benchmarking=False): + ''' + Runs the samtools mpileup variant caller. + + :param JobFunctionWrappingJob job: passed automatically by Toil. + :param str ref: The reference FASTA FileStoreID. + :param str ref_fai: The reference FASTA index FileStoreID. + :param str bam: The FileStoreID of the BAM to call. + :param str bai: The FileStoreID of the BAM index. + :param boolean benchmarking: If true, returns the runtime along with the + FileStoreID. + ''' + work_dir = job.fileStore.getLocalTempDir() + file_ids = [ref, ref_fai, bam, bai] + file_names = ['ref.fa', 'ref.fa.fai', 'sample.bam', 'sample.bam.bai'] + for file_store_id, name in zip(file_ids, file_names): + job.fileStore.readGlobalFile(file_store_id, os.path.join(work_dir, name)) + + start_time = time.time() + dockerCall(job=job, + workDir=work_dir, + parameters=['mpileup', + '-f', '/data/ref.fa', + '-o', '/data/sample.vcf.gz', + '/data/sample.bam'], + tool='quay.io/ucsc_cgl/samtools') + end_time = time.time() + log_runtime(job, start_time, end_time, 'samtools mpileup') + + vcf_id = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'sample.vcf.gz')) + if benchmarking: + return (vcf_id, (end_time - start_time)) + else: + return vcf_id + + +def run_bcftools_call(job, vcf_gz, benchmarking=False): + ''' + Runs the bcftools call command. + + :param JobFunctionWrappingJob job: passed automatically by Toil. + :param str vcf_gz: The FileStoreID of the GZIPed VCF. + :param boolean benchmarking: If true, returns the runtime along with the + FileStoreID. 
+ ''' + work_dir = job.fileStore.getLocalTempDir() + job.fileStore.readGlobalFile(vcf_gz, os.path.join(work_dir, 'sample.vcf.gz')) + + start_time = time.time() + dockerCall(job=job, + workDir=work_dir, + parameters=['call', + '-o', '/data/sample.calls.vcf.gz', + '--threads', str(job.cores), + '/data/sample.vcf.gz'], + tool='quay.io/ucsc_cgl/bcftools') + end_time = time.time() + log_runtime(job, start_time, end_time, 'bcftools call') + + vcf_id = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'sample.calls.vcf.gz')) + if benchmarking: + return (vcf_id, (end_time - start_time)) + else: + return vcf_id + + +def run_gatk3_haplotype_caller(job, ref, ref_fai, ref_dict, bam, bai, + emit_threshold=10.0, + call_threshold=30.0, + benchmarking=False): + ''' + Runs the GATK3's HaplotypeCaller. + + :param str ref: The reference FASTA FileStoreID. + :param str ref_fai: The reference FASTA index FileStoreID. + :param str ref_dict: The reference sequence dictionary FileStoreID. + :param str bam: The FileStoreID of the BAM to call. + :param str bai: The FileStoreID of the BAM index. + :param boolean benchmarking: If true, returns the runtime along with the + FileStoreID.
+ ''' + work_dir = job.fileStore.getLocalTempDir() + file_ids = [ref, ref_fai, ref_dict, bam, bai] + file_names = ['ref.fa', 'ref.fa.fai', 'ref.dict', 'sample.bam', 'sample.bam.bai'] + for file_store_id, name in zip(file_ids, file_names): + job.fileStore.readGlobalFile(file_store_id, os.path.join(work_dir, name)) + + command = ['-T', 'HaplotypeCaller', + '-nct', str(job.cores), + '-R', 'ref.fa', + '-I', 'sample.bam', + '-o', 'output.g.vcf', + '-stand_call_conf', str(call_threshold), + '-stand_emit_conf', str(emit_threshold), + '-variant_index_type', 'LINEAR', + '-variant_index_parameter', '128000', + '--genotyping_mode', 'Discovery', + '--emitRefConfidence', 'GVCF'] + + # Set TMPDIR to /data to prevent writing temporary files to /tmp + docker_parameters = ['--rm', + '--log-driver', 'none', + '-e', 'JAVA_OPTS=-Djava.io.tmpdir=/data/ -Xmx{}'.format(job.memory), + '-v', '{}:/data'.format(work_dir)] + start_time = time.time() + dockerCall(job=job, tool='quay.io/ucsc_cgl/gatk:3.5--dba6dae49156168a909c43330350c6161dc7ecc2', + workDir=work_dir, + parameters=command, + dockerParameters=docker_parameters) + end_time = time.time() + log_runtime(job, start_time, end_time, "GATK3 HaplotypeCaller") + + vcf_id = job.fileStore.writeGlobalFile(os.path.join(work_dir, 'output.g.vcf')) + if benchmarking: + return (vcf_id, (end_time - start_time)) + else: + return vcf_id