package bio.ferlab.datalake.spark3.loader

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

import java.time.LocalDate
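
/**
 * [[Loader]] implementation for reading and writing Excel files through a Spark
 * data source. Plain reads and writes are implemented; merge-style operations
 * (overwritePartition, upsert, scd1, scd2) are not supported.
 *
 * A minimal usage sketch, assuming the spark-excel connector
 * (format "com.crealytics.spark.excel") is on the classpath and an implicit
 * SparkSession is in scope; the location below is a hypothetical example path:
 *
 * {{{
 * val df = ExcelLoader.read(
 *   location    = "s3a://bucket/path/file.xlsx", // hypothetical location
 *   format      = "com.crealytics.spark.excel",
 *   readOptions = Map("header" -> "true")
 * )
 * }}}
 */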
object ExcelLoader extends Loader {
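  /**
   * Reads an Excel file into a DataFrame. The `header` read option must be
   * provided so the connector knows whether the first row holds column names.
   */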
  override def read(location: String,
                    format: String,
                    readOptions: Map[String, String],
                    databaseName: Option[String] = None,
                    tableName: Option[String] = None)(implicit spark: SparkSession): DataFrame = {
    require(readOptions.isDefinedAt("header"), "Expecting [header] to be defined in readOptions.")

    spark
      .read
      .format(format)
      .options(readOptions)
      .load(location)
  }
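
  /**
   * Writes `df` to `location` with the given save mode and returns the input
   * DataFrame unchanged so calls can be chained.
   */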
  private def write(df: DataFrame,
                    location: String,
                    format: String,
                    options: Map[String, String],
                    mode: SaveMode): DataFrame = {
    // The Excel format requires a non-empty schema; writes of empty-schema DataFrames are not supported.
    require(df.schema.nonEmpty, "DataFrame must have a valid schema with at least one column.")
    require(options.isDefinedAt("header"), "Expecting [header] to be defined in options.")

    df.write
      .options(options)
      .format(format)
      .mode(mode)
      .save(location)
    df
  }
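
  /** Not implemented: partition-level overwrites are not supported by this loader. */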
  override def overwritePartition(location: String,
                                  databaseName: String,
                                  tableName: String,
                                  df: DataFrame,
                                  partitioning: List[String],
                                  format: String,
                                  options: Map[String, String])(implicit spark: SparkSession): DataFrame = ???
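
  /** Overwrites any existing data at `location` with the content of `df`. */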
  override def writeOnce(location: String,
                         databaseName: String,
                         tableName: String,
                         df: DataFrame,
                         partitioning: List[String],
                         format: String,
                         options: Map[String, String])(implicit spark: SparkSession): DataFrame = {
    write(df, location, format, options, SaveMode.Overwrite)
  }
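
  /** Appends `updates` to the existing data at `location`. */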
  override def insert(location: String,
                      databaseName: String,
                      tableName: String,
                      updates: DataFrame,
                      partitioning: List[String],
                      format: String,
                      options: Map[String, String])(implicit spark: SparkSession): DataFrame = {
    write(updates, location, format, options, SaveMode.Append)
  }
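
  // The merge-style operations below (upsert, scd1, scd2) are not implemented
  // for Excel files and will throw NotImplementedError if called.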
  override def upsert(location: String,
                      databaseName: String,
                      tableName: String,
                      updates: DataFrame,
                      primaryKeys: Seq[String],
                      partitioning: List[String],
                      format: String,
                      options: Map[String, String])(implicit spark: SparkSession): DataFrame = ???

  override def scd1(location: String,
                    databaseName: String,
                    tableName: String,
                    updates: DataFrame,
                    primaryKeys: Seq[String],
                    oidName: String,
                    createdOnName: String,
                    updatedOnName: String,
                    partitioning: List[String],
                    format: String,
                    options: Map[String, String])(implicit spark: SparkSession): DataFrame = ???

  override def scd2(location: String,
                    databaseName: String,
                    tableName: String,
                    updates: DataFrame,
                    primaryKeys: Seq[String],
                    buidName: String,
                    oidName: String,
                    isCurrentName: String,
                    partitioning: List[String],
                    format: String,
                    validFromName: String,
                    validToName: String,
                    options: Map[String, String],
                    minValidFromDate: LocalDate,
                    maxValidToDate: LocalDate)(implicit spark: SparkSession): DataFrame = ???
}