diff --git a/build.sbt b/build.sbt index 7a3324ae..6926c40d 100644 --- a/build.sbt +++ b/build.sbt @@ -63,7 +63,7 @@ lazy val `datalake-spark3` = (project in file("datalake-spark3")) "com.microsoft.sqlserver" % "mssql-jdbc" % "8.4.1.jre11" % Provided, "com.microsoft.aad" % "adal4j" % "0.0.2" % Provided, "com.microsoft.azure" % "spark-mssql-connector_2.12" % "1.2.0" % Provided, - "com.crealytics" %% "spark-excel" % "3.5.0_0.20.3" % Provided, + "dev.mauch" %% "spark-excel" % "3.5.5_0.30.2" % Provided, //Use by ElasticsearchClient "com.softwaremill.sttp.client3" %% "core" % "3.9.2", "com.softwaremill.sttp.client3" %% "json4s" % "3.9.2" exclude("org.json4s", "json4s-core_2.12"), //Exclusion because json4s is used in spark diff --git a/datalake-spark3/src/main/scala/bio/ferlab/datalake/spark3/loader/ExcelLoader.scala b/datalake-spark3/src/main/scala/bio/ferlab/datalake/spark3/loader/ExcelLoader.scala index bd703149..5d3946ee 100644 --- a/datalake-spark3/src/main/scala/bio/ferlab/datalake/spark3/loader/ExcelLoader.scala +++ b/datalake-spark3/src/main/scala/bio/ferlab/datalake/spark3/loader/ExcelLoader.scala @@ -1,6 +1,6 @@ package bio.ferlab.datalake.spark3.loader -import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} import java.time.LocalDate @@ -20,6 +20,23 @@ object ExcelLoader extends Loader { .load(location) } + private def write(df: DataFrame, + location: String, + format: String, + options: Map[String, String], + mode: SaveMode): DataFrame = { + // Excel format requires the schema to be non-empty, does not support empty schema dataframe writes + require(df.schema.nonEmpty, "DataFrame must have a valid schema with at least one column.") + require(options.isDefinedAt("header"), "Expecting [header] to be defined in write options.") + + df.write + .options(options) + .format(format) + .mode(mode) + .save(location) + df + } + override def overwritePartition(location: String, databaseName: String, 
tableName: String, @@ -34,7 +51,9 @@ object ExcelLoader extends Loader { df: DataFrame, partitioning: List[String], format: String, - options: Map[String, String])(implicit spark: SparkSession): DataFrame = ??? + options: Map[String, String])(implicit spark: SparkSession): DataFrame = { + write(df, location, format, options, SaveMode.Overwrite) + } override def insert(location: String, databaseName: String, @@ -42,7 +61,9 @@ object ExcelLoader extends Loader { updates: DataFrame, partitioning: List[String], format: String, - options: Map[String, String])(implicit spark: SparkSession): DataFrame = ??? + options: Map[String, String])(implicit spark: SparkSession): DataFrame = { + write(updates, location, format, options, SaveMode.Append) + } override def upsert(location: String, databaseName: String, @@ -52,7 +73,7 @@ object ExcelLoader extends Loader { partitioning: List[String], format: String, options: Map[String, String])(implicit spark: SparkSession): DataFrame = ??? - + override def scd1(location: String, databaseName: String, tableName: String, diff --git a/datalake-spark3/src/main/scala/bio/ferlab/datalake/spark3/loader/LoadResolver.scala b/datalake-spark3/src/main/scala/bio/ferlab/datalake/spark3/loader/LoadResolver.scala index 9096db5d..62a5c977 100644 --- a/datalake-spark3/src/main/scala/bio/ferlab/datalake/spark3/loader/LoadResolver.scala +++ b/datalake-spark3/src/main/scala/bio/ferlab/datalake/spark3/loader/LoadResolver.scala @@ -50,6 +50,12 @@ object LoadResolver { case (ELASTICSEARCH, OverWrite) => (ds: DatasetConf, df: DataFrame) => ElasticsearchLoader.writeOnce(ds.location, ds.table.map(_.database).getOrElse(""), ds.table.map(_.name).getOrElse(ds.location), df, ds.partitionby, ds.format.sparkFormat, ds.writeoptions) + case (EXCEL, OverWrite) => (ds: DatasetConf, df: DataFrame) => + ExcelLoader.writeOnce(ds.location, ds.table.map(_.database).getOrElse(""), ds.table.map(_.name).getOrElse(ds.location), df, ds.partitionby, ds.format.sparkFormat, 
ds.writeoptions) + + case (EXCEL, Insert) => (ds: DatasetConf, df: DataFrame) => + ExcelLoader.insert(ds.location, ds.table.map(_.database).getOrElse(""), ds.table.map(_.name).getOrElse(ds.location), df, ds.partitionby, ds.format.sparkFormat, ds.writeoptions) + //generic fallback behaviours case (f, OverWrite) => (ds: DatasetConf, df: DataFrame) => diff --git a/datalake-spark3/src/test/scala/bio/ferlab/datalake/spark3/loader/ExcelLoaderSpec.scala b/datalake-spark3/src/test/scala/bio/ferlab/datalake/spark3/loader/ExcelLoaderSpec.scala index 886a0740..4ce372ee 100644 --- a/datalake-spark3/src/test/scala/bio/ferlab/datalake/spark3/loader/ExcelLoaderSpec.scala +++ b/datalake-spark3/src/test/scala/bio/ferlab/datalake/spark3/loader/ExcelLoaderSpec.scala @@ -3,16 +3,34 @@ package bio.ferlab.datalake.spark3.loader import bio.ferlab.datalake.commons.config.Format.EXCEL import bio.ferlab.datalake.spark3.testutils.AirportInput import bio.ferlab.datalake.testutils.SparkSpec +import org.apache.spark.sql.DataFrame class ExcelLoaderSpec extends SparkSpec { import spark.implicits._ val folderPath: String = getClass.getClassLoader.getResource("raw/landing/").getPath + val outputLocation: String = "output/airports.xlsx" val expected: Seq[AirportInput] = Seq( AirportInput("1", "YYC", "Calgary Int airport"), AirportInput("2", "YUL", "Montreal Int airport") ) + val expectedUpdate: Seq[AirportInput] = Seq( + AirportInput("3", "YVR", "Vancouver Int airport") + ) + + val initialDF: DataFrame = expected.toDF() + + private def withInitialDfInFolder(rootPath: String)(testCode: String => Any): Unit = { + val dfPath: String = rootPath + outputLocation + ExcelLoader.writeOnce(dfPath, "", "", initialDF, Nil, EXCEL.sparkFormat, Map("header" -> "true")) + testCode(dfPath) + } + + private def withUpdatedDfInFolder(updates: DataFrame, path: String)(testCode: => Any): Unit = { + ExcelLoader.insert(path, "", "", updates, Nil, EXCEL.sparkFormat, Map("header" -> "true")) + testCode + } "read" should 
"read xlsx file as a DataFrame" in { val fileLocation = folderPath + "airports.xlsx" @@ -34,5 +52,61 @@ class ExcelLoaderSpec extends SparkSpec { .collect() should contain theSameElementsAs expected } + it should "throw an exception when the header option is missing" in { + val fileLocation: String = folderPath + "airports.xlsx" + + an[IllegalArgumentException] should be thrownBy { + ExcelLoader.read(fileLocation, EXCEL.sparkFormat, readOptions = Map.empty, databaseName = None, tableName = None) + } + } + + it should "read folder containing multiple Excel files as a DataFrame" in withOutputFolder("root") { root => + withInitialDfInFolder(root) { folderLocation => + withUpdatedDfInFolder(expectedUpdate.toDF(), folderLocation) { + + val result = ExcelLoader.read(folderLocation, EXCEL.sparkFormat, Map("header" -> "true"), None, None) + + result + .as[AirportInput] + .collect() should contain theSameElementsAs (expected ++ expectedUpdate) + } + } + } + + "writeOnce" should "write a dataframe to a file" in withOutputFolder("root") { root => + withInitialDfInFolder(root) { folderLocation => + + val result = ExcelLoader.read(folderLocation, EXCEL.sparkFormat, Map("header" -> "true")) + + result.as[AirportInput].collect() should contain theSameElementsAs expected + + } + } + + it should "overwrite existing files when writing to the same folder" in withOutputFolder("root") { root => + withInitialDfInFolder(root) { folderLocation => + + //Overwriting the same location + ExcelLoader.writeOnce(folderLocation, "", "", expectedUpdate.toDF(), Nil, EXCEL.sparkFormat, Map("header" -> "true")) + + val result = ExcelLoader.read(folderLocation, EXCEL.sparkFormat, Map("header" -> "true"), None, None) + + result + .as[AirportInput] + .collect() should contain theSameElementsAs expectedUpdate + } + } + + "insert" should "append a dataframe to an existing file" in withOutputFolder("root") { root => + withInitialDfInFolder(root) { folderLocation => + 
withUpdatedDfInFolder(expectedUpdate.toDF(), folderLocation) { + val result = ExcelLoader.read(folderLocation, EXCEL.sparkFormat, Map("header" -> "true"), None, None) + + result + .as[AirportInput] + .collect() should contain theSameElementsAs (expected ++ expectedUpdate) + } + } + } }