Spark笔记之使用UDF(User Define Function)
Spark笔记之使用UDF(User Define Function)目录1、UDF介绍2、使用UDF2.1 在SQL语句中使用UDF2.2 直接对列应用UDF(脱离sql)3、完整代码1、UDF介绍UDF(User Define Function),即用户自定义函数,Spark的官方文档中没有对UDF做过多介绍,猜想可能是认为比较简单吧。几乎所有sql数据库的实现都为用户提供了扩展接口来增强sq
Spark笔记之使用UDF(User Define Function)
目录
1、UDF介绍
UDF(User Define Function),即用户自定义函数,Spark的官方文档中没有对UDF做过多介绍,猜想可能是认为比较简单吧。
几乎所有sql数据库的实现都为用户提供了扩展接口来增强sql语句的处理能力,这些扩展称之为UDXXX,即用户定义(User Define)的XXX,这个XXX可以是对单行操作的UDF,或者是对多行操作的UDAF,或者是UDTF,本次主要介绍UDF。
UDF的UD表示用户定义,既然有用户定义,就会有系统内建(built-in),一些系统内建的函数比如abs,接受一个数字返回它的绝对值,比如substr对字符串进行截取,它们的特点就是在执行sql语句的时候对每行记录调用一次,每调用一次传入一些参数,这些参数通常是表的某一列或者某几列在当前行的值,然后产生一个输出作为结果。
适用场景:UDF使用频率极高,对于单条记录进行比较复杂的操作,使用内置函数无法完成或者比较复杂的情况都比较适合使用UDF。
2、使用UDF
2.1 在SQL语句中使用UDF
在sql语句中使用UDF指的是在spark.sql("select udf_foo(…)")这种方式使用UDF,套路大致有以下几步:
- 实现UDF,可以是case class,可以是匿名类
- 注册到spark,将类绑定到一个name,后续会使用这个name来调用函数
- 在sql语句中调用注册的name调用UDF
下面是一个简单的示例:
Logger.getLogger("org").setLevel(Level.OFF)
val spark = SparkSession.builder().master("local").appName("AppName").getOrCreate()
import spark.implicits._
// 注册可以在sql语句中使用的UDF
spark.udf.register("to_uppercase", (s: String) => s.toUpperCase())
// 创建一张表
Seq((1, "foo"), (2, "bar")).toDF("id", "text").createOrReplaceTempView("t_foo")
spark.sql("select id, to_uppercase(text) from t_foo").show()
spark.sql("select id, to_uppercase(text) new_text from t_foo").show()
+---+----------------------+
| id|UDF:to_uppercase(text)|
+---+----------------------+
| 1| FOO|
| 2| BAR|
+---+----------------------+
+---+--------+
| id|new_text|
+---+--------+
| 1| FOO|
| 2| BAR|
+---+--------+
2.2 直接对列应用UDF(脱离sql)
在sql语句中使用比较麻烦,还要进行注册什么的,可以定义一个UDF然后将它应用到某个列上:
val ds = Seq((1, "foo"), (2, "bar")).toDF("id", "text")
val toUpperCase = udf((s: String) => s.toUpperCase)
ds.withColumn("text", toUpperCase('text)).show()
+---+----+
| id|text|
+---+----+
| 1| FOO|
| 2| BAR|
+---+----+
传入两个参数:
val rowKeyGenerator1 = udf((n: String,n1 :Int) =>{
val r = scala.util.Random
val randomNB = r.nextInt( (100) ).toString()
val deviceNew = randomNB.concat(n).concat(n1.toString)
deviceNew
}, StringType)
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.OFF)
val spark = SparkSession.builder().master("local").appName("AppName").getOrCreate()
import spark.implicits._
val userData = Array(("Leo", 16), ("Marry", 21), ("Jack", 14), ("Tom", 18))
//创建测试df
val df = spark.createDataFrame(userData).toDF("name", "age")
df.show()
val ds2=df.withColumn("NameNewValue",rowKeyGenerator1(col("name"),col("age")))
ds2.show()
}
+-----+---+
| name|age|
+-----+---+
| Leo| 16|
|Marry| 21|
| Jack| 14|
| Tom| 18|
+-----+---+
+-----+---+------------+
| name|age|NameNewValue|
+-----+---+------------+
| Leo| 16| 69Leo16|
|Marry| 21| 91Marry21|
| Jack| 14| 61Jack14|
| Tom| 18| 62Tom18|
+-----+---+------------+
需要注意的是受Scala limit 22限制,自定义UDF最多接受22个参数,不过正常情况下完全够用了。
https://www.cnblogs.com/cc11001100/p/9463909.html
2.3 scala-处理Spark UDF中的所有列/整行
import org.apache.spark.sql.Row
//定义一个函数,使一行中的所有元素都成为一个字符串,用(如您有computehash函数)分隔。
def concatFunc(row: Row) = row.mkString(", ")
import org.apache.spark.sql.functions._
def combineUdf = udf((row: Row) => concatFunc(row))
//我使用withcolumn函数和struct内置函数调用udf函数,将所选列组合为一列并传递给udf函数
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.OFF)
val spark = SparkSession.builder().master("local").appName("AppName").getOrCreate()
import spark.implicits._
/**scala-处理Spark UDF中的所有列/整行
* https://www.runexception.com/q/821
*
* 对于包含字符串和数字数据类型组合的 DataFrame ,目标是创建一个新的features列,该列是所有这些数据类型的minhash。
* 虽然这可以通过执行dataframe.to rdd来完成,但是在下一步将简单地将RDD transformation回 DataFrame 时,这样做是很 代价很大。
*
* 您可以使用row使用struct inbuilt函数将所有列或选定列传递给一个udf函数。
* */
val df11 = spark.createDataFrame(Seq(("a", "b", "c"),("a1", "b1", "c1"))).toDF("col1", "col2", "col3")
df11.show()
df11.withColumn("contcatenated", combineUdf(struct(col("col1"), col("col2"), col("col3"))))
.show(false)
//所以可以看到,该行可用于将整行作为参数传递
//甚至可以一次传递一行中的所有列
val columns = df11.columns
columns.foreach(println)
df11.withColumn("contcatenated", combineUdf(struct(columns.map(col): _*)))
//对于SQL查询也可以实现相同的效果,只需将UDF函数注册为
df11.createOrReplaceTempView("tempview")
spark.udf.register("combineUdf", combineUdf)
spark.sql("select *, combineUdf(struct(`col1`, `col2`, `col3`)) as concatenated from tempview").show(100)
//现在,如果您不想硬编码列的名称,那么您可以 root据需要选择列名称,并将其设置为字符串
val columns1 = df11.columns.map(x => "`"+x+"`").mkString(",")
println(columns1)
spark.sql(s"select *, combineUdf(struct(${columns1})) as concatenated from tempview").show(100)
}
输出:
+----+----+----+
|col1|col2|col3|
+----+----+----+
| a| b| c|
| a1| b1| c1|
+----+----+----+
+----+----+----+-------------+
|col1|col2|col3|contcatenated|
+----+----+----+-------------+
|a |b |c |a, b, c |
|a1 |b1 |c1 |a1, b1, c1 |
+----+----+----+-------------+
col1
col2
col3
+----+----+----+------------+
|col1|col2|col3|concatenated|
+----+----+----+------------+
| a| b| c| a, b, c|
| a1| b1| c1| a1, b1, c1|
+----+----+----+------------+
`col1`,`col2`,`col3`
+----+----+----+------------+
|col1|col2|col3|concatenated|
+----+----+----+------------+
| a| b| c| a, b, c|
| a1| b1| c1| a1, b1, c1|
+----+----+----+------------+
Process finished with exit code 0
3、完整代码
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{SparkSession}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
/**
* @class TestUDF
* @author yyz
* @date 2021/01/25 13:31
* Spark笔记之使用UDF(User Define Function)
* 2.1 在SQL语句中使用UDF
* 2.2 直接对列应用UDF(脱离sql)
* 2.3 scala-处理Spark UDF中的所有列/整行
*
* https://dzone.com/articles/how-to-use-udf-in-spark-without-register-them
* How to Use UDF in Spark Without Register Them
* This article will show a code snippet on how to use UDF in Spark without registering them.
* Here, we will demonstrate the use of UDF via a small example.
* Use Case: We need to change the value of an existing column of DF/DS to add some prefix or suffix to the existing value in a new column.
* // code snippet how to create UDF in Spark without registering them
*
* Note: We can also change the type from String to any other supported type, as per individual requirements.
* Make sure while developing that we handle null cases, as this is a common cause of errors.
* UDFs are a black box for the Spark engine, whereas functions that take a Column argument and return a Column are not a black box for Spark.
* It is always recommended to use Spark's Native API/Expression over UDF's with contrast to performance parameters.
*
* */
object TestUDF {
val rowKeyGenerator1 = udf((n: String,n1 :Int) =>{
val r = scala.util.Random
val randomNB = r.nextInt( (100) ).toString()
val deviceNew = randomNB.concat(n).concat(n1.toString)
deviceNew
}, StringType)
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.OFF)
val spark = SparkSession.builder().master("local").appName("AppName").getOrCreate()
import spark.implicits._
// 注册可以在sql语句中使用的UDF
spark.udf.register("to_uppercase", (s: String) => s.toUpperCase())
// 创建一张表
Seq((1, "foo"), (2, "bar")).toDF("id", "text").createOrReplaceTempView("t_foo")
spark.sql("select id, to_uppercase(text) from t_foo").show()
spark.sql("select id, to_uppercase(text) new_text from t_foo").show()
val ds = Seq((1, "foo"), (2, "bar")).toDF("id", "text")
val toUpperCase = udf((s: String) => s.toUpperCase)
ds.withColumn("text", toUpperCase('text)).show()
val userData = Array(("Leo", 16), ("Marry", 21), ("Jack", 14), ("Tom", 18))
val df = spark.createDataFrame(userData).toDF("name", "age")
df.show()
val ds2=df.withColumn("NameNewValue",rowKeyGenerator1(col("name"),col("age")))
ds2.show()
/**scala-处理Spark UDF中的所有列/整行
* https://www.runexception.com/q/821
*
* 对于包含字符串和数字数据类型组合的 DataFrame ,目标是创建一个新的features列,该列是所有这些数据类型的minhash。
* 虽然这可以通过执行dataframe.to rdd来完成,但是在下一步将简单地将RDD transformation回 DataFrame 时,这样做是很 代价很大。
*
* 您可以使用row使用struct inbuilt函数将所有列或选定列传递给一个udf函数。
* */
val df11 = spark.createDataFrame(Seq(("a", "b", "c"),("a1", "b1", "c1"))).toDF("col1", "col2", "col3")
df11.show()
df11.withColumn("contcatenated", combineUdf(struct(col("col1"), col("col2"), col("col3"))))
.show(false)
//所以可以看到,该行可用于将整行作为参数传递
//甚至可以一次传递一行中的所有列
val columns = df11.columns
columns.foreach(println)
df11.withColumn("contcatenated", combineUdf(struct(columns.map(col): _*)))
//对于SQL查询也可以实现相同的效果,只需将UDF函数注册为
df11.createOrReplaceTempView("tempview")
spark.udf.register("combineUdf", combineUdf)
spark.sql("select *, combineUdf(struct(`col1`, `col2`, `col3`)) as concatenated from tempview").show(100)
//现在,如果您不想硬编码列的名称,那么您可以 root据需要选择列名称,并将其设置为字符串
val columns1 = df11.columns.map(x => "`"+x+"`").mkString(",")
println(columns1)
spark.sql(s"select *, combineUdf(struct(${columns1})) as concatenated from tempview").show(100)
spark.close()
}
import org.apache.spark.sql.Row
//定义一个函数,使一行中的所有元素都成为一个字符串,用(如您有computehash函数)分隔。
def concatFunc(row: Row) = row.mkString(", ")
import org.apache.spark.sql.functions._
def combineUdf = udf((row: Row) => concatFunc(row))
//我使用withcolumn函数和struct内置函数调用udf函数,将所选列组合为一列并传递给udf函数
}
更多推荐
所有评论(0)