01. MapReduce Example: Deduplication

Experiment Objectives

1. Accurately understand the design principle of MapReduce deduplication.

2. Become proficient at writing MapReduce deduplication programs.

3. Learn to write your own MapReduce deduplication code to solve practical problems.

Experiment Principle

"Data deduplication" is mainly about mastering and applying the idea of parallelization to filter data in a meaningful way. Seemingly complex tasks such as counting the number of distinct values in a large data set or computing visitor locations from website logs all involve deduplication.

The ultimate goal of deduplication is that any value which appears more than once in the original data appears only once in the output file. In the MapReduce flow, the <key, value> pairs emitted by map are grouped into <key, value-list> by the shuffle phase and then handed to reduce. The natural idea is to send every record of a given value to the same reduce task: no matter how many times the value occurs, it only needs to be written once in the final result. Concretely, the input to reduce should use the data itself as the key and place no requirement on the value-list (it can be empty). When reduce receives a <key, value-list>, it simply copies the input key to the output key, sets the value to empty, and emits <key, value>.

The MapReduce deduplication flow is illustrated by the following example: item 1003100 appears three times in the sample data below, so map emits <1003100, null> once per occurrence, shuffle groups these into a single <1003100, [null, null, null]>, and reduce then writes 1003100 exactly once.

Experiment Environment

Linux Ubuntu 14.04

jdk-7u75-linux-x64

hadoop-2.6.0-cdh5.4.5

hadoop-2.6.0-eclipse-cdh5.4.5.jar

eclipse-java-juno-SR2-linux-gtk-x86_64

Experiment Content

We are given a data file from an e-commerce website, named buyer_favorite1, which records the items each user has favorited and the dates of those favorites. The file buyer_favorite1 contains three fields (user id, item id, favorite date), separated by "\t". Because the full data set is large, we only take a small excerpt of it here; its contents are as follows:

  • user id   item id    favorite date  
  • 10181   1000481   2010-04-04 16:54:31  
  • 20001   1001597   2010-04-07 15:07:52  
  • 20001   1001560   2010-04-07 15:08:27  
  • 20042   1001368   2010-04-08 08:20:30  
  • 20067   1002061   2010-04-08 16:45:33  
  • 20056   1003289   2010-04-12 10:50:55  
  • 20056   1003290   2010-04-12 11:57:35  
  • 20056   1003292   2010-04-12 12:05:29  
  • 20054   1002420   2010-04-14 15:24:12  
  • 20055   1001679   2010-04-14 19:46:04  
  • 20054   1010675   2010-04-14 15:23:53  
  • 20054   1002429   2010-04-14 17:52:45  
  • 20076   1002427   2010-04-14 19:35:39  
  • 20054   1003326   2010-04-20 12:54:44  
  • 20056   1002420   2010-04-15 11:24:49  
  • 20064   1002422   2010-04-15 11:35:54  
  • 20056   1003066   2010-04-15 11:43:01  
  • 20056   1003055   2010-04-15 11:43:06  
  • 20056   1010183   2010-04-15 11:45:24  
  • 20056   1002422   2010-04-15 11:45:49  
  • 20056   1003100   2010-04-15 11:45:54  
  • 20056   1003094   2010-04-15 11:45:57  
  • 20056   1003064   2010-04-15 11:46:04  
  • 20056   1010178   2010-04-15 16:15:20  
  • 20076   1003101   2010-04-15 16:37:27  
  • 20076   1003103   2010-04-15 16:37:05  
  • 20076   1003100   2010-04-15 16:37:18  
  • 20076   1003066   2010-04-15 16:37:31  
  • 20054   1003103   2010-04-15 16:40:14  
  • 20054   1003100   2010-04-15 16:40:16  

Write a MapReduce program in Java that deduplicates by item id and reports which items appear among the users' favorites. The 30 sample records above contain 24 distinct item ids, so the expected result is:

  • item id  
  • 1000481  
  • 1001368  
  • 1001560  
  • 1001597  
  • 1001679  
  • 1002061  
  • 1002420  
  • 1002422  
  • 1002427  
  • 1002429  
  • 1003055  
  • 1003064  
  • 1003066  
  • 1003094  
  • 1003100  
  • 1003101  
  • 1003103  
  • 1003289  
  • 1003290  
  • 1003292  
  • 1003326  
  • 1010178  
  • 1010183  
  • 1010675  

Experiment Steps

1. Switch to the /apps/hadoop/sbin directory and start Hadoop.

  • cd /apps/hadoop/sbin  
  • ./start-all.sh  
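
Optionally, verify that the daemons came up using the JDK's jps command; in this single-node setup it should list processes such as NameNode, DataNode, ResourceManager, and NodeManager.

  • jps  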

2. Create the local directory /data/mapreduce2 on Linux.

  • mkdir -p /data/mapreduce2  

3. (Generate the text file yourself and place it in a folder of your choice.) On Linux, switch to the /data/mapreduce2 directory and use wget to download the text file buyer_favorite1 from http://192.168.1.100:60000/allfiles/mapreduce2/buyer_favorite1.

  • cd /data/mapreduce2  
  • wget http://192.168.1.100:60000/allfiles/mapreduce2/buyer_favorite1  

Then, in the current directory, use wget to download the dependency package the project needs from http://192.168.1.100:60000/allfiles/mapreduce2/hadoop2lib.tar.gz.

  • wget http://192.168.1.100:60000/allfiles/mapreduce2/hadoop2lib.tar.gz  

Extract hadoop2lib.tar.gz into the current directory.

  • tar zxvf hadoop2lib.tar.gz  

4. First create a /mymapreduce2/in directory on HDFS, then upload the buyer_favorite1 file from the local /data/mapreduce2 directory to the /mymapreduce2/in directory on HDFS.

  • hadoop fs -mkdir -p /mymapreduce2/in  
  • hadoop fs -put /data/mapreduce2/buyer_favorite1 /mymapreduce2/in  
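
You can confirm that the file landed in HDFS before running the job:

  • hadoop fs -ls /mymapreduce2/in  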

5. Create a new Java project named mapreduce2.

Under the mapreduce2 project, create a package named mapreduce.

In the mapreduce package, create a class named Filter.

6. Add the jar packages the project depends on.

Right-click the project and create a new folder named hadoop2lib to hold the jars the project needs.

Copy the jars from the hadoop2lib directory under /data/mapreduce2 into the hadoop2lib folder of the mapreduce2 project in Eclipse.

Select all the jars under the project's hadoop2lib folder and add them to the Build Path.

7. Write the program code and describe the approach.

The purpose of deduplication is that any value appearing more than once in the original data appears only once in the output file. The natural idea is to hand all records with the same key to a single reduce task, so that however many times a value occurs, it is written exactly once in the final output. Concretely, reduce uses the data itself as the key and places no requirement on the value-list; when reduce receives a <key, value-list>, it simply copies the key to the output key and sets the value to empty.

Map code

  • public static class Map extends Mapper<Object, Text, Text, NullWritable>  
  •     //map copies the item id from the input value to the output key and emits it directly  
  •     {  
  •     private static Text newKey=new Text();      //reused Text object holding the key extracted from each input line  
  •     public void map(Object key,Text value,Context context) throws IOException, InterruptedException  
  •     //implement the map function  
  •     {             //print each record as it is processed  
  •     String line=value.toString();  
  •     System.out.println(line);  
  •     String arr[]=line.split("\t");  
  •     newKey.set(arr[1]);      //arr[1] is the item id field  
  •     context.write(newKey, NullWritable.get());  
  •     System.out.println(newKey);  
  •     }  
  •     }  

The map phase uses Hadoop's default job input format. Each input value is split with split("\t"); the extracted item id field (arr[1]) is set as the key, the value is set to empty (NullWritable), and the <key, value> pair is emitted directly.

Reduce code

  • public static class Reduce extends Reducer<Text, NullWritable, Text, NullWritable>{  
  •         public void reduce(Text key,Iterable<NullWritable> values,Context context) throws IOException, InterruptedException  
  •     //implement the reduce function  
  •     {  
  •     context.write(key,NullWritable.get());   //each distinct key is written exactly once  
  •     }  
  •     }  

The <key, value> pairs emitted by map are grouped into <key, value-list> by the shuffle phase and then handed to the reduce function. No matter how many values a key has, reduce simply copies the input key to the output key, sets the output value to empty, and emits <key, value>.
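
Because the reducer only re-emits its key, it can also serve as a combiner so that duplicate keys are already collapsed on the map side, cutting shuffle traffic. This is an optional tweak rather than part of the original recipe; a minimal sketch, assuming the line is added in main() next to job.setReducerClass:

  • job.setCombinerClass(Reduce.class);   //optional: pre-deduplicate map output locally before the shuffle  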

Complete code

  • package mapreduce;  
  • import java.io.IOException;  
  • import org.apache.hadoop.conf.Configuration;  
  • import org.apache.hadoop.fs.Path;  
  • import org.apache.hadoop.io.NullWritable;  
  • import org.apache.hadoop.io.Text;  
  • import org.apache.hadoop.mapreduce.Job;  
  • import org.apache.hadoop.mapreduce.Mapper;  
  • import org.apache.hadoop.mapreduce.Reducer;  
  • import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  • import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  
  • import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  • import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  
  • public class Filter{  
  •     public static class Map extends Mapper<Object, Text, Text, NullWritable>{  
  •         private static Text newKey=new Text();  
  •         public void map(Object key,Text value,Context context) throws IOException, InterruptedException{  
  •             String line=value.toString();  
  •             System.out.println(line);  
  •             String arr[]=line.split("\t");  
  •             newKey.set(arr[1]);  
  •             context.write(newKey, NullWritable.get());  
  •             System.out.println(newKey);  
  •         }  
  •     }  
  •     public static class Reduce extends Reducer<Text, NullWritable, Text, NullWritable>{  
  •         public void reduce(Text key,Iterable<NullWritable> values,Context context) throws IOException, InterruptedException{  
  •             context.write(key,NullWritable.get());  
  •         }  
  •     }  
  •     public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{  
  •         Configuration conf=new Configuration();  
  •         System.out.println("start");  
  •         Job job=new Job(conf,"filter");  
  •         job.setJarByClass(Filter.class);  
  •         job.setMapperClass(Map.class);  
  •         job.setReducerClass(Reduce.class);  
  •         job.setOutputKeyClass(Text.class);  
  •         job.setOutputValueClass(NullWritable.class);  
  •         job.setInputFormatClass(TextInputFormat.class);  
  •         job.setOutputFormatClass(TextOutputFormat.class);  
  •         Path in=new Path("hdfs://localhost:9000/mymapreduce2/in/buyer_favorite1");  
  •         Path out=new Path("hdfs://localhost:9000/mymapreduce2/out");  
  •         FileInputFormat.addInputPath(job,in);  
  •         FileOutputFormat.setOutputPath(job,out);  
  •         System.exit(job.waitForCompletion(true) ? 0 : 1);  
  •     }  
  • }  
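
Note that FileOutputFormat requires the output directory to not already exist when the job starts, so if you rerun the job, delete the previous output first:

  • hadoop fs -rm -r /mymapreduce2/out  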

8. In the Filter class file, right-click and choose Run As => Run on Hadoop to submit the MapReduce job to Hadoop.
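
If you prefer the command line to Eclipse, the project can also be exported as a jar and submitted with the hadoop jar command; a minimal sketch, assuming the exported jar is named mapreduce2.jar:

  • hadoop jar mapreduce2.jar mapreduce.Filter  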

9. After the job finishes, switch to the command line and inspect the result under /mymapreduce2/out on HDFS.

  • hadoop fs -ls /mymapreduce2/out  
  • hadoop fs -cat /mymapreduce2/out/part-r-00000  