Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ lazy val `datalake-spark3` = (project in file("datalake-spark3"))
"com.microsoft.sqlserver" % "mssql-jdbc" % "8.4.1.jre11" % Provided,
"com.microsoft.aad" % "adal4j" % "0.0.2" % Provided,
"com.microsoft.azure" % "spark-mssql-connector_2.12" % "1.2.0" % Provided,
"com.crealytics" %% "spark-excel" % "3.5.0_0.20.3" % Provided,
"dev.mauch" %% "spark-excel" % "3.5.5_0.30.2" % Provided,
//Use by ElasticsearchClient
"com.softwaremill.sttp.client3" %% "core" % "3.9.2",
"com.softwaremill.sttp.client3" %% "json4s" % "3.9.2" exclude("org.json4s", "json4s-core_2.12"), //Exclusion because json4s is used in spark
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package bio.ferlab.datalake.spark3.loader

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

import java.time.LocalDate

Expand All @@ -20,6 +20,23 @@ object ExcelLoader extends Loader {
.load(location)
}

/**
  * Persists the given DataFrame in Excel format at the specified location.
  *
  * The Excel data source cannot write a DataFrame with an empty schema, and the
  * "header" option must be set explicitly so that readers know whether the first
  * row is a header row.
  *
  * @param df       DataFrame to persist; must have at least one column
  * @param location path where the Excel file(s) will be written
  * @param format   Spark data source format (expected to be the Excel format)
  * @param options  write options; must define the "header" key
  * @param mode     save mode controlling behavior if data already exists (Overwrite, Append, ...)
  * @return the input DataFrame, unchanged, to allow call chaining
  */
private def write(df: DataFrame,
                  location: String,
                  format: String,
                  options: Map[String, String],
                  mode: SaveMode): DataFrame = {
  // Excel format requires the schema to be non-empty, does not support empty schema dataframe writes
  require(df.schema.nonEmpty, "DataFrame must have a valid schema with at least one column.")
  // Fix: this is the write path — the message previously pointed callers at readOptions,
  // which would send them to the wrong configuration key.
  require(options.contains("header"), "Expecting [header] to be defined in writeOptions.")

  df.write
    .options(options)
    .format(format)
    .mode(mode)
    .save(location)
  df
}

override def overwritePartition(location: String,
databaseName: String,
tableName: String,
Expand All @@ -34,15 +51,19 @@ object ExcelLoader extends Loader {
df: DataFrame,
partitioning: List[String],
format: String,
options: Map[String, String])(implicit spark: SparkSession): DataFrame = ???
options: Map[String, String])(implicit spark: SparkSession): DataFrame = {
write(df, location, format, options, SaveMode.Overwrite)
}

/**
  * Appends the `updates` DataFrame to the Excel data already present at `location`.
  *
  * Delegates to the private `write` helper with [[SaveMode.Append]]; the Excel
  * data source will add new file(s) alongside the existing ones rather than
  * merging rows into a single workbook.
  *
  * @param databaseName unused by the Excel loader (kept for the Loader interface)
  * @param tableName    unused by the Excel loader (kept for the Loader interface)
  * @param partitioning unused by the Excel loader (kept for the Loader interface)
  * @return the `updates` DataFrame, unchanged
  */
override def insert(location: String,
                    databaseName: String,
                    tableName: String,
                    updates: DataFrame,
                    partitioning: List[String],
                    format: String,
                    options: Map[String, String])(implicit spark: SparkSession): DataFrame = {
  write(updates, location, format, options, SaveMode.Append)
}

override def upsert(location: String,
databaseName: String,
Expand All @@ -52,7 +73,7 @@ object ExcelLoader extends Loader {
partitioning: List[String],
format: String,
options: Map[String, String])(implicit spark: SparkSession): DataFrame = ???

override def scd1(location: String,
databaseName: String,
tableName: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ object LoadResolver {
case (ELASTICSEARCH, OverWrite) => (ds: DatasetConf, df: DataFrame) =>
ElasticsearchLoader.writeOnce(ds.location, ds.table.map(_.database).getOrElse(""), ds.table.map(_.name).getOrElse(ds.location), df, ds.partitionby, ds.format.sparkFormat, ds.writeoptions)

case (EXCEL, OverWrite) => (ds: DatasetConf, df: DataFrame) =>
ExcelLoader.writeOnce(ds.location, ds.table.map(_.database).getOrElse(""), ds.table.map(_.name).getOrElse(ds.location), df, ds.partitionby, ds.format.sparkFormat, ds.writeoptions)

case (EXCEL, Insert) => (ds: DatasetConf, df: DataFrame) =>
ExcelLoader.insert(ds.location, ds.table.map(_.database).getOrElse(""), ds.table.map(_.name).getOrElse(ds.location), df, ds.partitionby, ds.format.sparkFormat, ds.writeoptions)


//generic fallback behaviours
case (f, OverWrite) => (ds: DatasetConf, df: DataFrame) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,37 @@ package bio.ferlab.datalake.spark3.loader
import bio.ferlab.datalake.commons.config.Format.EXCEL
import bio.ferlab.datalake.spark3.testutils.AirportInput
import bio.ferlab.datalake.testutils.SparkSpec
import org.apache.spark.sql.DataFrame
import org.scalatest.BeforeAndAfterEach

import java.nio.file.{Files, Paths}

class ExcelLoaderSpec extends SparkSpec {

import spark.implicits._

val folderPath: String = getClass.getClassLoader.getResource("raw/landing/").getPath
val outputLocation: String = "output/airports.xlsx"
val expected: Seq[AirportInput] = Seq(
AirportInput("1", "YYC", "Calgary Int airport"),
AirportInput("2", "YUL", "Montreal Int airport")
)
val expectedUpdate: Seq[AirportInput] = Seq(
AirportInput("3", "YVR", "Vancouver Int airport")
)

val initialDF: DataFrame = expected.toDF()

/**
  * Writes the initial fixture DataFrame under `rootPath` via `writeOnce`,
  * then hands the resulting output path to the test body.
  */
private def withInitialDfInFolder(rootPath: String)(testCode: String => Any): Unit = {
  val destination = s"$rootPath$outputLocation"
  ExcelLoader.writeOnce(destination, "", "", initialDF, Nil, EXCEL.sparkFormat, Map("header" -> "true"))
  testCode(destination)
}

/**
  * Appends `updates` to the Excel data at `path` via `insert`,
  * then runs the (by-name) test body.
  */
private def withUpdatedDfInFolder(updates: DataFrame, path: String)(testCode: => Any): Unit = {
  val writeOptions = Map("header" -> "true")
  ExcelLoader.insert(path, "", "", updates, Nil, EXCEL.sparkFormat, writeOptions)
  testCode
}

"read" should "read xlsx file as a DataFrame" in {
val fileLocation = folderPath + "airports.xlsx"
Expand All @@ -34,5 +55,64 @@ class ExcelLoaderSpec extends SparkSpec {
.collect() should contain theSameElementsAs expected
}

it should "throw an exception when the header option is missing" in {
  val fileLocation = s"${folderPath}airports.xlsx"
  // "header" is deliberately absent so the loader's validation must reject the call.
  val emptyOptions = Map.empty[String, String]

  an[IllegalArgumentException] should be thrownBy {
    ExcelLoader.read(fileLocation, EXCEL.sparkFormat, emptyOptions, None, None)
  }
}

it should "read folder containing multiple Excel files as a DataFrame" in withOutputFolder("root") { root =>
  withInitialDfInFolder(root) { folderLocation =>
    withUpdatedDfInFolder(expectedUpdate.toDF(), folderLocation) {
      // Reading the folder should union the rows of every Excel file it contains.
      val loaded = ExcelLoader.read(folderLocation, EXCEL.sparkFormat, Map("header" -> "true"), None, None)
      val rows = loaded.as[AirportInput].collect()

      rows should contain theSameElementsAs (expected ++ expectedUpdate)
    }
  }
}

"writeOnce" should "write a dataframe to a file" in withOutputFolder("root") { root =>
  withInitialDfInFolder(root) { folderLocation =>
    // Round-trip: data written by writeOnce must be readable back unchanged.
    val roundTrip = ExcelLoader.read(folderLocation, EXCEL.sparkFormat, Map("header" -> "true"))

    roundTrip.as[AirportInput].collect() should contain theSameElementsAs expected
  }
}

it should "overwrite existing files when writing to the same folder" in withOutputFolder("root") { root =>
  withInitialDfInFolder(root) { folderLocation =>
    // A second writeOnce at the same location must fully replace the initial content.
    ExcelLoader.writeOnce(folderLocation, "", "", expectedUpdate.toDF(), Nil, EXCEL.sparkFormat, Map("header" -> "true"))

    val reloaded = ExcelLoader.read(folderLocation, EXCEL.sparkFormat, Map("header" -> "true"), None, None)

    reloaded
      .as[AirportInput]
      .collect() should contain theSameElementsAs expectedUpdate
  }
}

"insert" should "append a dataframe to an existing file" in withOutputFolder("root") { root =>
  withInitialDfInFolder(root) { folderLocation =>
    withUpdatedDfInFolder(expectedUpdate.toDF(), folderLocation) {
      // After insert, both the original rows and the appended rows must be present.
      val combined = ExcelLoader.read(folderLocation, EXCEL.sparkFormat, Map("header" -> "true"), None, None)

      combined.as[AirportInput].collect() should contain theSameElementsAs (expected ++ expectedUpdate)
    }
  }
}

}