diff --git a/pipeline_versions.txt b/pipeline_versions.txt index c0d8e61068..e9ccdc4382 100644 --- a/pipeline_versions.txt +++ b/pipeline_versions.txt @@ -4,7 +4,7 @@ BuildIndices 5.1.0 2026-02-13 CramToUnmappedBams 1.1.3 2024-08-02 ExomeGermlineSingleSample 3.2.7 2026-01-21 ExomeReprocessing 3.3.7 2026-01-21 -Glimpse2LowPassImputation 0.0.2 2026-03-19 +Glimpse2LowPassImputation 0.0.3 2026-03-25 IlluminaGenotypingArray 1.12.27 2026-01-21 Imputation 1.1.23 2025-10-03 ImputationBeagle 3.0.1 2026-02-23 diff --git a/pipelines/wdl/glimpse/low_pass_imputation/Glimpse2LowPassImputation.changelog.md b/pipelines/wdl/glimpse/low_pass_imputation/Glimpse2LowPassImputation.changelog.md index 414b86c857..803cdc7954 100644 --- a/pipelines/wdl/glimpse/low_pass_imputation/Glimpse2LowPassImputation.changelog.md +++ b/pipelines/wdl/glimpse/low_pass_imputation/Glimpse2LowPassImputation.changelog.md @@ -1,3 +1,9 @@ +# 0.0.3 +2026-03-25 (Date of Last Commit) + +* Reorganize wdl to be able to run on contigs more easily. Now the workflow is fully driven by the `contigs` input +* The wdl now expects the reference related files to all live under the same cloud base path + # 0.0.2 2026-03-19 (Date of Last Commit) diff --git a/pipelines/wdl/glimpse/low_pass_imputation/Glimpse2LowPassImputation.wdl b/pipelines/wdl/glimpse/low_pass_imputation/Glimpse2LowPassImputation.wdl index 97b275d291..5e99b82089 100644 --- a/pipelines/wdl/glimpse/low_pass_imputation/Glimpse2LowPassImputation.wdl +++ b/pipelines/wdl/glimpse/low_pass_imputation/Glimpse2LowPassImputation.wdl @@ -2,16 +2,13 @@ version 1.0 workflow Glimpse2LowPassImputation { input { - String pipeline_version = "0.0.2" - - # List of files, one per line - File reference_chunks - File sites_vcf - File sites_table - File sites_table_index + String pipeline_version = "0.0.3" Array[String] contigs + # this is the path the a directory that contains sites vcf, sites tabke, and reference chunks file. 
should end with a "/"
+ contig + ".gz.tbi" + File reference_chunks = reference_panel_prefix + "reference_chunks." + contig + ".txt" + + if (defined(crams)) { + Array[Array[String]] crams_batches = select_first([SplitIntoBatches.crams_batches, [select_first([crams])]]) + Array[Array[String]] cram_indices_batches = select_first([SplitIntoBatches.cram_indices_batches, [select_first([cram_indices])]]) + Array[Array[String]] sample_ids_batches = select_first([SplitIntoBatches.sample_ids_batches, [select_first([sample_ids])]]) + + scatter(i in range(length(crams_batches))) { + call BcftoolsMpileup { + input: + crams = crams_batches[i], + cram_indices = cram_indices_batches[i], + sample_ids = sample_ids_batches[i], + fasta = fasta, + fasta_index = fasta_index, + call_indels = call_indels, + sites_vcf = sites_vcf, + } + + call BcftoolsCall { + input: + mpileup_bcf = BcftoolsMpileup.output_bcf, + sites_table = sites_table, + sites_table_index = sites_table_index, + } + + call BcftoolsNorm { + input: + calls_bcf = BcftoolsCall.output_bcf, + } } - call BcftoolsCall { - input: - mpileup_bcf = BcftoolsMpileup.output_bcf, - sites_table = sites_table, - sites_table_index = sites_table_index, + if (length(BcftoolsNorm.output_vcf) > 1) { + call BcftoolsMerge { + input: + vcfs = BcftoolsNorm.output_vcf, + vcf_indices = BcftoolsNorm.output_vcf_index, + output_basename = output_basename + } } - call BcftoolsNorm { - input: - calls_bcf = BcftoolsCall.output_bcf, - } + File phase_input_vcf = select_first([BcftoolsMerge.merged_vcf, BcftoolsNorm.output_vcf[0], input_vcf]) + File phase_input_vcf_index = select_first([BcftoolsMerge.merged_vcf_index, BcftoolsNorm.output_vcf_index[0], input_vcf_index]) } - if (length(BcftoolsNorm.output_vcf) > 1) { - call BcftoolsMerge { + ## this task is used to grab the reference chunk but does not affect memory usage of glimpsePhase. 
+ ## still tbd which method makes the most sense cost wise + call ComputeShardsAndMemoryPerShard { + input: + reference_chunks_memory = reference_chunks, + n_samples = n_samples + } + + scatter (reference_chunk_index in range(length(ComputeShardsAndMemoryPerShard.reference_chunk_file_paths))) { + + call GlimpsePhase { input: - vcfs = BcftoolsNorm.output_vcf, - vcf_indices = BcftoolsNorm.output_vcf_index, - output_basename = output_basename + reference_chunk = ComputeShardsAndMemoryPerShard.reference_chunk_file_paths[reference_chunk_index], + input_vcf = phase_input_vcf, + input_vcf_index = phase_input_vcf_index, + impute_reference_only_variants = impute_reference_only_variants, + call_indels = call_indels, + sample_ids = sample_ids, + fasta = fasta, + fasta_index = fasta_index, + docker = glimpse_docker } } - File merged_vcf = select_first([BcftoolsMerge.merged_vcf, BcftoolsNorm.output_vcf[0]]) - File merged_vcf_index = select_first([BcftoolsMerge.merged_vcf_index, BcftoolsNorm.output_vcf_index[0]]) + call GlimpseLigate { + input: + imputed_chunks = GlimpsePhase.imputed_vcf, + imputed_chunks_indices = GlimpsePhase.imputed_vcf_index, + output_basename = output_basename, + ref_dict = ref_dict, + docker = glimpse_docker + } + Array[File] contig_coverage_metrics = select_all(GlimpsePhase.coverage_metrics) } - ## this task is used to grab the reference chunk but does not affect memory usage of glimpsePhase. 
- ## still tbd which method makes the most sense cost wise - call ComputeShardsAndMemoryPerShard { + call GatherVcfsNoIndex { input: - reference_chunks_memory = reference_chunks, - contigs = contigs, - n_samples = n_samples + input_vcfs = GlimpseLigate.imputed_vcf, + output_vcf_basename = output_basename + ".imputed", + gatk_docker = gatk_docker } - scatter (reference_chunk_index in range(length(ComputeShardsAndMemoryPerShard.reference_chunk_file_paths))) { - - call GlimpsePhase { - input: - reference_chunk = ComputeShardsAndMemoryPerShard.reference_chunk_file_paths[reference_chunk_index], - input_vcf = select_first([merged_vcf,input_vcf]), - input_vcf_index = select_first([merged_vcf_index,input_vcf_index]), - impute_reference_only_variants = impute_reference_only_variants, - n_burnin = n_burnin, - n_main = n_main, - effective_population_size = effective_population_size, - call_indels = call_indels, - sample_ids = sample_ids, - fasta = fasta, - fasta_index = fasta_index, - docker = docker - } - } - - call GlimpseLigate { + call CreateVcfIndexAndMd5 { input: - imputed_chunks = GlimpsePhase.imputed_vcf, - imputed_chunks_indices = GlimpsePhase.imputed_vcf_index, - output_basename = output_basename, - ref_dict = ref_dict, - docker = docker + vcf_input = GatherVcfsNoIndex.output_vcf, + gatk_docker = gatk_docker, + preemptible = 0 } - if (length(select_all(GlimpsePhase.coverage_metrics)) > 0) { + Array[File] genome_coverage_metrics = flatten(contig_coverage_metrics) + if (length(genome_coverage_metrics) > 0) { call CombineCoverageMetrics { input: - cov_metrics = select_all(GlimpsePhase.coverage_metrics), + cov_metrics = genome_coverage_metrics, output_basename = output_basename } } call CollectQCMetrics { input: - imputed_vcf = GlimpseLigate.imputed_vcf, + imputed_vcf = GatherVcfsNoIndex.output_vcf, output_basename = output_basename } output { - File imputed_vcf = GlimpseLigate.imputed_vcf - File imputed_vcf_index = GlimpseLigate.imputed_vcf_index - File 
imputed_vcf_md5sum = GlimpseLigate.imputed_vcf_md5sum + File imputed_vcf = CreateVcfIndexAndMd5.output_vcf + File imputed_vcf_index = CreateVcfIndexAndMd5.output_vcf_index + File imputed_vcf_md5sum = CreateVcfIndexAndMd5.output_vcf_md5sum File qc_metrics = CollectQCMetrics.qc_metrics File? coverage_metrics = CombineCoverageMetrics.coverage_metrics @@ -209,7 +226,6 @@ task SplitIntoBatches { task ComputeShardsAndMemoryPerShard { input { File reference_chunks_memory - Array[String] contigs Int n_samples } @@ -221,17 +237,13 @@ task ComputeShardsAndMemoryPerShard { df = pd.read_csv('~{reference_chunks_memory}', sep='\t', header=None, names=['contig', 'reference_shard', 'base_gb', 'slope_per_sample_gb']) - # filter dataframe by contig list - chromosomes_to_filter = ["~{sep='", "' contigs}"] - filtered_df = df[df['contig'].isin(chromosomes_to_filter)] - # write out reference shards to process - filtered_df['reference_shard'].to_csv('reference_shard_file_paths.tsv', sep='\t', index=False, header=None) + df['reference_shard'].to_csv('reference_shard_file_paths.tsv', sep='\t', index=False, header=None) # calculate memory usage and save to file - filtered_df['mem_gb'] = filtered_df['base_gb'] + filtered_df['slope_per_sample_gb'] * ~{n_samples} - filtered_df['mem_gb'] = filtered_df['mem_gb'].apply(lambda x: min(256, int(np.ceil(x)))) # cap at 256 GB - filtered_df['mem_gb'].to_csv('memory_per_chunk.tsv', sep='\t', index=False, header=None) + df['mem_gb'] = df['base_gb'] + df['slope_per_sample_gb'] * ~{n_samples} + df['mem_gb'] = df['mem_gb'].apply(lambda x: min(256, int(np.ceil(x)))) # cap at 256 GB + df['mem_gb'].to_csv('memory_per_chunk.tsv', sep='\t', index=False, header=None) EOF >>> @@ -257,15 +269,14 @@ task BcftoolsMpileup { File sites_vcf Int seed = 12345 - Int mem_gb = 4 + Int mem_gb = 6 Int cpu = 1 Int preemptible = 0 + Int max_retries = 3 } Int disk_size_gb = ceil(1.5*size(crams, "GiB") + size(fasta, "GiB") + size(sites_vcf, "GiB")) + 10 - String out_basename = 
"batch" - command <<< set -xeuo pipefail @@ -285,6 +296,7 @@ task BcftoolsMpileup { memory: mem_gb + " GiB" cpu: cpu preemptible: preemptible + maxRetries: max_retries } output { @@ -299,15 +311,14 @@ task BcftoolsCall { File sites_table File sites_table_index - Int mem_gb = 4 + Int mem_gb = 12 Int cpu = 1 Int preemptible = 3 + Int max_retries = 3 } Int disk_size_gb = ceil(3*size(mpileup_bcf, "GiB") + size(sites_table, "GiB")) + 10 - String out_basename = "batch" - command <<< set -xeuo pipefail @@ -320,6 +331,7 @@ task BcftoolsCall { memory: mem_gb + " GiB" cpu: cpu preemptible: preemptible + maxRetries: max_retries } output { @@ -331,21 +343,20 @@ task BcftoolsNorm { input { File calls_bcf - Int mem_gb = 4 + Int mem_gb = 6 Int cpu = 1 Int preemptible = 3 + Int max_retries = 3 } Int disk_size_gb = ceil(3*size(calls_bcf, "GiB")) + 10 - String out_basename = "batch" - command <<< set -xeuo pipefail - bcftools norm -m -both -Oz -o ~{out_basename}.vcf.gz ~{calls_bcf} - bcftools index -t ~{out_basename}.vcf.gz + bcftools norm -m -both -Oz -o normalized.vcf.gz ~{calls_bcf} + bcftools index -t normalized.vcf.gz >>> runtime { @@ -354,11 +365,12 @@ task BcftoolsNorm { memory: mem_gb + " GiB" cpu: cpu preemptible: preemptible + maxRetries: max_retries } output { - File output_vcf = "~{out_basename}.vcf.gz" - File output_vcf_index = "~{out_basename}.vcf.gz.tbi" + File output_vcf = "normalized.vcf.gz" + File output_vcf_index = "normalized.vcf.gz.tbi" } } @@ -366,9 +378,10 @@ task BcftoolsMerge { input { Array[File] vcfs Array[File] vcf_indices - Int mem_gb = 4 + Int mem_gb = 6 Int cpu = 1 Int preemptible = 0 + Int max_retries = 3 String output_basename } @@ -387,6 +400,7 @@ task BcftoolsMerge { memory: mem_gb + " GiB" cpu: cpu preemptible: preemptible + maxRetries: max_retries } output { @@ -537,9 +551,6 @@ task GlimpseLigate { bcftools view -h --no-version ligated.vcf.gz > old_header.vcf java -jar /picard.jar UpdateVcfSequenceDictionary -I old_header.vcf --SD ~{ref_dict} -O 
new_header.vcf bcftools reheader -h new_header.vcf -o ~{output_basename}.imputed.vcf.gz ligated.vcf.gz - tabix ~{output_basename}.imputed.vcf.gz - - md5sum ~{output_basename}.imputed.vcf.gz | awk '{ print $1 }' > ~{output_basename}.imputed.vcf.gz.md5sum >>> runtime { @@ -553,8 +564,6 @@ task GlimpseLigate { output { File imputed_vcf = "~{output_basename}.imputed.vcf.gz" - File imputed_vcf_index = "~{output_basename}.imputed.vcf.gz.tbi" - File imputed_vcf_md5sum = "~{output_basename}.imputed.vcf.gz.md5sum" } } @@ -678,3 +687,75 @@ task CombineCoverageMetrics File coverage_metrics="~{output_basename}.coverage_metrics.txt" } } + +task GatherVcfsNoIndex { + input { + Array[File] input_vcfs + String output_vcf_basename + + String gatk_docker = "us.gcr.io/broad-gatk/gatk:4.6.1.0" + Int cpu = 2 + Int memory_mb = 10000 + Int disk_size_gb = ceil(3*size(input_vcfs, "GiB")) + 10 + } + Int command_mem = memory_mb - 1500 + Int max_heap = memory_mb - 1000 + + command <<< + set -e -o pipefail + + gatk --java-options "-Xms~{command_mem}m -Xmx~{max_heap}m" \ + GatherVcfs \ + -I ~{sep=' -I ' input_vcfs} \ + --REORDER_INPUT_BY_FIRST_VARIANT \ + -O ~{output_vcf_basename}.vcf.gz + >>> + runtime { + docker: gatk_docker + disks: "local-disk ${disk_size_gb} SSD" + memory: "${memory_mb} MiB" + cpu: cpu + maxRetries: 1 + noAddress: true + } + output { + File output_vcf = "~{output_vcf_basename}.vcf.gz" + } +} + +task CreateVcfIndexAndMd5 { + input { + File vcf_input + + Int disk_size_gb = ceil(1.1*size(vcf_input, "GiB")) + 10 + Int cpu = 1 + Int memory_mb = 6000 + String gatk_docker = "us.gcr.io/broad-gatk/gatk:4.5.0.0" + Int preemptible = 3 + } + + String vcf_basename = basename(vcf_input) + + command <<< + set -e -o pipefail + + ln -sf ~{vcf_input} ~{vcf_basename} + + bcftools index -t ~{vcf_basename} + md5sum ~{vcf_basename} | awk '{ print $1 }' > ~{vcf_basename}.md5sum + >>> + runtime { + docker: gatk_docker + disks: "local-disk ${disk_size_gb} SSD" + memory: "${memory_mb} MiB" + cpu: 
cpu + preemptible: preemptible + maxRetries: 1 + noAddress: true + } + output { + File output_vcf = "~{vcf_basename}" + File output_vcf_index = "~{vcf_basename}.tbi" + File output_vcf_md5sum = "~{vcf_basename}.md5sum" + } +}