预览加载中,请您耐心等待几秒...
1/5
2/5
3/5
4/5
5/5

在线预览结束,喜欢就下载吧,查找使用更方便

如果您无法下载资料,请参考说明:

1、部分资料下载需要金币,请确保您的账户上有足够的金币

2、已购买过的文档,再次下载不重复扣费

3、资料包下载后请先用软件解压,在使用对应软件打开

MapReduce学习日志之我的MapReduce程序学习 将一批电话通信清单,记录了用户A拨打用户B的记录,需要做一个倒排索引,记录拨打给用户B的所有用户A。如 原有的txt为: 首先,我们应该把源文件传到Hdfs上,然后将原始数据进行分割,将被叫作为KEY,主叫作为Value,将拨打相同被叫的主叫号码汇总起来输出到HDFS。程序如下: packagecom.xxs; importjava.io.IOException; importorg.apache.hadoop.conf.Configuration; importorg.apache.hadoop.conf.Configured; importorg.apache.hadoop.fs.Path; importorg.apache.hadoop.io.*; importorg.apache.hadoop.mapreduce.*; importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat; importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat; importorg.apache.hadoop.mapreduce.lib.output.TextOutputFormat; importorg.apache.hadoop.util.Tool; importorg.apache.hadoop.util.ToolRunner;//此处为导入的包,一般为固定的。 publicclassMapTest_2extendsConfiguredimplementsTool{ enumCounter { LINESKIP, }//出错的行,出错计数器 publicstaticclassMapextendsMapper<LongWritable,Text,Text,Text> { publicvoidmap(LongWritablekey,Textvalue,Contextcontext)throwsIOException,InterruptedException { Stringline=value.toString(); try { String[]lineSplit=line.split(""); Stringanum=lineSplit[0]; Stringbnum=lineSplit[1]; context.write(newText(bnum),newText(anum)); } catch(java.lang.ArrayIndexOutOfBoundsExceptione) { context.getCounter(Counter.LINESKIP).increment(1); return; } } } publicstaticclassReduceextendsReducer<Text,Text,Text,Text> { publicvoidreduce(Textkey,Iterable<Text>values,Contextcontext)throwsIOException,InterruptedException { StringvalueString; Stringout=""; for(Textvalue:values) { valueString=value.toString(); out+=valueString+"|"; } context.write(key,newText(out)); } } publicintrun(String[]args)throwsException { Configurationconf=getConf(); Jobjob=newJob(conf,"MapTest_2");//任务名 job.setJarByClass(MapTest_2.class);//指定class FileInputFormat.addInputPath(job,newPath(args[0]));//输入路径 FileOutputFormat.setOutputPath(job,newPath(args[1]));//输出路径 job.setMapperClass(Map.class); job.setReducerClass(Reduce.class);