Hive自定义函数
1.UDF的实现方法:简单的接口UDF、较为复杂的GenericUDF。2.简单 UDF 只需要重写evalute()方法。
第1关:UDF——“一进一出”
知识点
1.UDF的实现方法:简单的接口UDF、较为复杂的GenericUDF。
2.简单 UDF 只需要重写evalute()
方法。
编程要求
编写自定义函数,通过 Maven打包导入Hive中,实现以下要求:
- 将新建好的表
comment
中字段com_province
进行处理:如有字段带有“省”的将“省”字去掉,没有带“省”的字段后面加上“市”。 - 将自定义函数查询出来的数据保存下来存入新表
newdata
中。
#进入工作目录。
cd /data/workspace/myshixun/step1
#mvn打包
mvn clean package
#打开hive,输入建表命令
create table comment( com_no string,com_food string,com_province string,com_price string,com_content string) row format delimited fields terminated by "," stored as textfile;
#导入数据
load data local inpath "/data/workspace/myshixun/step1/data.txt" into table comment;
#导入jar包
add jar /data/workspace/myshixun/step1/target/step1-1.0-SNAPSHOT.jar;
#创建临时自定义函数
create temporary function procost as 'myudf.AvgCost';
#新建 Hive 表newdata
create table newdata(com_no string,com_food string,new_province string,com_price string,com_content string) row format delimited fields terminated by "," stored as textfile;
#导入利用自定函数查询出来的数据至表`newdata`
insert overwrite table newdata select com_no,com_food,procost(com_province),com_price,com_content from comment;
package myudf;
import org.apache.hadoop.hive.ql.exec.UDF;
public class AvgCost extends UDF {
public String evaluate(String raw){
/************** Begin **************/
//判断字段是否为为空
if(!"".equals(raw)&&raw!=null){
//判断字段是否包含“省”,有的去掉“省”,没有的加上“市”
if(raw.contains("省")){
raw=raw.substring(0,raw.length()-1);
}else{
raw=raw+"市";
}
return raw;
}
//返回处理好的字段
/************** End **************/
return null;
}
}
第2关:UDFA——“多进一出”
知识点
1.UDFA(用户定义的聚合函数)需要UDAF类,计算类Evaluator实现UDAFEvaluator接口,还必须实现以下接口:
(1)init:类似于构造函数,用于UDAF的初始化,主要是负责初始化计算函数并且重设其内部状态,一般就是重设其内部字段。
(2)Iterate:每一次对一个新值进行聚集计算时候都会调用该方法,计算函数会根据聚集计算结果更新内部状态。
(3)terminatePartial:Hive需要部分聚集结果的时候会调用该方法,必须要返回一个封装了聚集计算当前状态的对象。
(4)merge:Hive进行合并一个部分聚集和另一个部分聚集的时候会调用该方法。
(5)terminate:Hive最终聚集结果的时候就会调用该方法。计算函数需要把状态作为一个值返回给用户。
编程要求
编写自定义函数,通过 Maven打包导入Hive中,实现以下要求:
- 将新建好的表
studentscore
利用UDAF查询出每个课程的最好成绩。 - 将自定义函数查询出来的数据保存下来存入新表
newdata2
中。
#进入工作目录。
cd /data/workspace/myshixun/step2
#mvn打包(打包过程可能会需要些时间,请耐心等候)。
mvn clean package
#打开hive,输入建表命令
create table studentscore(stu_no int,stu_name string,course_name string,scores int) row format delimited fields terminated by "," stored as textfile;
#导入数据
load data local inpath "/data/workspace/myshixun/step2/data.txt" into table studentscore;
#新建表newdata2
create table newdata2(course_name string,max_score string) row format delimited fields terminated by "," stored as textfile;
#在Hive中导入jar包
add jar /data/workspace/myshixun/step2/target/step2-1.0-SNAPSHOT.jar;
#创建临时自定义函数
create temporary function findmax as 'myudaf.FindMax';
#导入利用自定函数查询出来的数据至表newdata2
insert overwrite table newdata2 select course_name,findmax(scores) from studentscore group by course_name;
package myudaf;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.io.IntWritable;
public class FindMax extends UDAF {
public static class FindMaxUDAFEvaluator implements UDAFEvaluator {
private IntWritable result;
/*
init函数类似于构造函数,用于UDAF的初始化。
*/
@Override
public void init() {
result = null;
}
/*
iterate接收传入的参数,并进行内部的轮转。其返回类型为boolean。
*/
public boolean iterate(IntWritable value) {
/*********** Begin ***********/
//判断是value值否为空
if(value==null)
return false;
//判断result是否为空,是则将value的值赋值给result,否则将俩比谁更大,然后赋值给result。
if(result==null)
result=new IntWritable(value.get());
else
result.set(Math.max(result.get(),value.get()));
/*********** End ***********/
return true;
}
/*
terminatePartial无参数,其为iterate函数遍历结束后,返回轮转数据
*/
public IntWritable terminatePartial()
{
return result;
}
/*
merge接收terminatePartial的返回结果,进行数据merge操作,其返回类型为boolean
*/
public boolean merge(IntWritable other)
{
return iterate(other);
}
//Hive最终聚集结果的时候就会调用该方法。
public IntWritable terminate()
{
return result;
}
}
}
第3关:UDTF——“一进多出”
知识点
1.UDTF(用户定义的表生成函数):经常用来分解 Json 数据或者 key:value 数据。
2.UDTF 函数的实现需要继承抽象类GenericUDTF,并且要实现以下方法:
(1)initialize:UDTF最先调用的方法,可以再次设置异常检测,返回要求的参数类型和个数。
(2)process: 初始化完成后,会调用process方法,真正的处理过程在process函数中,在process中,每一次forward()调用产生一行;如果产生多列可以将多个列的值放在一个数组中,然后将该数组传入到forward()函数。
(3)close:对需要清理的方法进行清理。
编程要求
将自定义函数查询出来用户3的数据放入新建的表newuser
中
#进入工作目录。
cd /data/workspace/myshixun/step3
#mvn打包(打包过程可能会需要些时间,请耐心等候)。
mvn clean package
#Hive建表
create table usertable(user_no int,user_info string) row format delimited fields terminated by "," stored as textfile;
#导入数据
load data local inpath "/data/workspace/myshixun/step3/data.txt" into table usertable;
#在Hive中导入jar包
add jar /data/workspace/myshixun/step3/target/step3-1.0-SNAPSHOT.jar;
#创建临时自定义函数
create temporary function usercost as 'myudtf.CostUDTF';
#创建新表
create table newuser(user_field string,user_info string) row format delimited fields terminated by "," stored as textfile;
#将查询出来的用户3的信息导入到新表内
insert overwrite table newuser select usercost(user_info) from usertable where user_no = 3;
package myudtf;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import java.util.ArrayList;
public class CostUDTF extends GenericUDTF {
@Override
public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
// 异常检测
if (args.length != 1) {
throw new UDFArgumentException("NameParserGenericUDTF() takes exactly one argument");
}
if(args[0].getCategory()!=ObjectInspector.Category.PRIMITIVE&&((PrimitiveObjectInspector) args[0]).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
throw new UDFArgumentException("NameParserGenericUDTF() takes a string as a parameter");
}
/********** Begin **********/
ArrayList<String> fieldNames = new ArrayList<String>();
ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
fieldNames.add("user_field");
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
fieldNames.add("user_info");
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,fieldOIs);
/********** End **********/
}
@Override
public void process(Object[] args) throws HiveException {
/********** Begin **********/
String input=args[0].toString();
String[] test=input.split(";");
for(int i=0;i<test.length;i++)
{
try{
String[] result=test[i].split(":");
forward(result);
}catch(Exception e){continue;}
}
/********** End **********/
}
@Override
public void close() throws HiveException {
}
}
更多推荐
所有评论(0)