(头歌)MapReduce的编程开发-求和(统计手机用户总流量)
求和是 MapReduce 中最常见的数值算法,使用 Map 端读取数据,若只需要针对单行数据进行求和的话,只 Map 端就可以满足了。若需要针对多行数据进行分组求和的话,那就需要 Map 端和 Reducer 端相结合,以 key 值区分来将所有数值进行求和,达到分组求和的效果。
·
没人发我来发!
任务1 统计手机用户总流量
任务描述
本关任务:根据手机流量数据,编写 MapReduce 程序来统计出每个手机号码的一年总流量。
相关知识
求和概述
求和是 MapReduce 中最常见的数值算法,使用 Map 端读取数据,若只需要针对单行数据进行求和的话,只 Map 端就可以满足了。若需要针对多行数据进行分组求和的话,那就需要 Map 端和 Reducer 端相结合,以 key 值区分来将所有数值进行求和,达到分组求和的效果。
数据文件格式说明
这是编程中用到的手机流量数据,为 txt 格式,文件名phonetraffic.txt
,大小 240 行,前几行示例如下:
18632845069,Jan,40978,94715
18632845069,Feb,39481,63612
18632845069,Mar,88509,13659
18632845069,Apr,68661,87788
18632845069,May,11444,22327
18632845069,Jun,88736,150
18632845069,Jul,34271,20820
18632845069,Aug,91564,21082
18632845069,Sept,73077,21820
18632845069,Oct,66969,28916
18632845069,Nov,12245,77496
18632845069,Dec,19398,98038
...
-- 总共 240 行--
- 每一行数据(4列)分别表示: 手机号码, 月份, 上行流量(上传), 下行流量(下载);
本关卡使用一个计算手机总流量作为案例,可以先将上行流量和下行流量加起来作为当月总流量,在通过手机号去将每月的流量累加起来,就可以得出手机号码的一年总流量为多少。
编程要求
根据提示,在右侧编辑器补充代码,计算出每个手机号码的一年总流量。
- main 方法已给出,其中 Job 和输入输出路径已配置完成,无需更改;
- map 和 reduce 的输入输出 key、value 已给出;
- 编程中直接写 map 与 reduce 过程的主要内容即可。
预期输出格式:
手机号码 总流量
手机号码 总流量
···
···
测试说明
平台会对你编写的代码进行测试,如果编写的 MapReduce 输出与预期一致,则通过。
代码:
package phone.mapreduce;
//2023/12/8 Hadoop实验四
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class PhonetrafficDriver {
public static class Map extends Mapper<LongWritable, Text,Text, IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
/********** Begin **********/
//获取输入的行,并以 , 分开成列表
String[] fields = value.toString().split(",");
//将当月的上行流量和下行流量相加
int sum = Integer.parseInt(fields[2]) + Integer.parseInt(fields[3]);
//将key为手机号,value为当月的总流量传入至reduce中
context.write(new Text(fields[0]), new IntWritable(sum));
/********** End **********/
}
}
public static class Reduce extends Reducer<Text, IntWritable,Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
/********** Begin **********/
//定义总流量
int sum = 0;
//遍历集合求手机一年总流量
for (IntWritable value : values) {
sum += value.get();
}
//得出结果
context.write(key, new IntWritable(sum));
/********** End **********/
}
}
public static void main(String[] args) throws Exception {
//创建配置信息
Configuration conf=new Configuration();
// 创建任务
Job job = Job.getInstance(conf);
//设置执行类
job.setJarByClass(PhonetrafficDriver.class);
//设置自定义Mapper类
job.setMapperClass(Map.class);
//设置自定义Reducer类
job.setReducerClass(Reduce.class);
//设置map函数输出数据的key和value的类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//设置reduce函数输出数据的key和value的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//如果输出目录存在,就删除
Path output= new Path("/root/files");
FileSystem fileSystem = output.getFileSystem(conf);
if (fileSystem.exists(output)){
fileSystem.delete(output,true);
}
//设置输入输出路径
FileInputFormat.addInputPath(job, new Path("/data/workspace/myshixun/data/phonetraffic.txt"));
FileOutputFormat.setOutputPath(job,output);
//提交作业,若成功返回true,失败返回falase
boolean b = job.waitForCompletion(true);
if (b){
System.out.println("恭喜,清洗成功");
}else{
System.out.println("不好意思,清洗失败");
}
}
}
更多推荐
所有评论(0)