From 9390abc22d6cccaad30c06021186a5ee28b68cad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=ABl=20Simoneau?= Date: Wed, 12 Jul 2023 14:34:05 -0400 Subject: [PATCH 1/8] feat: Adding OMIM is_recessive parsing --- .../spark3/implicits/ACMGImplicits.scala | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/datalake-spark3/src/main/scala/bio/ferlab/datalake/spark3/implicits/ACMGImplicits.scala b/datalake-spark3/src/main/scala/bio/ferlab/datalake/spark3/implicits/ACMGImplicits.scala index 9def4b43..856ffb2f 100644 --- a/datalake-spark3/src/main/scala/bio/ferlab/datalake/spark3/implicits/ACMGImplicits.scala +++ b/datalake-spark3/src/main/scala/bio/ferlab/datalake/spark3/implicits/ACMGImplicits.scala @@ -46,6 +46,31 @@ object ACMGImplicits { } } + /** + * PM2 - ACMG criteria + * Moderate (supporting) evidence of pathogenic impact. + * Absent from controls (or at extremely low frequency if recessive) in Exome Sequencing Project, 1000 Genomes + * Project, or Exome Aggregration Consortium. + * + * WIP + * + */ + def getPM2(omimDF: DataFrame): Column = { + + val inheritance_modes = List( + "Pseudoautosomal recessive", + "Autosomal recessive", + "Digenic recessive", + "X-linked recessive") + + val _df = omim_df.select("symbols", "phenotype.inheritance") + .withColumn("is_recessive", inheritance_modes.map(m => array_contains($"inheritance", m)).reduce(_ || _)) + .select($"is_recessive", explode($"symbols").as("gene_symbol")) + .filter($"is_recessive" === true) + + lit(1) + } + } } From 722df5f2c4feedd3d4851ab866615a199276297f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=ABl=20Simoneau?= Date: Wed, 12 Jul 2023 20:37:29 -0400 Subject: [PATCH 2/8] feat: PM2 criteria logic --- .../spark3/implicits/ACMGImplicits.scala | 41 +++++++++++++++++-- 1 file changed, 37 insertions(+), 4 deletions(-) diff --git a/datalake-spark3/src/main/scala/bio/ferlab/datalake/spark3/implicits/ACMGImplicits.scala b/datalake-spark3/src/main/scala/bio/ferlab/datalake/spark3/implicits/ACMGImplicits.scala index 856ffb2f..f0353844 100644 --- a/datalake-spark3/src/main/scala/bio/ferlab/datalake/spark3/implicits/ACMGImplicits.scala +++ b/datalake-spark3/src/main/scala/bio/ferlab/datalake/spark3/implicits/ACMGImplicits.scala @@ -55,18 +55,48 @@ object ACMGImplicits { * WIP * */ - def getPM2(omimDF: DataFrame): Column = { + def getPM2(omim: DataFrame, frequencies: DataFrame): Column = { - val inheritance_modes = List( + // Extracting inheritance, identifying recessive genes + val inheritanceModes = List( "Pseudoautosomal recessive", "Autosomal recessive", "Digenic recessive", "X-linked recessive") - val _df = omim_df.select("symbols", "phenotype.inheritance") - .withColumn("is_recessive", inheritance_modes.map(m => array_contains($"inheritance", m)).reduce(_ || _)) + val omimRecessive = omim.select("symbols", "phenotype.inheritance") + .withColumn("is_recessive", inheritanceModes.map(m => array_contains($"inheritance", m)).reduce(_ || _)) .select($"is_recessive", explode($"symbols").as("gene_symbol")) .filter($"is_recessive" === true) + .distinct() + + + // Extracting frequency (and lack of) + val maxAf = frequencies.schema("external_frequencies").dataType match { + case s: StructType => { + val afCols = s.fields.map(_.name).map { field => + struct(col(s"external_frequencies.$field.af") as "v", lit(field) as "k") + } + greatest(afCols: _*).getItem("v").as("max_af") + } + } + + val freqPerSymbol = frequencies.select( + $"chromosome", + $"start", + $"end", + $"reference", + $"alternate", + explode($"genes_symbol").as("gene_symbol"), + maxAf, + maxAf.isNull.as("max_af_is_null") + ) + + // Joining and computing PM2 + val pm2 = freqPerSymbol.join(omimRecessive, Seq("gene_symbol"), "leftouter") + .na.fill(false, Seq("is_recessive")) + .withColumn("PM2", $"max_af_is_null" || $"max_af" === 0 || ($"is_recessive" && $"max_af" < 0.0001)) + lit(1) } @@ -74,3 +104,6 @@ object ACMGImplicits { } } + + + From 5bce110a11ef7444e75bf0aef98915006c7d30aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=ABl=20Simoneau?= Date: Wed, 12 Jul 2023 22:19:15 -0400 Subject: [PATCH 3/8] feat: final join --- .../bio/ferlab/datalake/spark3/implicits/ACMGImplicits.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/datalake-spark3/src/main/scala/bio/ferlab/datalake/spark3/implicits/ACMGImplicits.scala b/datalake-spark3/src/main/scala/bio/ferlab/datalake/spark3/implicits/ACMGImplicits.scala index f0353844..71b095d2 100644 --- a/datalake-spark3/src/main/scala/bio/ferlab/datalake/spark3/implicits/ACMGImplicits.scala +++ b/datalake-spark3/src/main/scala/bio/ferlab/datalake/spark3/implicits/ACMGImplicits.scala @@ -97,8 +97,11 @@ object ACMGImplicits { .na.fill(false, Seq("is_recessive")) .withColumn("PM2", $"max_af_is_null" || $"max_af" === 0 || ($"is_recessive" && $"max_af" < 0.0001)) + val _df = df.join(pm2, Seq("chromosome", "start", "end", "reference", "alternate"), "leftouter") - lit(1) + struct( + _df("PM2") + ) } } From ff391efadc15a0b7c5cb61ed4c92554285e561c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=ABl=20Simoneau?= Date: Mon, 17 Jul 2023 16:02:24 -0400 Subject: [PATCH 4/8] fix: using col() --- .../spark3/implicits/ACMGImplicits.scala | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/datalake-spark3/src/main/scala/bio/ferlab/datalake/spark3/implicits/ACMGImplicits.scala b/datalake-spark3/src/main/scala/bio/ferlab/datalake/spark3/implicits/ACMGImplicits.scala index 71b095d2..46da09e3 100644 --- a/datalake-spark3/src/main/scala/bio/ferlab/datalake/spark3/implicits/ACMGImplicits.scala +++ b/datalake-spark3/src/main/scala/bio/ferlab/datalake/spark3/implicits/ACMGImplicits.scala @@ -65,9 +65,9 @@ object ACMGImplicits { "X-linked recessive") val omimRecessive = omim.select("symbols", "phenotype.inheritance") - .withColumn("is_recessive", inheritanceModes.map(m => array_contains($"inheritance", m)).reduce(_ || _)) - .select($"is_recessive", explode($"symbols").as("gene_symbol")) - .filter($"is_recessive" === true) + .withColumn("is_recessive", inheritanceModes.map(m => array_contains(col("inheritance"), m)).reduce(_ || _)) + .select(col("inheritance"), explode(col("symbols")).as("gene_symbol")) + .filter(col("inheritance") === true) .distinct() @@ -82,12 +82,12 @@ object ACMGImplicits { } val freqPerSymbol = frequencies.select( - $"chromosome", - $"start", - $"end", - $"reference", - $"alternate", - explode($"genes_symbol").as("gene_symbol"), + col("chromosome"), + col("start"), + col("end"), + col("reference"), + col("alternate"), + explode(col("genes_symbol")).as("gene_symbol"), maxAf, maxAf.isNull.as("max_af_is_null") ) @@ -95,7 +95,10 @@ object ACMGImplicits { // Joining and computing PM2 val pm2 = freqPerSymbol.join(omimRecessive, Seq("gene_symbol"), "leftouter") .na.fill(false, Seq("is_recessive")) - .withColumn("PM2", $"max_af_is_null" || $"max_af" === 0 || ($"is_recessive" && $"max_af" < 0.0001)) + .withColumn("PM2", + col("max_af_is_null") + || col("max_af") === 0 + || (col("is_recessive") && col("max_af") < 0.0001)) val _df = df.join(pm2, Seq("chromosome", "start", "end", "reference", "alternate"), "leftouter") From 34ab1473ddb700cea3e0b13c136fca2f329c3cb1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=ABl=20Simoneau?= Date: Mon, 17 Jul 2023 16:36:13 -0400 Subject: [PATCH 5/8] feat: adding basic PM2 fixture --- .../spark3/implicits/ACMGImplicitsSpec.scala | 46 +++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/datalake-spark3/src/test/scala/bio/ferlab/datalake/spark3/implicits/ACMGImplicitsSpec.scala b/datalake-spark3/src/test/scala/bio/ferlab/datalake/spark3/implicits/ACMGImplicitsSpec.scala index a937cf99..b92fc366 100644 --- a/datalake-spark3/src/test/scala/bio/ferlab/datalake/spark3/implicits/ACMGImplicitsSpec.scala +++ b/datalake-spark3/src/test/scala/bio/ferlab/datalake/spark3/implicits/ACMGImplicitsSpec.scala @@ -12,6 +12,46 @@ class ACMGImplicitsSpec extends AnyFlatSpec with WithSparkSession with Matchers spark.sparkContext.setLogLevel("ERROR") + + def pm2Fixture = { + new { + val omimSchema = new StructType() + .add("symbols", new ArrayType(StringType, true), true) + .add("phenotype", new StructType() + .add("inheritance", new ArrayType(StringType, true), true) + ) + + val omimData = Seq( + Row(Array("gene1", "gene2"), Row(Array("Digenic recessive"))), + Row(Array("gene3"), Row(Array("Autosomal Recessive"))), + Row(Array("gene4"), Row(Array("Autosomal Dominant"))) + ) + + val omimDF = spark.createDataFrame(spark.sparkContext.parallelize(omimData), omimSchema) + + val freqSchema = new StructType() + .add("chromosome", StringType, true) + .add("start", IntegerType, true) + .add("end", IntegerType, true) + .add("reference", StringType, true) + .add("alternate", StringType, true) + .add("external_frequencies", new StructType() + .add("thousand_genomes", new StructType() + .add("af", DoubleType, true) + .add("an", IntegerType, true)) + .add("topmed_bravo", new StructType() + .add("af", DoubleType, true) + .add("an", IntegerType, true))) + + val freqData = Seq( + Row("1", 1, 2, "A", "C", Row(Row(0.001, 2), Row(0.050, 50))) + ) + + val freqDF = spark.createDataFrame(spark.sparkContext.parallelize(omimData), omimSchema) + + } + } + def ba1Fixture = { new { val querySchema = new StructType() @@ -65,4 +105,10 @@ class ACMGImplicitsSpec extends AnyFlatSpec with WithSparkSession with Matchers f.result.collect() should contain theSameElementsAs f.resultData } + "getPM2" should "do something" in { + val f = pm2Fixture + + 1 shouldBe 1 + } + } From 54c4c6c2de700871f3e26f5f5bb26d4e9c006d0d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=ABl=20Simoneau?= Date: Thu, 20 Jul 2023 16:58:14 -0400 Subject: [PATCH 6/8] feat: update PM2 approach --- .../spark3/implicits/ACMGImplicits.scala | 38 +++++++++---------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/datalake-spark3/src/main/scala/bio/ferlab/datalake/spark3/implicits/ACMGImplicits.scala b/datalake-spark3/src/main/scala/bio/ferlab/datalake/spark3/implicits/ACMGImplicits.scala index 46da09e3..0ffa6a2e 100644 --- a/datalake-spark3/src/main/scala/bio/ferlab/datalake/spark3/implicits/ACMGImplicits.scala +++ b/datalake-spark3/src/main/scala/bio/ferlab/datalake/spark3/implicits/ACMGImplicits.scala @@ -55,7 +55,7 @@ object ACMGImplicits { * WIP * */ - def getPM2(omim: DataFrame, frequencies: DataFrame): Column = { + def getPM2(omim: DataFrame, frequencies: DataFrame): DataFrame = { // Extracting inheritance, identifying recessive genes val inheritanceModes = List( @@ -66,8 +66,8 @@ object ACMGImplicits { val omimRecessive = omim.select("symbols", "phenotype.inheritance") .withColumn("is_recessive", inheritanceModes.map(m => array_contains(col("inheritance"), m)).reduce(_ || _)) - .select(col("inheritance"), explode(col("symbols")).as("gene_symbol")) - .filter(col("inheritance") === true) + .select(col("is_recessive"), explode(col("symbols")).as("symbol")) + .filter(col("is_recessive") === true) .distinct() @@ -87,29 +87,29 @@ object ACMGImplicits { col("end"), col("reference"), col("alternate"), - explode(col("genes_symbol")).as("gene_symbol"), + explode(col("genes_symbol")).as("symbol"), maxAf, maxAf.isNull.as("max_af_is_null") ) - // Joining and computing PM2 - val pm2 = freqPerSymbol.join(omimRecessive, Seq("gene_symbol"), "leftouter") + df + .join(omimRecessive, Seq("symbol"), "leftouter") .na.fill(false, Seq("is_recessive")) + .join(freqPerSymbol, Seq("chromosome", "start", "end", "reference", "alternate", "symbol"), "leftouter") + .na.fill(false, Seq("max_af_is_null")) + .na.fill(0, Seq("max_af")) .withColumn("PM2", - col("max_af_is_null") - || col("max_af") === 0 - || (col("is_recessive") && col("max_af") < 0.0001)) - - val _df = df.join(pm2, Seq("chromosome", "start", "end", "reference", "alternate"), "leftouter") + struct( + col("is_recessive").as("is_recessive"), + col("max_af").as("max_af"), + col("max_af_is_null").as("max_af_is_null"), + (col("max_af_is_null") || + col("max_af") === 0 || + (col("is_recessive") && col("max_af") < 0.0001)).as("score") + ) + ) + .drop("is_recessive", "max_af", "max_af_is_null") - struct( - _df("PM2") - ) } - } - } - - - From fbed3be60ae8d61651c057dc165b6024039f1c52 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=ABl=20Simoneau?= Date: Thu, 20 Jul 2023 17:38:39 -0400 Subject: [PATCH 7/8] feat: adding require logic for multiple DF --- .../spark3/implicits/ACMGImplicits.scala | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/datalake-spark3/src/main/scala/bio/ferlab/datalake/spark3/implicits/ACMGImplicits.scala b/datalake-spark3/src/main/scala/bio/ferlab/datalake/spark3/implicits/ACMGImplicits.scala index 0ffa6a2e..daf8d29b 100644 --- a/datalake-spark3/src/main/scala/bio/ferlab/datalake/spark3/implicits/ACMGImplicits.scala +++ b/datalake-spark3/src/main/scala/bio/ferlab/datalake/spark3/implicits/ACMGImplicits.scala @@ -6,6 +6,18 @@ import org.apache.spark.sql.{Column, DataFrame} object ACMGImplicits { + val variantColumns = Array("chromosome", "start", "end", "reference", "alternate") + + private def validateRequiredColumns(map: Map[DataFrame, (String, Array[String])], criteriaName: String = "criteria"): Unit = { + map.foreach { + case (df, (dfName, columns)) => columns.foreach( + col => require( + df.columns.contains(col), + s"Column `$col` is required in DataFrame $dfName for $criteriaName.") + ) + } + } + implicit class ACMGOperations(df: DataFrame) { /** @@ -57,6 +69,13 @@ object ACMGImplicits { */ def getPM2(omim: DataFrame, frequencies: DataFrame): DataFrame = { + val map = Map( + df -> ("df", Array("symbol") ++ variantColumns), + omim -> ("omim", Array("symbols", "phenotype")), + frequencies -> ("frequencies", Array("external_frequencies", "genes_symbol") ++ variantColumns) + ) + validateRequiredColumns(map, "PM2") + // Extracting inheritance, identifying recessive genes val inheritanceModes = List( "Pseudoautosomal recessive", From cc5d1d66dc9773a18d8e8a7e78b12dfa037be016 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=ABl=20Simoneau?= Date: Fri, 21 Jul 2023 12:04:14 -0400 Subject: [PATCH 8/8] feat: adding PM2 tests --- .../spark3/implicits/ACMGImplicitsSpec.scala | 163 +++++++++++++----- 1 file changed, 120 insertions(+), 43 deletions(-) diff --git a/datalake-spark3/src/test/scala/bio/ferlab/datalake/spark3/implicits/ACMGImplicitsSpec.scala b/datalake-spark3/src/test/scala/bio/ferlab/datalake/spark3/implicits/ACMGImplicitsSpec.scala index b92fc366..c60685c9 100644 --- a/datalake-spark3/src/test/scala/bio/ferlab/datalake/spark3/implicits/ACMGImplicitsSpec.scala +++ b/datalake-spark3/src/test/scala/bio/ferlab/datalake/spark3/implicits/ACMGImplicitsSpec.scala @@ -12,45 +12,12 @@ class ACMGImplicitsSpec extends AnyFlatSpec with WithSparkSession with Matchers spark.sparkContext.setLogLevel("ERROR") - - def pm2Fixture = { - new { - val omimSchema = new StructType() - .add("symbols", new ArrayType(StringType, true), true) - .add("phenotype", new StructType() - .add("inheritance", new ArrayType(StringType, true), true) - ) - - val omimData = Seq( - Row(Array("gene1", "gene2"), Row(Array("Digenic recessive"))), - Row(Array("gene3"), Row(Array("Autosomal Recessive"))), - Row(Array("gene4"), Row(Array("Autosomal Dominant"))) - ) - - val omimDF = spark.createDataFrame(spark.sparkContext.parallelize(omimData), omimSchema) - - val freqSchema = new StructType() - .add("chromosome", StringType, true) - .add("start", IntegerType, true) - .add("end", IntegerType, true) - .add("reference", StringType, true) - .add("alternate", StringType, true) - .add("external_frequencies", new StructType() - .add("thousand_genomes", new StructType() - .add("af", DoubleType, true) - .add("an", IntegerType, true)) - .add("topmed_bravo", new StructType() - .add("af", DoubleType, true) - .add("an", IntegerType, true))) - - val freqData = Seq( - Row("1", 1, 2, "A", "C", Row(Row(0.001, 2), Row(0.050, 50))) - ) - - val freqDF = spark.createDataFrame(spark.sparkContext.parallelize(omimData), omimSchema) - - } - } + val variantSchema = new StructType() + .add("chromosome", StringType, true) + .add("start", IntegerType, true) + .add("end", IntegerType, true) + .add("reference", StringType, true) + .add("alternate", StringType, true) def ba1Fixture = { new { @@ -86,7 +53,7 @@ class ACMGImplicitsSpec extends AnyFlatSpec with WithSparkSession with Matchers } } - "get_BA1" should "throw IllegalArgumentException if `external_frequencies` column is absent" in { + "getBA1" should "throw IllegalArgumentException if `external_frequencies` column is absent" in { val structureData = Seq(Row(1), Row(2)) val structureSchema = new StructType().add("start", IntegerType, true) @@ -105,10 +72,120 @@ class ACMGImplicitsSpec extends AnyFlatSpec with WithSparkSession with Matchers f.result.collect() should contain theSameElementsAs f.resultData } - "getPM2" should "do something" in { + + def pm2Fixture = { + new { + val omimSchema = new StructType() + .add("symbols", new ArrayType(StringType, true), true) + .add("phenotype", new StructType() + .add("inheritance", new ArrayType(StringType, true), true) + ) + + val omimData = Seq( + Row(Array("gene1", "gene2"), Row(Array("Digenic recessive"))), + Row(Array("gene3"), Row(Array("Autosomal Recessive"))), + Row(Array("gene4"), Row(Array("Autosomal Dominant"))) + ) + + val omimDF = spark.createDataFrame(spark.sparkContext.parallelize(omimData), omimSchema) + + val freqSchema = variantSchema + .add("genes_symbol", new ArrayType(StringType, true), true) + .add("external_frequencies", new StructType() + .add("thousand_genomes", new StructType() + .add("af", DoubleType, true) + .add("an", IntegerType, true)) + .add("topmed_bravo", new StructType() + .add("af", DoubleType, true) + .add("an", IntegerType, true))) + + val freqData = Seq(Row("1", 1, 2, "A", "C", Array("gene1"), null)) + + val freqDF = spark.createDataFrame(spark.sparkContext.parallelize(freqData), freqSchema) + + val querySchema = variantSchema + .add("symbol", StringType, true) + + val queryDF = spark.createDataFrame(spark.sparkContext.parallelize(queryData), querySchema) + + val queryData = Seq(Row("1", 1, 2, "A", "C", "gene1")) + + val resultData = Seq(Row("1", 1, 2, "A", "C", "gene1", Row(true, 0.01, true, false))) + + val result = queryDF.getPM2(omimDF, freqDF) + } + } + + "getPM2" should "throw IllegalArgumentException if `phenotype` column is absent from the OMIM DataFrame" in { + val f = pm2Fixture + + an[IllegalArgumentException] should be thrownBy f.queryDF.getPM2(f.omimDF.drop("phenotype"), f.freqDF) + } + + it should "return the correct PM2 schema" in { val f = pm2Fixture - - 1 shouldBe 1 + + f.result.schema shouldBe f.querySchema + .add("PM2", new StructType() + .add("is_recessive", BooleanType, false) + .add("max_af", DoubleType, false) + .add("max_af_is_null", BooleanType, false) + .add("score", BooleanType, false), false + ) + } + + it should "return missing frequencies as PM2 true" in { + val f = pm2Fixture + + val freqData = Seq( + Row("1", 1, 2, "A", "C", Array("gene1"), null), + Row("2", 1, 2, "A", "C", Array("gene1"), Row(null, Row(0.0, 0))), + Row("3", 1, 2, "A", "C", Array("gene1"), Row(Row(0.0, 1200), Row(0.0, 1000))), + ) + val freqDF = spark.createDataFrame(spark.sparkContext.parallelize(freqData), f.freqSchema) + + val queryData = Seq( + Row("1", 1, 2, "A", "C", "gene1"), + Row("2", 1, 2, "A", "C", "gene1"), + Row("3", 1, 2, "A", "C", "gene1"), + ) + + val resultData = Seq( + Row("1", 1, 2, "A", "C", "gene1", Row(true, 0.00, true, true)), + Row("2", 1, 2, "A", "C", "gene1", Row(true, 0.00, false, true)), + Row("3", 1, 2, "A", "C", "gene1", Row(true, 0.00, false, true)), + ) + val queryDF = spark.createDataFrame(spark.sparkContext.parallelize(queryData), f.querySchema) + val result = queryDF.getPM2(f.omimDF, freqDF) + + result.collect() should contain theSameElementsAs resultData + } + + it should "return low AF in genes with recessive disease as PM2 true " in { + val f = pm2Fixture + + val freqData = Seq( + Row("1", 1, 2, "A", "C", Array("gene4"), Row(Row(0.00001, 1), Row(0.0, 1))), + Row("1", 1, 2, "A", "C", Array("gene2"), Row(Row(0.00001, 1), Row(0.0, 1))), + Row("2", 1, 2, "A", "C", Array("gene2"), Row(Row(0.001, 1), Row(0.0, 1))), + ) + val freqDF = spark.createDataFrame(spark.sparkContext.parallelize(freqData), f.freqSchema) + + val queryData = Seq( + Row("1", 1, 2, "A", "C", "gene4"), + Row("1", 1, 2, "A", "C", "gene2"), + Row("2", 1, 2, "A", "C", "gene2"), + ) + + val resultData = Seq( + Row("1", 1, 2, "A", "C", "gene4", Row(false, 0.00001, false, false)), + Row("1", 1, 2, "A", "C", "gene2", Row(true, 0.00001, false, true)), + Row("2", 1, 2, "A", "C", "gene2", Row(true, 0.001, false, false)), + ) + val queryDF = spark.createDataFrame(spark.sparkContext.parallelize(queryData), f.querySchema) + val result = queryDF.getPM2(f.omimDF, freqDF) + + result.collect() should contain theSameElementsAs resultData } }