Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ lazy val `datalake-spark3` = (project in file("datalake-spark3"))
"com.microsoft.sqlserver" % "mssql-jdbc" % "8.4.1.jre11" % Provided,
"com.microsoft.aad" % "adal4j" % "0.0.2" % Provided,
"com.microsoft.azure" % "spark-mssql-connector_2.12" % "1.2.0" % Provided,
"com.crealytics" %% "spark-excel" % "3.5.0_0.20.3" % Provided,
"dev.mauch" %% "spark-excel" % "3.5.5_0.30.2" % Provided,
//Used by ElasticsearchClient
"com.softwaremill.sttp.client3" %% "core" % "3.9.2",
"com.softwaremill.sttp.client3" %% "json4s" % "3.9.2" exclude("org.json4s", "json4s-core_2.12"), //Exclusion because json4s is used in spark
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package bio.ferlab.datalake.spark3.loader

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

import java.time.LocalDate

Expand All @@ -20,6 +20,23 @@ object ExcelLoader extends Loader {
.load(location)
}

/**
  * Writes `df` to `location` with the given Spark data source `format` and save `mode`.
  *
  * The Excel data source cannot persist a DataFrame with an empty schema, and it needs
  * the `header` option to be set explicitly, so both preconditions are validated up front.
  *
  * @param df       the DataFrame to write
  * @param location path of the target file or folder
  * @param format   Spark data source format (the spark-excel format)
  * @param options  write options; must contain the `header` key
  * @param mode     Spark save mode (Overwrite for writeOnce, Append for insert)
  * @return the input DataFrame, unchanged, so callers can keep chaining
  * @throws IllegalArgumentException if the schema is empty or `header` is missing
  */
private def write(df: DataFrame,
                  location: String,
                  format: String,
                  options: Map[String, String],
                  mode: SaveMode): DataFrame = {
  // Excel format requires the schema to be non-empty, does not support empty schema dataframe writes
  require(df.schema.nonEmpty, "DataFrame must have a valid schema with at least one column.")
  // Fix: this is the write path (options come from writeoptions), so the error message
  // must point the caller at the write options, not the read options.
  require(options.contains("header"), "Expecting [header] to be defined in writeOptions.")

  df.write
    .options(options)
    .format(format)
    .mode(mode)
    .save(location)
  df
}

override def overwritePartition(location: String,
databaseName: String,
tableName: String,
Expand All @@ -34,15 +51,19 @@ object ExcelLoader extends Loader {
df: DataFrame,
partitioning: List[String],
format: String,
options: Map[String, String])(implicit spark: SparkSession): DataFrame = ???
options: Map[String, String])(implicit spark: SparkSession): DataFrame = {
write(df, location, format, options, SaveMode.Overwrite)
}

/**
  * Appends `updates` to the Excel file(s) at `location` via the shared write helper,
  * using SaveMode.Append. Partitioning, database and table names are ignored by the
  * Excel loader.
  */
override def insert(location: String,
                    databaseName: String,
                    tableName: String,
                    updates: DataFrame,
                    partitioning: List[String],
                    format: String,
                    options: Map[String, String])(implicit spark: SparkSession): DataFrame =
  write(updates, location, format, options, SaveMode.Append)

override def upsert(location: String,
databaseName: String,
Expand All @@ -52,7 +73,7 @@ object ExcelLoader extends Loader {
partitioning: List[String],
format: String,
options: Map[String, String])(implicit spark: SparkSession): DataFrame = ???

override def scd1(location: String,
databaseName: String,
tableName: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ object LoadResolver {
case (ELASTICSEARCH, OverWrite) => (ds: DatasetConf, df: DataFrame) =>
ElasticsearchLoader.writeOnce(ds.location, ds.table.map(_.database).getOrElse(""), ds.table.map(_.name).getOrElse(ds.location), df, ds.partitionby, ds.format.sparkFormat, ds.writeoptions)

case (EXCEL, OverWrite) => (ds: DatasetConf, df: DataFrame) =>
ExcelLoader.writeOnce(ds.location, ds.table.map(_.database).getOrElse(""), ds.table.map(_.name).getOrElse(ds.location), df, ds.partitionby, ds.format.sparkFormat, ds.writeoptions)

case (EXCEL, Insert) => (ds: DatasetConf, df: DataFrame) =>
ExcelLoader.insert(ds.location, ds.table.map(_.database).getOrElse(""), ds.table.map(_.name).getOrElse(ds.location), df, ds.partitionby, ds.format.sparkFormat, ds.writeoptions)


//generic fallback behaviours
case (f, OverWrite) => (ds: DatasetConf, df: DataFrame) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,34 @@ package bio.ferlab.datalake.spark3.loader
import bio.ferlab.datalake.commons.config.Format.EXCEL
import bio.ferlab.datalake.spark3.testutils.AirportInput
import bio.ferlab.datalake.testutils.SparkSpec
import org.apache.spark.sql.DataFrame

class ExcelLoaderSpec extends SparkSpec {

import spark.implicits._

val folderPath: String = getClass.getClassLoader.getResource("raw/landing/").getPath
val outputLocation: String = "output/airports.xlsx"
val expected: Seq[AirportInput] = Seq(
AirportInput("1", "YYC", "Calgary Int airport"),
AirportInput("2", "YUL", "Montreal Int airport")
)
val expectedUpdate: Seq[AirportInput] = Seq(
AirportInput("3", "YVR", "Vancouver Int airport")
)

val initialDF: DataFrame = expected.toDF()

// Fixture helper: writes the initial DataFrame as an Excel file under `rootPath`
// and hands the resulting file path to the test body.
private def withInitialDfInFolder(rootPath: String)(testCode: String => Any): Unit = {
  val destination: String = s"$rootPath$outputLocation"
  ExcelLoader.writeOnce(destination, "", "", initialDF, Nil, EXCEL.sparkFormat, Map("header" -> "true"))
  testCode(destination)
}

// Fixture helper: appends `updates` to the Excel location at `path`
// (insert uses SaveMode.Append), then runs the test body.
private def withUpdatedDfInFolder(updates: DataFrame, path: String)(testCode: => Any): Unit = {
  val writeOptions = Map("header" -> "true")
  ExcelLoader.insert(path, "", "", updates, Nil, EXCEL.sparkFormat, writeOptions)
  testCode
}

"read" should "read xlsx file as a DataFrame" in {
val fileLocation = folderPath + "airports.xlsx"
Expand All @@ -34,5 +52,61 @@ class ExcelLoaderSpec extends SparkSpec {
.collect() should contain theSameElementsAs expected
}

// The loader's `require` on the "header" option must fail fast with
// IllegalArgumentException when the option is absent from readOptions.
it should "throw an exception when the header option is missing" in {
val fileLocation: String = folderPath + "airports.xlsx"

an[IllegalArgumentException] should be thrownBy {
ExcelLoader.read(fileLocation, EXCEL.sparkFormat, readOptions = Map.empty, databaseName = None, tableName = None)
}
}

// Writes the initial fixture, appends a second file into the same folder,
// then verifies that reading the folder merges the rows from both files.
it should "read folder containing multiple Excel files as a DataFrame" in withOutputFolder("root") { root =>
withInitialDfInFolder(root) { folderLocation =>
withUpdatedDfInFolder(expectedUpdate.toDF(), folderLocation) {

val result = ExcelLoader.read(folderLocation, EXCEL.sparkFormat, Map("header" -> "true"), None, None)

result
.as[AirportInput]
.collect() should contain theSameElementsAs (expected ++ expectedUpdate)
}
}
}

// Round-trip: writeOnce the fixture rows, read them back, and expect the same rows.
"writeOnce" should "write a dataframe to a file" in withOutputFolder("root") { root =>
withInitialDfInFolder(root) { folderLocation =>

val result = ExcelLoader.read(folderLocation, EXCEL.sparkFormat, Map("header" -> "true"))

result.as[AirportInput].collect() should contain theSameElementsAs expected

}
}

// writeOnce uses SaveMode.Overwrite: a second write to the same location must
// replace the initial rows entirely, leaving only the new ones.
it should "overwrite existing files when writing to the same folder" in withOutputFolder("root") { root =>
withInitialDfInFolder(root) { folderLocation =>

//Overwriting the same location
ExcelLoader.writeOnce(folderLocation, "", "", expectedUpdate.toDF(), Nil, EXCEL.sparkFormat, Map("header" -> "true"))

val result = ExcelLoader.read(folderLocation, EXCEL.sparkFormat, Map("header" -> "true"), None, None)

result
.as[AirportInput]
.collect() should contain theSameElementsAs expectedUpdate
}
}

// insert uses SaveMode.Append: after appending, a read must return the union
// of the initial rows and the appended rows.
"insert" should "append a dataframe to an existing file" in withOutputFolder("root") { root =>
withInitialDfInFolder(root) { folderLocation =>
withUpdatedDfInFolder(expectedUpdate.toDF(), folderLocation) {
val result = ExcelLoader.read(folderLocation, EXCEL.sparkFormat, Map("header" -> "true"), None, None)

result
.as[AirportInput]
.collect() should contain theSameElementsAs (expected ++ expectedUpdate)
}
}
}

}