Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ lazy val `datalake-spark3` = (project in file("datalake-spark3"))
"com.microsoft.sqlserver" % "mssql-jdbc" % "8.4.1.jre11" % Provided,
"com.microsoft.aad" % "adal4j" % "0.0.2" % Provided,
"com.microsoft.azure" % "spark-mssql-connector_2.12" % "1.2.0" % Provided,
"com.crealytics" %% "spark-excel" % "3.5.0_0.20.3" % Provided,
"dev.mauch" %% "spark-excel" % "3.5.5_0.30.2" % Provided,
//Used by ElasticsearchClient
"com.softwaremill.sttp.client3" %% "core" % "3.9.2",
"com.softwaremill.sttp.client3" %% "json4s" % "3.9.2" exclude("org.json4s", "json4s-core_2.12"), //Exclusion because json4s is used in spark
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package bio.ferlab.datalake.spark3.loader

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

import java.time.LocalDate

Expand All @@ -20,6 +20,26 @@ object ExcelLoader extends Loader {
.load(location)
}

/**
  * Writes the given DataFrame to `location` using the spark-excel data source.
  *
  * NOTE: `databaseName`, `tableName` and `partitioning` are accepted for interface
  * symmetry with the other loaders but are not used here — the Excel writer saves
  * straight to `location` without registering a table or partitioning the output.
  *
  * @param df           data to write; must have a non-empty schema
  * @param location     target path of the Excel output
  * @param databaseName unused by this loader
  * @param tableName    unused by this loader
  * @param partitioning unused by this loader
  * @param format       Spark data source format (the spark-excel format)
  * @param options      writer options; must define "header"
  * @param mode         save mode applied to the write (e.g. Overwrite, Append)
  * @return the input DataFrame, unchanged, so calls can be chained
  */
def write(df: DataFrame,
          location: String,
          databaseName: String,
          tableName: String,
          partitioning: List[String],
          format: String,
          options: Map[String, String],
          mode: SaveMode): DataFrame = {
  // Excel format requires the schema to be non-empty, does not support empty schema dataframe writes
  require(df.schema.nonEmpty, "DataFrame must have a valid schema with at least one column.")
  // Fix: the message previously said "readOptions", but this guards the write options.
  require(options.contains("header"), "Expecting [header] to be defined in writeOptions.")

  df.write
    .options(options)
    .format(format)
    .mode(mode)
    .save(location)
  df
}

override def overwritePartition(location: String,
databaseName: String,
tableName: String,
Expand All @@ -34,15 +54,19 @@ object ExcelLoader extends Loader {
df: DataFrame,
partitioning: List[String],
format: String,
options: Map[String, String])(implicit spark: SparkSession): DataFrame = ???
options: Map[String, String])(implicit spark: SparkSession): DataFrame = {
write(df, location, databaseName, tableName, partitioning, format, options, SaveMode.Overwrite)
}

/**
  * Appends `updates` to the Excel output at `location`.
  * Delegates to [[write]] with `SaveMode.Append`; database/table/partitioning
  * arguments are forwarded but ignored by the Excel writer.
  */
override def insert(location: String,
                    databaseName: String,
                    tableName: String,
                    updates: DataFrame,
                    partitioning: List[String],
                    format: String,
                    options: Map[String, String])(implicit spark: SparkSession): DataFrame =
  write(updates, location, databaseName, tableName, partitioning, format, options, SaveMode.Append)

override def upsert(location: String,
databaseName: String,
Expand All @@ -52,7 +76,7 @@ object ExcelLoader extends Loader {
partitioning: List[String],
format: String,
options: Map[String, String])(implicit spark: SparkSession): DataFrame = ???

override def scd1(location: String,
databaseName: String,
tableName: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ object LoadResolver {
case (ELASTICSEARCH, OverWrite) => (ds: DatasetConf, df: DataFrame) =>
ElasticsearchLoader.writeOnce(ds.location, ds.table.map(_.database).getOrElse(""), ds.table.map(_.name).getOrElse(ds.location), df, ds.partitionby, ds.format.sparkFormat, ds.writeoptions)

case (EXCEL, OverWrite) => (ds: DatasetConf, df: DataFrame) =>
ExcelLoader.writeOnce(ds.location, ds.table.map(_.database).getOrElse(""), ds.table.map(_.name).getOrElse(ds.location), df, ds.partitionby, ds.format.sparkFormat, ds.writeoptions)

case (EXCEL, Insert) => (ds: DatasetConf, df: DataFrame) =>
ExcelLoader.insert(ds.location, ds.table.map(_.database).getOrElse(""), ds.table.map(_.name).getOrElse(ds.location), df, ds.partitionby, ds.format.sparkFormat, ds.writeoptions)


//generic fallback behaviours
case (f, OverWrite) => (ds: DatasetConf, df: DataFrame) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,55 @@ package bio.ferlab.datalake.spark3.loader
import bio.ferlab.datalake.commons.config.Format.EXCEL
import bio.ferlab.datalake.spark3.testutils.AirportInput
import bio.ferlab.datalake.testutils.SparkSpec
import org.apache.spark.sql.DataFrame
import org.scalatest.BeforeAndAfterEach

class ExcelLoaderSpec extends SparkSpec {
import java.nio.file.{Files, Paths}

class ExcelLoaderSpec extends SparkSpec with BeforeAndAfterEach {

import spark.implicits._

val folderPath: String = getClass.getClassLoader.getResource("raw/landing/").getPath
val outputLocation: String = folderPath + "output/airports.xlsx"
val expected: Seq[AirportInput] = Seq(
AirportInput("1", "YYC", "Calgary Int airport"),
AirportInput("2", "YUL", "Montreal Int airport")
)
val simpleExpectedUpdate: Seq[AirportInput] = Seq(
AirportInput("3", "YVR", "Vancouver Int airport")
)

val initialDF: DataFrame = expected.toDF()

/** Removes any Excel output left behind by the previous test so each test starts from a clean folder. */
override def afterEach(): Unit = {
  super.afterEach()
  val producedOutput = Paths.get(outputLocation)
  if (Files.exists(producedOutput)) cleanUpFilesRecursively(producedOutput)
}

/**
  * Recursively deletes files and directories at the given path. Necessary because spark-excel format API v2
  * may create multiple excel partitions when writing to a folder.
  *
  * Fix: the directory stream returned by `Files.list` must be closed; leaving it
  * open leaks one file handle per directory visited.
  */
private def cleanUpFilesRecursively(path: java.nio.file.Path): Unit = {
  if (Files.isDirectory(path)) {
    val children = Files.list(path)
    try children.forEach(cleanUpFilesRecursively) finally children.close()
  }
  Files.deleteIfExists(path)
}

/**
  * Fixture: writes `initialDF` to `outputLocation` (overwriting any previous content),
  * then runs the by-name `testCode` against that pre-populated folder.
  */
private def withInitialDfInFolder(testCode: => Any): Unit = {
  ExcelLoader.writeOnce(outputLocation, "", "", initialDF, Nil, EXCEL.sparkFormat, Map("header" -> "true"))
  testCode
}

/**
  * Fixture: appends `updates` to `outputLocation` via [[ExcelLoader.insert]],
  * then runs `testCode` with that folder's location as its argument.
  */
private def withUpdatedDfInFolder(updates: DataFrame, testCode: String => Any): Unit = {
  ExcelLoader.insert(outputLocation, "", "", updates, Nil, EXCEL.sparkFormat, Map("header" -> "true"))
  testCode(outputLocation)
}

"read" should "read xlsx file as a DataFrame" in {
val fileLocation = folderPath + "airports.xlsx"
Expand All @@ -34,5 +73,53 @@ class ExcelLoaderSpec extends SparkSpec {
.collect() should contain theSameElementsAs expected
}

it should "throw an exception when the header option is missing" in {
  // Reading without the mandatory "header" option must be rejected up front.
  val sourceFile = folderPath + "airports.xlsx"

  intercept[IllegalArgumentException] {
    ExcelLoader.read(sourceFile, EXCEL.sparkFormat, Map.empty, None, None)
  }
}

it should "read folder containing multiple Excel files as a DataFrame" in withInitialDfInFolder {
  // After an append the folder holds two Excel files; a single read must merge both.
  withUpdatedDfInFolder(simpleExpectedUpdate.toDF(), location => {
    val allRows = ExcelLoader.read(location, EXCEL.sparkFormat, Map("header" -> "true"), None, None)
      .as[AirportInput]
      .collect()

    allRows should contain theSameElementsAs (expected ++ simpleExpectedUpdate)
  })
}

"writeOnce" should "write a dataframe to a file" in {
  ExcelLoader.writeOnce(outputLocation, "", "", initialDF, Nil, EXCEL.sparkFormat, Map("header" -> "true"))

  // Consistency: every other read in this spec passes the table arguments explicitly as (None, None).
  val result = ExcelLoader.read(outputLocation, EXCEL.sparkFormat, Map("header" -> "true"), None, None)

  result.as[AirportInput].collect() should contain theSameElementsAs expected
}

it should "overwrite existing files when writing to the same folder" in withInitialDfInFolder {
  // A second writeOnce at the same location must fully replace the initial content.
  val replacement = simpleExpectedUpdate.toDF()
  ExcelLoader.writeOnce(outputLocation, "", "", replacement, Nil, EXCEL.sparkFormat, Map("header" -> "true"))

  val remaining = ExcelLoader.read(outputLocation, EXCEL.sparkFormat, Map("header" -> "true"), None, None)
    .as[AirportInput]
    .collect()

  remaining should contain theSameElementsAs simpleExpectedUpdate
}

"insert" should "append a dataframe to an existing file" in withInitialDfInFolder {
  // Appending must keep the initial rows and add the new ones.
  withUpdatedDfInFolder(simpleExpectedUpdate.toDF(), location => {
    val combined = ExcelLoader.read(location, EXCEL.sparkFormat, Map("header" -> "true"), None, None)
      .as[AirportInput]
      .collect()

    combined should contain theSameElementsAs (expected ++ simpleExpectedUpdate)
  })
}

}