1) First, define the IntArrayWritable class, which extends ArrayWritable.
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.ArrayWritable;

public class IntArrayWritable extends ArrayWritable {
    public IntArrayWritable() {
        super(IntWritable.class);
    }
}
Because the (key, value) pairs read back from a SequenceFile are instantiated reflectively before being handed to map, the no-argument constructor above must be defined explicitly.
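To make the reason concrete, here is a minimal sketch (my addition, not part of the original demo) of what SequenceFile.Reader effectively does when it creates a value instance: ReflectionUtils.newInstance requires a no-argument constructor, and without the one defined above this call fails at runtime.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;

public class ConstructorCheck {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // This is essentially what SequenceFile.Reader does internally:
        // reflective instantiation, which requires a no-arg constructor.
        IntArrayWritable value = ReflectionUtils.newInstance(IntArrayWritable.class, conf);
        System.out.println("instantiated: " + value.getClass().getName());
    }
}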
2) Next, write objects of type IntArrayWritable as values into a SequenceFile, using SequenceFile.Writer.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.ArrayWritable;

public class SequenceFileWriterDemo {
    public static void main(String[] args) throws IOException {
        String uri = "/home/hadoop/2016Test/SeqTest/10IntArray";
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(uri);
        IntWritable key = new IntWritable();
        IntArrayWritable value = new IntArrayWritable(); // the IntArrayWritable value to be written
        value.set(new IntWritable[]{new IntWritable(1), new IntWritable(2),
                new IntWritable(3), new IntWritable(4)});
        SequenceFile.Writer writer = null;
        writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass());
        int i = 0;
        while (i < 10) {
            key.set(i++);
            writer.append(key, value);
        }
        writer.close(); // this close() is essential; without it the write fails and the file ends up empty
        System.out.println("done!");
    }
}

class IntArrayWritable extends ArrayWritable {
    public IntArrayWritable() {
        super(IntWritable.class);
    }
}
This writes a 10 x 4 matrix into the SequenceFile: the key is the row number, and the value is an IntArrayWritable holding that row.
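A side note (my addition): the createWriter(fs, conf, path, keyClass, valueClass) overload used above is deprecated in Hadoop 2.x. A sketch of the option-based replacement for the same file and classes:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;

public class OptionWriterSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path("/home/hadoop/2016Test/SeqTest/10IntArray");
        // Option-style factory method available since Hadoop 2.x;
        // the filesystem is resolved from conf and the path.
        SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                SequenceFile.Writer.file(path),
                SequenceFile.Writer.keyClass(IntWritable.class),
                SequenceFile.Writer.valueClass(IntArrayWritable.class));
        IntArrayWritable value = new IntArrayWritable();
        value.set(new IntWritable[]{new IntWritable(1)});
        writer.append(new IntWritable(0), value);
        writer.close();
    }
}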
3) Upload the generated SequenceFile to the cluster and inspect its content with the command below (the IntArrayWritable class must be packaged into a jar and that jar's path added to HADOOP_CLASSPATH in hadoop-env.sh):
hadoop fs -text /testData/10IntArray
The result looks like this: each key is followed by something like IntArrayWritable@<hash> instead of the array.
Something seems wrong? It should be the array [1,2,3,4]. Actually it is correct: writing into a SequenceFile persists the "live objects", that is, serializes them, and -text renders each value with its toString() method, so what we see is the IntArrayWritable@... form. If you want to see the arrays, just deserialize the file.
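If you do want hadoop fs -text to print the arrays directly, one option (my addition, not used in this post) is to give IntArrayWritable a toString() override, since -text renders each value via toString(). A sketch of the modified class:

import java.util.Arrays;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;

// With this override, `hadoop fs -text` would print something like
// [1, 2, 3, 4] instead of IntArrayWritable@<hash>.
class IntArrayWritable extends ArrayWritable {
    public IntArrayWritable() {
        super(IntWritable.class);
    }

    @Override
    public String toString() {
        // toStrings() is inherited from ArrayWritable and returns the
        // elements as a String[].
        return Arrays.toString(toStrings());
    }
}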
4) Use SequenceFile.Reader to read the file back and finally see the arrays. The code is as follows:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.*;
import org.apache.hadoop.util.ReflectionUtils;
import java.io.IOException;
import java.net.URI;

public class SequencefileReaderDemo {
    public static void main(String[] args) throws IOException {
        String uri = "/home/hadoop/2016Test/SeqTest/10IntArray";
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(uri);
        SequenceFile.Reader reader = null;
        try {
            reader = new SequenceFile.Reader(fs, path, conf);
            // instantiate key and value reflectively from the classes recorded in the file header
            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
            IntArrayWritable value = (IntArrayWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
            long position = reader.getPosition();
            String[] sValue = null;
            while (reader.next(key, value)) {
                String syncSeen = reader.syncSeen() ? "*" : ""; // mark records that follow a sync point
                sValue = value.toStrings();
                System.out.printf("[%s%s]\t%s\t%s\t", position, syncSeen, key, value);
                for (String s : sValue) {
                    System.out.printf("%s\t", s);
                }
                System.out.println();
                position = reader.getPosition(); // start position of the next record
            }
        } finally {
            IOUtils.closeStream(reader);
        }
    }
}
The run output is as follows (each line shows the record's start position, the key, the value's toString() form, and then the four array elements):
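The positions printed above also enable random access. A sketch using the standard Reader API (my addition, not part of the original demo): seek(position) jumps to an exact record boundary recorded earlier, while sync(position) skips forward to the next sync point at or after an arbitrary offset.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import java.net.URI;

public class SeekDemo {
    public static void main(String[] args) throws Exception {
        String uri = "/home/hadoop/2016Test/SeqTest/10IntArray";
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        SequenceFile.Reader reader = null;
        try {
            reader = new SequenceFile.Reader(fs, new Path(uri), conf);
            IntWritable key = new IntWritable();
            IntArrayWritable value = new IntArrayWritable();

            long boundary = reader.getPosition(); // position of the first record, after the header
            reader.next(key, value);              // read record 0

            reader.seek(boundary);                // seek() must land on an exact record boundary
            reader.next(key, value);              // re-reads record 0
            System.out.println("after seek: key = " + key);

            reader.sync(0);                       // advance to the next sync point at/after offset 0
                                                  // (or to the end of file if there is none)
            System.out.println("after sync(0): position = " + reader.getPosition());
        } finally {
            IOUtils.closeStream(reader);
        }
    }
}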
5) Finally, use the SequenceFile generated above as the left matrix and write an MR program that multiplies it by a hard-coded right matrix. The code is as follows:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

import java.io.IOException;
import java.lang.reflect.Array;
import java.net.URI;

public class MRTest {
    public static class MyMapper extends Mapper<IntWritable, IntArrayWritable, IntWritable, IntArrayWritable> {
        // the 4 x 5 right matrix is hard-coded and shared by every map task
        public static int[][] rightMatrix = new int[][]{
                {10, 10, 10, 10, 10}, {10, 10, 10, 10, 10},
                {10, 10, 10, 10, 10}, {10, 10, 10, 10, 10}};
        public Object valueObject = null;
        public IntWritable[] arraySum = new IntWritable[rightMatrix[0].length];
        public int sum = 0;

        public void map(IntWritable key, IntArrayWritable value, Context context)
                throws IOException, InterruptedException {
            // value.toArray() returns an Object, but the Object actually wraps an array;
            // Array.get(valueObject, j) retrieves the j-th element, which is converted
            // to a String and then parsed back into an int with Integer.parseInt.
            valueObject = value.toArray();
            for (int i = 0; i < rightMatrix[0].length; ++i) {
                sum = 0;
                for (int j = 0; j < rightMatrix.length; ++j) {
                    sum += Integer.parseInt(Array.get(valueObject, j).toString()) * rightMatrix[j][i];
                }
                arraySum[i] = new IntWritable(sum);
            }
            value.set(arraySum);
            context.write(key, value); // key: row number; value: the corresponding row of the product
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String uri = "/home/hadoop/2016Test/SeqTest/10IntArray";
        String outUri = "/home/hadoop/2016Test/SeqTest/output";
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);

        fs.delete(new Path(outUri), true); // delete the output directory if it already exists

        Job job = Job.getInstance(conf, "SeqMatrix"); // Job.getInstance replaces the deprecated new Job(conf, name)
        job.setJarByClass(MRTest.class);
        job.setMapperClass(MyMapper.class);
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntArrayWritable.class);
        FileInputFormat.setInputPaths(job, new Path(uri));
        FileOutputFormat.setOutputPath(job, new Path(outUri));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
The result: since every row of the left matrix is [1, 2, 3, 4] and every entry of the 4 x 5 right matrix is 10, each entry of the product is 10 * (1 + 2 + 3 + 4) = 100, so all ten output rows should be [100, 100, 100, 100, 100].
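To verify this without a screenshot, here is a sketch (my addition) that reads the job output back; part-r-00000 is the usual default output file name for a single-reducer job, an assumption here, so check the actual name under the output directory.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import java.net.URI;

public class ResultCheck {
    public static void main(String[] args) throws Exception {
        // hypothetical output part file; adjust to match the job output
        String uri = "/home/hadoop/2016Test/SeqTest/output/part-r-00000";
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        SequenceFile.Reader reader = null;
        try {
            reader = new SequenceFile.Reader(fs, new Path(uri), conf);
            IntWritable key = new IntWritable();
            IntArrayWritable value = new IntArrayWritable();
            while (reader.next(key, value)) {
                // every printed row should be: <rowNo>  100  100  100  100  100
                System.out.println(key + "\t" + String.join("\t", value.toStrings()));
            }
        } finally {
            IOUtils.closeStream(reader);
        }
    }
}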