序列化
就是把内存中的对象
,转换
成字节序列
(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输
。
反序列化
就是将收到字节序列
(或其他数据传输协议)或者是磁盘的持久化数据
,转换
成内存中的对象
。
一般来说,“活的”对象只生存在内存里,关机断电就没有了。而且“活的”对象只能由本地的进程使用,不能被发送到网络上的另外一台计算机。 然而序列化
可以存储“活的”对象
,可以将“活的”对象发送到远程计算机
。
Java 的序列化
是一个重量级序列化框架(Serializable
),一个对象被序列化后
,会附带很多额外的信息
(各种校验信息,Header,继承体系等),不便于在网络中高效传输
。所以,Hadoop
自己开发了一套序列化机制(Writable
)。
(1)紧凑 :高效使用存储空间。
(2)快速:读写数据的额外开销小。
(3)互操作:支持多语言的交互。
在企业开发中往往常用的基本序列化类型
不能满足所有需求,比如在 Hadoop 框架内部传递一个 bean 对象
,那么该对象就需要实现序列化接口
。
具体实现 bean 对象序列化步骤如下 7 步。
(1)必须实现 Writable 接口
(2)反序列化时,需要反射调用空参构造函数,所以必须有空参构造:
public FlowBean() {
super();
}
(3)重写序列化方法
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}
(4)重写反序列化方法
@Override
public void readFields(DataInput in) throws IOException {
upFlow = in.readLong();
downFlow = in.readLong();
sumFlow = in.readLong();
}
(5)注意反序列化的顺序和序列化的顺序完全一致
(6)要想把结果显示在文件中,需要重写 toString(),可用"\t"分开,方便后续用。
(7)==如果需要将自定义的 bean 放在 key 中传输,则还需要实现 Comparable 接口,因为
MapReduce 框中的 Shuffle 过程要求对 key 必须能排序。==此部分详看后面的排序案例。
@Override
public int compareTo(FlowBean o) {
// 倒序排列,从大到小
return this.sumFlow > o.getSumFlow() ? -1 : 1;
}
1)需求
统计每一个手机号耗费的总上行流量、总下行流量、总流量:
(1)输入data
(2)输入data格式
(3)期望输出data格式
2)需求分析
3)编写MapReduce程序
(1)编写流量统计的Bean对象
package com.root.mapreduce.writable;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* 1.定义类 实现writable接口
* 2.重写序列化和反序列化方法
* 3.重写空参构造
* 4.重写tostring方法
*/
public class FlowBean implements Writable {
private long upFlow;//上行流量
private long downFlow;//下行流量
private long sumFlow;//总流量
//重写空参构造
public FlowBean() {
}
//定义set和get方法
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
public void setSumFlow() {
this.sumFlow = this.upFlow+this.downFlow;
}
//重写序列化方法
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}
//重写反序列化方法
@Override
public void readFields(DataInput input) throws IOException {
this.upFlow=input.readLong();
this.downFlow=input.readLong();
this.sumFlow=input.readLong();
}
//重写toString方法,格式化输出
public String toString(){
return upFlow+"\t"+downFlow+"\t"+sumFlow;
}
}
(2)编写Mapper类
public class FlowMapper extends Mapper<LongWritable, Text,Text,FlowBean> {
private Text outK=new Text();
private FlowBean outV=new FlowBean();
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {
//1.获取一行
//1 13736230513 192.196.100.1 www.atguigu.com 2481 24681 200
String s = value.toString();
//2.切割
String[] split = s.split("\t");
//3.抓取想要的数据,此处获取上行和下行流量是从每一行最后一列往前数,找到对应的索引
String phone=split[1];
String up=split[split.length-3];
String down=split[split.length-2];
//4.封装
outK.set(phone);
outV.setUpFlow(Long.parseLong(up));
outV.setDownFlow(Long.parseLong(down));
outV.setSumFlow();
//写出
context.write(outK,outV);
}
}
(3)编写Reducer类
public class FlowReducer extends Reducer<Text,FlowBean,Text,FlowBean> {
private FlowBean outV=new FlowBean();
@Override
protected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context) throws IOException, InterruptedException {
//1.遍历集合 累加
//思考:这里不把totalup和totaldown作为成员变量的原因是?
//因为同一key(手机号)只执行一次reduce方法,
//对同一key我们需要累加其所有的value,如果当作成员变量,
//那么所有的key的value会被加在一起,实现不了对相同手机号的累计计数功能
long totalup=0;
long totaldown=0;
for (FlowBean value : values) {
totalup+=value.getUpFlow();
totaldown+=value.getDownFlow();
}
//2.封装输出的outK,outV
outV.setUpFlow(totalup);
outV.setDownFlow(totaldown);
outV.setSumFlow();
//3.写出
context.write(key,outV);
}
}
(4)编写Driver驱动类
public class FlowDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
//1.获取job
Configuration conf = new Configuration();
Job ins = Job.getInstance(conf);
//2.设置jar包路径
ins.setJarByClass(FlowDriver.class);
//3.关联mapper和reducer
ins.setMapperClass(FlowMapper.class);
ins.setReducerClass(FlowReducer.class);
//4.设置map输出的kv类型
ins.setMapOutputKeyClass(Text.class);
ins.setMapOutputValueClass(FlowBean.class);
//5.设置最终输出的kv类型
ins.setOutputKeyClass(Text.class);
ins.setOutputValueClass(FlowBean.class);
//6.设置输入路径和输出路径
FileInputFormat.setInputPaths(ins, new Path("D:\\java_learning\\input\\inputflow"));
FileOutputFormat.setOutputPath(ins, new Path("D:\\java_learning\\output\\outputflow"));
//7.提交job
boolean result = ins.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
4)程序调试
事实上,之前的操作我们仅仅是按照Map/Reduce每一阶段的所需要做的工作进行了代码的编写,但实际上我们更关注Map/Reduce
过程的底层实现
,因此我们可以进行Debug
追溯源码:
首先在FlowMapper和FlowReducer类中打好断点
:
之后便可以在FlowDriver类中进行Debug操作:
可以看到开始调试后程序首先
运行到前面所说的Map阶段准备
工作,之后便执行核心业务代码跳转到重写的map方法
中继续执行
进入重写的map方法后,首先要明确map方法会执行多少次?显然这取决于源文件有多少行,因为每一次map只是对一行的数据进行操作,由于我们源文件有22行,所以map方法会执行22次,这可以通过传入的value值
得以体现,下面的debug过程中发现,value
里存储
的的第一列数据
从1逐渐增加至22,之后便跳转到了Map阶段工作结束
的cleanup
代码:
在Map阶段结束后,即将进行
的一定是Reduce阶段
的工作,同样,Reduce阶段也是先进行准备
工作,之后
便执行核心业务代码跳转到重写的reduce方法
中继续执行:
进入重写的reduce方法后,同样需要明确该方法执行了多少次?我们知道reduce方法
根据传入的key值
(也即是 map阶段输出的key值
,也就是手机号码
)确定执行次数,如果一个手机号码只出现了1次,那么该方法只执行1次,如果出现2次,方法就会同样执行2次。下面两组reduce操作展现了不同手机号码方法执行次数的差异,第一次执行时reduce操作拿到了以134开头,144结尾的手机号码,该号码只出现了一次,所以reduce方法(内层for循环)只执行了1次;
第二次执行时reduce操作拿到了以135开头,656结尾的手机号码,该号码出现了2次,所以reduce方法(内层for循环)也执行了2次;
当所有key值
都执行过reduce方法
后,便跳转到了Reduce阶段工作结束
的cleanup
代码:
以上便是Map-Reduce的底层逻辑实现,对于后续的学习有所帮助。
5)上传到集群测试
此处详细流程不再赘述,具体如何上传并测试的操作可看中有关集群测试的内容。
文件输入路径:
文件输出路径:
输出文件内容:
可以看到和我们本地运行时的输出结果一致,第一列代表每一个手机号码,第二列代表上行流量,第三列代表下行流量,第四列是总流量统计。
因篇幅问题不能全部显示,请点此查看更多更全内容