-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathUDF.scala
More file actions
55 lines (48 loc) · 1.52 KB
/
Copy pathUDF.scala
File metadata and controls
55 lines (48 loc) · 1.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.util.Random
/**
 * Spark SQL demo that registers two string UDFs and round-trips a column through them:
 * `addPrefix` prepends a random digit and an underscore, `rmPrefix` strips that prefix again.
 */
object UDF {

  /** Separator placed between the random digit prefix and the original value. */
  private val PrefixSeparator = "_"

  /** Exclusive upper bound of the random prefix, i.e. prefixes are drawn from 0 to 9. */
  private val PrefixBound = 10

  /**
   * Reads `data/DSL.txt` as comma-separated `id,name,age` records, registers the UDFs,
   * then shows the `name` column with the prefix added and with the prefix removed again.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)

    val spark = SparkSession.builder
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()

    // Stop the session even if reading the file or running a query fails.
    try {
      import spark.implicits._
      import spark.sql

      val df_user: DataFrame = spark.read.textFile("data/DSL.txt")
        .map(_.split(","))
        .map(x => (x(0), x(1), x(2)))
        .toDF("id", "name", "age")
        .cache()

      spark.udf.register("addPrefix", (field: String) => randomPrefixUDF(field))
      spark.udf.register("rmPrefix", (field: String) => removePrefixUDF(field))

      // createOrReplaceTempView: getOrCreate may hand back a session that already has these views.
      df_user.createOrReplaceTempView("view")
      val df_prefix = sql("select addPrefix(name) as pre_name from view")
      df_prefix.show()

      df_prefix.createOrReplaceTempView("pre_view")
      sql("select rmPrefix(pre_name) as name from pre_view").show()
    } finally {
      spark.stop()
    }
  }

  /**
   * Prepends a random single-digit prefix followed by "_" to a field value.
   *
   * @param field the field value to prefix; may be null
   * @return the value in the form `<digit>_<field>`, or null when `field` is null
   */
  def randomPrefixUDF(field: String): String =
    Option(field)
      .map(value => s"${Random.nextInt(PrefixBound)}$PrefixSeparator$value")
      .orNull

  /**
   * Removes the prefix added by [[randomPrefixUDF]]: everything up to and including
   * the first "_" is dropped, so underscores inside the original value are preserved.
   *
   * @param field the prefixed field value; may be null
   * @return the value after the first "_", the value unchanged when it contains no "_",
   *         or null when `field` is null
   */
  def removePrefixUDF(field: String): String =
    Option(field).map { value =>
      value.indexOf(PrefixSeparator) match {
        case -1  => value // no prefix present: nothing to strip
        case idx => value.substring(idx + PrefixSeparator.length)
      }
    }.orNull
}