|
| 1 | +package com.fulcrumgenomics.vcf |
| 2 | + |
| 3 | +import com.fulcrumgenomics.FgBioDef._ |
| 4 | +import com.fulcrumgenomics.commons.io.Io |
| 5 | +import com.fulcrumgenomics.commons.util.LazyLogging |
| 6 | +import com.fulcrumgenomics.fasta.SequenceDictionary |
| 7 | +import com.fulcrumgenomics.sopt.{arg, clp} |
| 8 | +import com.fulcrumgenomics.sopt.cmdline.ValidationException |
| 9 | +import com.fulcrumgenomics.util.{Metric, ProgressLogger} |
| 10 | +import com.fulcrumgenomics.vcf.api.Allele.NoCallAllele |
| 11 | +import com.fulcrumgenomics.vcf.api.{Allele, Genotype, Variant, VcfCount, VcfFieldType, VcfFormatHeader, VcfHeader, VcfSource, VcfWriter} |
| 12 | +import com.fulcrumgenomics.cmdline.{ClpGroups, FgBioTool} |
| 13 | +import com.fulcrumgenomics.vcf.DownsampleVcf.{downsampleAndRegenotype, winnowVariants} |
| 14 | + |
| 15 | +import scala.math.log10 |
| 16 | +import scala.util.Random |
| 17 | +import scala.tools.nsc.doc.html.HtmlTags |
| 18 | + |
| 19 | +object DownsampleVcf extends LazyLogging { |
| 20 | + /** Removes variants that are within a specified distance from a previous variant. |
| 21 | + * The end position of the current variant is compared with the start position of the following variant. |
| 22 | + * @param variants an iterator of the variants to process |
| 23 | + * @param windowSize the interval (exclusive) in which to check for additional variants. |
| 24 | + * windowSize considers the distance between the end position of a variant |
| 25 | + * with the start position of the following variant |
| 26 | + * @param dict a sequencing dictionary to get contig ordering |
| 27 | + * @return a new iterator of variants with just the variant entries we want to keep |
| 28 | + */ |
| 29 | + def winnowVariants(variants: Iterator[Variant], windowSize: Int, dict: SequenceDictionary): Iterator[Variant] = { |
| 30 | + require(windowSize >= 0, f"the windowSize ($windowSize) is negative") |
| 31 | + new Iterator[Variant] { |
| 32 | + private val iter = variants.bufferBetter |
| 33 | + |
| 34 | + def hasNext: Boolean = iter.hasNext |
| 35 | + |
| 36 | + private def isInOrder(current: Variant, next: Variant, currentIndex: Int, nextIndex: Int): Boolean = { |
| 37 | + (currentIndex < nextIndex) || (currentIndex == nextIndex && current.end <= next.pos) |
| 38 | + } |
| 39 | + |
| 40 | + def next(): Variant = { |
| 41 | + val current = iter.next() |
| 42 | + val currentIndex = dict(current.chrom).index |
| 43 | + iter.dropWhile { next: Variant => |
| 44 | + val nextIndex = dict(next.chrom).index |
| 45 | + require( |
| 46 | + isInOrder(current, next, currentIndex, nextIndex), |
| 47 | + f"variants out of order; ${current.chrom}:${current.pos} > ${next.chrom}:${next.pos}") |
| 48 | + |
| 49 | + currentIndex == nextIndex && next.pos - current.end < windowSize |
| 50 | + } |
| 51 | + current |
| 52 | + } |
| 53 | + } |
| 54 | + } |
| 55 | + |
| 56 | + /** Downsamples variants by randomly sampling the total allele depths at the given proportion. |
| 57 | + * @param oldAds an indexed seq of the original allele depths |
| 58 | + * @param proportion the proportion to use for downsampling, |
| 59 | + * calculated using total base count from the index and a target base count |
| 60 | + * @return a new IndexedSeq of allele depths of the same length as `oldAds` |
| 61 | + */ |
| 62 | + def downsampleADs(oldAds: IterableOnce[Int], proportion: Double, random: Random): IndexedSeq[Int] = { |
| 63 | + require(proportion <= 1, f"proportion must be less than 1: proportion = ${proportion}") |
| 64 | + oldAds.iterator.toIndexedSeq.map(s => Range(0, s).iterator.map(_ => random.nextDouble()).count(_ < proportion)) |
| 65 | + } |
| 66 | + |
| 67 | + /** |
| 68 | + * Re-genotypes a variant for each sample after downsampling the allele counts based on the given |
| 69 | + * per-sample proportions. |
| 70 | + * @param variant the variant to downsample and re-genotype |
| 71 | + * @param proportions proportion to downsample the allele counts for each sample prior to re-genotyping |
| 72 | + * @param random random number generator for downsampling |
| 73 | + * @param epsilon the sequencing error rate for genotyping |
| 74 | + * @return a new variant with updated genotypes, downsampled ADs, and recomputed PLs |
| 75 | + */ |
| 76 | + def downsampleAndRegenotype(variant: Variant, |
| 77 | + proportions: Map[String, Double], |
| 78 | + random: Random, epsilon: Double=0.01): Variant = { |
| 79 | + try { |
| 80 | + variant.copy(genotypes=variant.genotypes.map { case (sample, gt) => |
| 81 | + val proportion = proportions(sample) |
| 82 | + sample -> downsampleAndRegenotype(gt=gt, proportion=proportion, random=random, epsilon=epsilon) |
| 83 | + }) |
| 84 | + } catch { |
| 85 | + case e: MatchError => throw new Exception( |
| 86 | + "processing " + variant.id + " at " + variant.chrom + ":" + variant.pos + "-" + variant.end, e |
| 87 | + ) |
| 88 | + } |
| 89 | + } |
| 90 | + |
| 91 | + /** |
| 92 | + * Re-genotypes a sample after downsampling the allele counts based on the given proportion. |
| 93 | + * @param gt the genotype to downsample |
| 94 | + * @param proportion proportion to downsample the allele count prior to re-genotyping |
| 95 | + * @param random random number generator for downsampling |
| 96 | + * @param epsilon the sequencing error rate for genotyping |
| 97 | + * @return a new Genotype with updated allele depths, PLs, and genotype |
| 98 | + */ |
| 99 | + def downsampleAndRegenotype(gt: Genotype, proportion: Double, random: Random, epsilon: Double): Genotype = { |
| 100 | + val oldAds = gt.getOrElse[IndexedSeq[Int]]("AD", throw new Exception(s"AD tag not found for sample ${gt.sample}")) |
| 101 | + val newAds = downsampleADs(oldAds, proportion, random) |
| 102 | + val likelihoods = Likelihoods(newAds) |
| 103 | + val pls = likelihoods.pls |
| 104 | + val calls = likelihoods.mostLikelyCall(gt.alleles.toSeq) |
| 105 | + gt.copy(attrs=Map("PL" -> pls, "AD" -> newAds, "DP" -> newAds.sum), calls=calls) |
| 106 | + } |
| 107 | + |
| 108 | + object Likelihoods { |
| 109 | + /**Converts a sequence of log-likelihoods to phred-scale by 1) multiplying each by -10, 2) |
| 110 | + * subtracting from each the min value so the smallest value is 0, and 3) rounding to the |
| 111 | + * nearest integer. |
| 112 | + */ |
| 113 | + def logToPhredLikelihoods(logLikelihoods: IndexedSeq[Double]): IndexedSeq[Int] = { |
| 114 | + val rawPL = logLikelihoods.map(gl => gl * -10) |
| 115 | + val minPL = rawPL.min |
| 116 | + rawPL.map(pl => (pl - minPL).round.toInt) |
| 117 | + } |
| 118 | + |
| 119 | + /** Computes the likelihoods for each possible biallelic genotype. |
| 120 | + * @param alleleDepthA the reference allele depth |
| 121 | + * @param alleleDepthB the alternate allele depth |
| 122 | + * @param epsilon the error rate for genotyping |
| 123 | + * @return a new `Likelihood` that has the likelihoods of AA, AB, and BB |
| 124 | + */ |
| 125 | + def biallelic(alleleDepthA: Int, alleleDepthB: Int, epsilon: Double = 0.01): IndexedSeq[Double] = { |
| 126 | + val aGivenAA = log10(1 - epsilon) |
| 127 | + val aGivenBB = log10(epsilon) |
| 128 | + val aGivenAB = log10((1 - epsilon) / 2) |
| 129 | + |
| 130 | + val rawGlAA = ((alleleDepthA * aGivenAA) + (alleleDepthB * aGivenBB)) |
| 131 | + val rawGlBB = ((alleleDepthA * aGivenBB) + (alleleDepthB * aGivenAA)) |
| 132 | + val rawGlAB = ((alleleDepthA + alleleDepthB) * aGivenAB) |
| 133 | + |
| 134 | + IndexedSeq(rawGlAA, rawGlAB, rawGlBB) |
| 135 | + } |
| 136 | + |
| 137 | + /** Computes the likelihoods for each possible genotype given a sequence of read depths for any |
| 138 | + * number of alleles. |
| 139 | + * @param alleleDepths the sequence of allele depths in the order specified in the VCF |
| 140 | + * @param epsilon the error rate for genotyping |
| 141 | + * @return a new `Likelihood` that has the log likelihoods of all possible genotypes in the |
| 142 | + * order specified in VFC spec for the GL/PL tags. |
| 143 | + */ |
| 144 | + def generalized(alleleDepths: IndexedSeq[Int], epsilon: Double = 0.01): IndexedSeq[Double] = { |
| 145 | + val numAlleles = alleleDepths.length |
| 146 | + // probabilities associated with each possible genotype for a pair of alleles |
| 147 | + val logProbs: Array[Double] = Array( |
| 148 | + math.log10(epsilon), |
| 149 | + math.log10((1 - epsilon) / 2), |
| 150 | + math.log10(1 - epsilon) |
| 151 | + ) |
| 152 | + // compute genotype log-likelihoods |
| 153 | + (0 until numAlleles).flatMap(b => |
| 154 | + (0 to b).map(a => |
| 155 | + (0 until numAlleles).map(allele => |
| 156 | + logProbs(Array(a, b).count(_ == allele)) * alleleDepths(allele) |
| 157 | + ).sum |
| 158 | + ) |
| 159 | + ) |
| 160 | + } |
| 161 | + |
| 162 | + def apply(alleleDepths: IndexedSeq[Int], epsilon: Double = 0.01): Likelihoods = { |
| 163 | + val numAlleles = alleleDepths.length |
| 164 | + require(numAlleles >= 2, "at least two alleles are required to calculate genotype likelihoods") |
| 165 | + Likelihoods(numAlleles, generalized(alleleDepths, epsilon)) |
| 166 | + } |
| 167 | + } |
| 168 | + |
| 169 | + /** Stores the log10(likelihoods) for all possible genotypes. |
| 170 | + * @param numAlleles the number of alleles the variant has |
| 171 | + * @param genotypeLikelihoods sequence of GLs in the order specified in the VCF spec |
| 172 | + */ |
| 173 | + case class Likelihoods(numAlleles: Int, genotypeLikelihoods: IndexedSeq[Double]) { |
| 174 | + /** |
| 175 | + * Returns the likelihoods as a list of phred-scaled integers (i.e, the value of the PL tag). |
| 176 | + * @return a list of phred-scaled likelihooodS for AA, AB, BB. |
| 177 | + */ |
| 178 | + def pls: IndexedSeq[Int] = { |
| 179 | + Likelihoods.logToPhredLikelihoods(genotypeLikelihoods) |
| 180 | + } |
| 181 | + |
| 182 | + def mostLikelyGenotype: Option[(Int, Int)] = { |
| 183 | + val minIndexes = pls.zipWithIndex.filter(pair => pair._1 == 0) |
| 184 | + minIndexes.length match { |
| 185 | + case 0 => throw new RuntimeException("expected the most likely PL to have a value of 0.0") |
| 186 | + case 1 => { |
| 187 | + val genotypes = |
| 188 | + for (b <- 0 until numAlleles; a <- 0 to b) |
| 189 | + yield (a, b) |
| 190 | + Some(genotypes(minIndexes.head._2)) |
| 191 | + } |
| 192 | + case _ => None // if multiple genotypes are most likely, don't make a call |
| 193 | + } |
| 194 | + } |
| 195 | + |
| 196 | + def mostLikelyCall(alleles: Seq[Allele]): IndexedSeq[Allele] = { |
| 197 | + mostLikelyGenotype match { |
| 198 | + case None => IndexedSeq(NoCallAllele, NoCallAllele) |
| 199 | + case Some((a, b)) => IndexedSeq(alleles(a), alleles(b)) |
| 200 | + } |
| 201 | + } |
| 202 | + } |
| 203 | +} |
| 204 | + |
| 205 | +@clp(group=ClpGroups.VcfOrBcf, description = |
| 206 | + """ |
| 207 | + |Re-genotypes a VCF after downsampling the allele counts. |
| 208 | + | |
| 209 | + |The input VCF must have at least one sample. |
| 210 | + | |
| 211 | + |If the input VCF contains a single sample, the downsampling target may be specified as a |
| 212 | + |proportion of the original read depth using `--proportion=(0..1)`, or as the combination of |
| 213 | + |the original and target _number of sequenced bases_ (`--originalBases` and |
| 214 | + |`--downsampleToBases`). For multi-sample VCFs, the downsampling target must be specified using |
| 215 | + |`--downsampleToBases`, and a metadata file with the total number of sequenced bases per sample |
| 216 | + |is required as well. The metadata file must follow the |
| 217 | + |[[https://www.internationalgenome.org/category/meta-data/] 1000 Genomes index format], but the |
| 218 | + |only required columns are `SAMPLE_NAME` and `BASE_COUNT`. A propportion for each sample is |
| 219 | + |calculated by dividing the _target number of sequenced bases_ by the _original number of |
| 220 | + |sequenced bases_. |
| 221 | + | |
| 222 | + |The tool first (optionally) winnows the VCF file to remove variants within a distance to each |
| 223 | + |other specified by `--window-size` (the default value of `0` disables winnowing). Next, each |
| 224 | + |sample at each variant is examined independently. The allele depths per-genotype are randoml |
| 225 | + |downsampled given the proportion. The downsampled allele depths are then used to re-compute |
| 226 | + |allele likelhoods and produce a new genotype. |
| 227 | + | |
| 228 | + |The tool outputs a downsampled VCF file with the winnowed variants removed, and with the |
| 229 | + |genotype calls and `DP`, `AD`, and `PL` tags updated for each sample at each retained variant. |
| 230 | +""") |
| 231 | +class DownsampleVcf |
| 232 | +(@arg(flag='i', doc="The vcf to downsample.") val input: PathToVcf, |
| 233 | + @arg(flag='p', doc="Proportion of bases to retain (for single-sample VCF).") val proportion: Option[Double] = None, |
| 234 | + @arg(flag='b', doc="Original number of bases (for single-sample VCF).") val originalBases: Option[Double] = None, |
| 235 | + @arg(flag='m', doc="Index file with bases per sample.") val metadata: Option[FilePath] = None, |
| 236 | + @arg(flag='n', doc="Target number of bases to downsample to.") val downsampleToBases: Option[Double] = None, |
| 237 | + @arg(flag='o', doc="Output file name.") val output: PathToVcf, |
| 238 | + @arg(flag='w', doc="Winnowing window size.") val windowSize: Int = 0, |
| 239 | + @arg(flag='e', doc="Sequencing Error rate for genotyping.") val epsilon: Double = 0.01, |
| 240 | + @arg(flag='c', doc="True to write out no-calls.") val writeNoCall: Boolean = false, |
| 241 | + @arg(flag='s', doc="Random seed value.") val seed: Int = 42, |
| 242 | + ) extends FgBioTool { |
| 243 | + Io.assertReadable(input) |
| 244 | + Io.assertCanWriteFile(output) |
| 245 | + validate(windowSize >= 0, "window size must be greater than or equal to zero") |
| 246 | + validate(0 <= epsilon && epsilon <= 1, "epsilon/error rate must be between 0 and 1") |
| 247 | + (proportion, originalBases, metadata, downsampleToBases) match { |
| 248 | + case (Some(x), None, None, None) => |
| 249 | + validate(x > 0, "proportion must be greater than 0") |
| 250 | + validate(x < 1, "proportion must be less than 1") |
| 251 | + case (None, Some(original), None, Some(target)) => |
| 252 | + validate(original > 0, "originalBases must be greater than zero") |
| 253 | + validate(target > 0, "target base count must be greater than zero") |
| 254 | + case (None, None, Some(metadata), Some(target)) => |
| 255 | + Io.assertReadable(metadata) |
| 256 | + require(target > 0, "target base count must be greater than zero") |
| 257 | + case (None, _, _, None) => |
| 258 | + throw new ValidationException( |
| 259 | + "exactly one of proportion or downsampleToBases must be specified" |
| 260 | + ) |
| 261 | + case _ => |
| 262 | + throw new ValidationException( |
| 263 | + "exactly one of proportion, originalBases, or metadata must be specified" |
| 264 | + ) |
| 265 | + } |
| 266 | + |
| 267 | + override def execute(): Unit = { |
| 268 | + val vcf = VcfSource(input) |
| 269 | + val proportions = ( |
| 270 | + (proportion, originalBases, metadata, downsampleToBases) match { |
| 271 | + case (Some(x), None, None, None) => |
| 272 | + require(vcf.header.samples.length == 1, "--original-bases requires a single-sample VCF") |
| 273 | + LazyList(vcf.header.samples.head -> x) |
| 274 | + case (None, Some(original), None, Some(target)) => |
| 275 | + require(vcf.header.samples.length == 1, "--original-bases requires a single-sample VCF") |
| 276 | + LazyList(vcf.header.samples.head -> math.min(target / original, 1.0)) |
| 277 | + case (None, None, Some(metadata), Some(target)) => |
| 278 | + Sample.read(metadata) |
| 279 | + .filter(s => vcf.header.samples.contains(s.SAMPLE_NAME)) |
| 280 | + .map(sample => sample.SAMPLE_NAME -> math.min(target / sample.BASE_COUNT.toDouble, 1.0)) |
| 281 | + case _ => |
| 282 | + throw new RuntimeException("unexpected parameter combination") |
| 283 | + } |
| 284 | + ).toMap |
| 285 | + proportions.foreach { case (s, p) => logger.info(f"Downsampling $s with proportion ${p}%.4f") } |
| 286 | + |
| 287 | + val inputProgress = ProgressLogger(logger, noun="variants read") |
| 288 | + val inputVariants = ProgressLogger.ProgressLoggingIterator(vcf.iterator).progress(inputProgress) |
| 289 | + val winnowed = if (windowSize > 0) { |
| 290 | + val winnowed = winnowVariants(inputVariants, windowSize=windowSize, dict=vcf.header.dict) |
| 291 | + val winnowedProgress = ProgressLogger(logger, noun="variants retained") |
| 292 | + ProgressLogger.ProgressLoggingIterator(winnowed).progress(winnowedProgress) |
| 293 | + } else { |
| 294 | + inputVariants |
| 295 | + } |
| 296 | + val outputVcf = VcfWriter(path=output, header=buildOutputHeader(vcf.header)) |
| 297 | + |
| 298 | + val progress = ProgressLogger(logger, noun="variants written") |
| 299 | + val random = new Random(seed) |
| 300 | + winnowed.foreach { v => |
| 301 | + val ds = downsampleAndRegenotype(v, proportions=proportions, random=random, epsilon=epsilon) |
| 302 | + if (writeNoCall || !ds.gts.forall(g => g.isNoCall)) { |
| 303 | + outputVcf += ds |
| 304 | + progress.record(ds) |
| 305 | + } |
| 306 | + } |
| 307 | + |
| 308 | + progress.logLast() |
| 309 | + vcf.safelyClose() |
| 310 | + outputVcf.close() |
| 311 | + } |
| 312 | + |
| 313 | + private def buildOutputHeader(in: VcfHeader): VcfHeader = { |
| 314 | + val fmts = Seq.newBuilder[VcfFormatHeader] |
| 315 | + fmts ++= in.formats |
| 316 | + |
| 317 | + if (!in.format.contains("AD")) { |
| 318 | + fmts += VcfFormatHeader(id="AD", count=VcfCount.OnePerAllele, kind=VcfFieldType.Integer, description="Per allele depths.") |
| 319 | + } |
| 320 | + |
| 321 | + if (!in.format.contains("DP")) { |
| 322 | + fmts += VcfFormatHeader(id="DP", count=VcfCount.Fixed(1), kind=VcfFieldType.Integer, description="Total depth across alleles.") |
| 323 | + } |
| 324 | + |
| 325 | + if (!in.format.contains("PL")) { |
| 326 | + fmts += VcfFormatHeader(id="PL", count=VcfCount.OnePerGenotype, kind=VcfFieldType.Integer, description="Per genotype phred scaled likelihoods.") |
| 327 | + } |
| 328 | + |
| 329 | + in.copy(formats=fmts.result()) |
| 330 | + } |
| 331 | +} |
| 332 | + |
| 333 | +private object Sample { |
| 334 | + /** Load a set of samples from the 1KG metadata file. */ |
| 335 | + def read(path: FilePath): Seq[Sample] = { |
| 336 | + val lines = Io.readLines(path).dropWhile(_.startsWith("##")).map(line => line.dropWhile(_ == '#')) |
| 337 | + Metric.read[Sample](lines=lines) |
| 338 | + } |
| 339 | +} |
| 340 | + |
| 341 | +private case class Sample(ENA_FILE_PATH: String = ".", |
| 342 | + MD5SUM: String = ".", |
| 343 | + RUN_ID: String = ".", |
| 344 | + STUDY_ID: String = ".", |
| 345 | + STUDY_NAME: String = ".", |
| 346 | + CENTER_NAME: String = ".", |
| 347 | + SUBMISSION_ID: String = ".", |
| 348 | + SUBMISSION_DATE: String = ".", |
| 349 | + SAMPLE_ID: String = ".", |
| 350 | + SAMPLE_NAME: String, |
| 351 | + POPULATION: String = ".", |
| 352 | + EXPERIMENT_ID: String = ".", |
| 353 | + INSTRUMENT_PLATFORM: String = ".", |
| 354 | + INSTRUMENT_MODEL: String = ".", |
| 355 | + LIBRARY_NAME: String = ".", |
| 356 | + RUN_NAME: String = ".", |
| 357 | + INSERT_SIZE: String = ".", |
| 358 | + LIBRARY_LAYOUT: String = ".", |
| 359 | + PAIRED_FASTQ: String = ".", |
| 360 | + READ_COUNT: String = ".", |
| 361 | + BASE_COUNT: Long, |
| 362 | + ANALYSIS_GROUP: String = ".") extends Metric |
0 commit comments