第1关:UDF——“一进一出”

知识点

1.UDF的实现方法:简单的接口UDF、较为复杂的GenericUDF。

2.简单 UDF 只需要重写evalute()方法。

编程要求

编写自定义函数,通过 Maven打包导入Hive中,实现以下要求:

  • 将新建好的表comment中字段com_province进行处理:如有字段带有“省”的将“省”字去掉,没有带“省”的字段后面加上“市”。
  • 将自定义函数查询出来的数据保存下来存入新表newdata中。
#进入工作目录。
cd /data/workspace/myshixun/step1
#mvn打包
mvn clean package
#打开hive,输入建表命令
create table comment( com_no string,com_food string,com_province string,com_price string,com_content string) row format delimited fields terminated by "," stored as textfile;
#导入数据
load data local inpath "/data/workspace/myshixun/step1/data.txt" into table comment;
#导入jar包
add jar /data/workspace/myshixun/step1/target/step1-1.0-SNAPSHOT.jar;
#创建临时自定义函数
create temporary function procost as 'myudf.AvgCost';
#新建 Hive 表newdata
create table newdata(com_no string,com_food string,new_province string,com_price string,com_content string) row format delimited fields terminated by "," stored as textfile;
#导入利用自定函数查询出来的数据至表`newdata`
insert overwrite table newdata select com_no,com_food,procost(com_province),com_price,com_content from comment;
package myudf;

import org.apache.hadoop.hive.ql.exec.UDF;

public class AvgCost extends UDF {
    public String evaluate(String raw){
        /************** Begin **************/
			//判断字段是否为为空
            if(!"".equals(raw)&&raw!=null){
                //判断字段是否包含“省”,有的去掉“省”,没有的加上“市”
                if(raw.contains("省")){
                    raw=raw.substring(0,raw.length()-1);
                }else{
                    raw=raw+"市";
                }
                return raw;
            }
            //返回处理好的字段
         /************** End **************/
         return null;
    }

}

第2关:UDFA——“多进一出”

知识点

1.UDFA(用户定义的聚合函数)需要UDAF类,计算类Evaluator实现UDAFEvaluator接口,还必须实现以下接口:
(1)init:类似于构造函数,用于UDAF的初始化,主要是负责初始化计算函数并且重设其内部状态,一般就是重设其内部字段。

(2)Iterate:每一次对一个新值进行聚集计算时候都会调用该方法,计算函数会根据聚集计算结果更新内部状态。

(3)terminatePartial:Hive需要部分聚集结果的时候会调用该方法,必须要返回一个封装了聚集计算当前状态的对象。

(4)merge:Hive进行合并一个部分聚集和另一个部分聚集的时候会调用该方法。

(5)terminate:Hive最终聚集结果的时候就会调用该方法。计算函数需要把状态作为一个值返回给用户。

编程要求

编写自定义函数,通过 Maven打包导入Hive中,实现以下要求:

  • 将新建好的表studentscore利用UDAF查询出每个课程的最好成绩。
  • 将自定义函数查询出来的数据保存下来存入新表newdata2中。
#进入工作目录。
cd /data/workspace/myshixun/step2
#mvn打包(打包过程可能会需要些时间,请耐心等候)。
mvn clean package
#打开hive,输入建表命令
create table studentscore(stu_no int,stu_name string,course_name string,scores int) row format delimited fields terminated by "," stored as textfile;
#导入数据
load data local inpath "/data/workspace/myshixun/step2/data.txt" into table studentscore;
#新建表newdata2
create table newdata2(course_name string,max_score string) row format delimited fields terminated by "," stored as textfile;
#在Hive中导入jar包
add jar /data/workspace/myshixun/step2/target/step2-1.0-SNAPSHOT.jar;
#创建临时自定义函数
create temporary function findmax as 'myudaf.FindMax';
#导入利用自定函数查询出来的数据至表newdata2
insert overwrite table newdata2 select course_name,findmax(scores) from studentscore group by course_name;
package myudaf;

import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.io.IntWritable;

public class FindMax  extends UDAF {
    public static class FindMaxUDAFEvaluator implements UDAFEvaluator {
        private IntWritable result;
        /*
        init函数类似于构造函数,用于UDAF的初始化。
        */
        @Override
        public void init() {
            result = null;
        }
        /*
        iterate接收传入的参数,并进行内部的轮转。其返回类型为boolean。
         */
        public boolean iterate(IntWritable value) {
       		 /*********** Begin ***********/
       		 //判断是value值否为空
            if(value==null)
                return false;
      		 //判断result是否为空,是则将value的值赋值给result,否则将俩比谁更大,然后赋值给result。
            if(result==null)
                result=new IntWritable(value.get());
            else
                result.set(Math.max(result.get(),value.get()));    
       
           	 /*********** End ***********/
         	   return true;
        }
        /*
          terminatePartial无参数,其为iterate函数遍历结束后,返回轮转数据
         */
        public IntWritable terminatePartial()
        {
            return result;
        }
        /*
        merge接收terminatePartial的返回结果,进行数据merge操作,其返回类型为boolean
         */
        public boolean merge(IntWritable other)
        {
            return iterate(other);
        }
        //Hive最终聚集结果的时候就会调用该方法。
        public IntWritable terminate()
        {
            return result;
        }

    }
}

第3关:UDTF——“一进多出”

知识点

1.UDTF(用户定义的表生成函数):经常用来分解 Json 数据或者 key:value 数据。

2.UDTF 函数的实现需要继承抽象类GenericUDTF,并且要实现以下方法:

(1)initialize:UDTF最先调用的方法,可以再次设置异常检测,返回要求的参数类型和个数。

(2)process: 初始化完成后,会调用process方法,真正的处理过程在process函数中,在process中,每一次forward()调用产生一行;如果产生多列可以将多个列的值放在一个数组中,然后将该数组传入到forward()函数。

(3)close:对需要清理的方法进行清理。

编程要求

将自定义函数查询出来用户3的数据放入新建的表newuser

#进入工作目录。
cd /data/workspace/myshixun/step3
#mvn打包(打包过程可能会需要些时间,请耐心等候)。
mvn clean package
#Hive建表
create table usertable(user_no int,user_info string) row format delimited fields terminated by "," stored as textfile;
#导入数据
load data local inpath "/data/workspace/myshixun/step3/data.txt" into table usertable;
#在Hive中导入jar包
add jar /data/workspace/myshixun/step3/target/step3-1.0-SNAPSHOT.jar;
#创建临时自定义函数
create temporary function usercost as 'myudtf.CostUDTF';
#创建新表
create table newuser(user_field string,user_info string) row format delimited fields terminated by "," stored as textfile;
#将查询出来的用户3的信息导入到新表内
insert overwrite table newuser select usercost(user_info) from usertable where user_no = 3;
package myudtf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.util.ArrayList;

public class CostUDTF extends GenericUDTF {
    @Override
    public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
        // 异常检测
        if (args.length != 1) {
            throw new UDFArgumentException("NameParserGenericUDTF() takes exactly one argument");
        }

        if(args[0].getCategory()!=ObjectInspector.Category.PRIMITIVE&&((PrimitiveObjectInspector) args[0]).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
            throw new UDFArgumentException("NameParserGenericUDTF() takes a string as a parameter");
        }
		/********** Begin **********/
        ArrayList<String> fieldNames = new ArrayList<String>();
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
        fieldNames.add("user_field");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldNames.add("user_info");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,fieldOIs);

		/********** End **********/
    }

    @Override
    public void process(Object[] args) throws HiveException {
		/********** Begin **********/
        String input=args[0].toString();
        String[] test=input.split(";");
        for(int i=0;i<test.length;i++)
        {
            try{
                String[] result=test[i].split(":");
                forward(result);
            }catch(Exception e){continue;}
        }
		/********** End **********/
    }

    @Override
    public void close() throws HiveException {

    }
}

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐