MapReduce Example: Computing an Average


Experiment Objectives

1. Accurately understand the design principle behind computing averages with MapReduce.

2. Become proficient at writing a MapReduce averaging program.

3. Learn to solve problems by writing MapReduce averaging code.

Experiment Principle

Computing an average is a common and fairly simple MapReduce algorithm. One approach: the map phase reads the input and emits each record as a key/value pair; before the data reaches the reducer, the shuffle phase collects all values that share the same key into a value list and passes it to the reduce phase; the reducer then sums each list, counts its records, and divides the sum by the count to obtain the average.
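To make the arithmetic concrete before moving to Hadoop, here is a minimal plain-Java sketch (a hypothetical illustration, not part of the experiment code) of the same group / sum / count / divide logic, applied to the two sample rows for category 52109; with integer division, (28 + 43) / 2 yields 35, which matches the expected output below.

    import java.util.HashMap;
    import java.util.Map;

    // A minimal, non-Hadoop sketch of the averaging logic: group clicks by
    // category, accumulate a sum and a count per category, then divide.
    public class AverageSketch {
        public static void main(String[] args) {
            String[][] rows = { { "52109", "28" }, { "52109", "43" } };
            Map<String, int[]> acc = new HashMap<String, int[]>(); // category -> {sum, count}
            for (String[] row : rows) {
                int[] sc = acc.get(row[0]);
                if (sc == null) {
                    sc = new int[2];
                    acc.put(row[0], sc);
                }
                sc[0] += Integer.parseInt(row[1]); // running sum
                sc[1]++;                           // record count
            }
            for (Map.Entry<String, int[]> e : acc.entrySet()) {
                // Integer division truncates: (28 + 43) / 2 = 35
                System.out.println(e.getKey() + "\t" + (e.getValue()[0] / e.getValue()[1]));
            }
        }
    }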

Experiment Environment

Linux Ubuntu 14.04

jdk-7u75-linux-x64

hadoop-2.6.0-cdh5.4.5

hadoop-2.6.0-eclipse-cdh5.4.5.jar

eclipse-java-juno-SR2-linux-gtk-x86_64

Experiment Content

There is a data file recording product clicks on an e-commerce site. The table, named goods_click, has two fields (product category, product click count) separated by "\t". Because the dataset is large, only an excerpt is shown here for convenience:

  • Product category   Click count  
  • 52127   5  
  • 52120   93  
  • 52092   93  
  • 52132   38  
  • 52006   462  
  • 52109   28  
  • 52109   43  
  • 52132   0  
  • 52132   34  
  • 52132   9  
  • 52132   30  
  • 52132   45  
  • 52132   24  
  • 52009   2615  
  • 52132   25  
  • 52090   13  
  • 52132   6  
  • 52136   0  
  • 52090   10  
  • 52024   347  

Use MapReduce to compute the average click count for each product category. Note that the program below uses integer division, so averages are truncated toward zero (for example, category 52132: 211 / 9 = 23).

The expected result is:

  • Product category   Average clicks  
  • 52006   462  
  • 52009   2615  
  • 52024   347  
  • 52090   11  
  • 52092   93  
  • 52109   35  
  • 52120   93  
  • 52127   5  
  • 52132   23  
  • 52136   0  

Experiment Steps

1. Switch to the /apps/hadoop/sbin directory and start Hadoop.

  • cd /apps/hadoop/sbin  
  • ./start-all.sh  

2. Create a local directory /data/mapreduce4 on Linux.

  • mkdir -p /data/mapreduce4  

3. On Linux, switch to the /data/mapreduce4 directory and use wget to download the text file goods_click from http://192.168.1.100:60000/allfiles/mapreduce4/goods_click.

  • cd /data/mapreduce4  
  • wget http://192.168.1.100:60000/allfiles/mapreduce4/goods_click  

Then, in the same directory, use wget to download the dependency bundle the project needs from http://192.168.1.100:60000/allfiles/mapreduce4/hadoop2lib.tar.gz.

  • wget http://192.168.1.100:60000/allfiles/mapreduce4/hadoop2lib.tar.gz  

Extract hadoop2lib.tar.gz into the current directory.

  • tar zxvf hadoop2lib.tar.gz  

4. First create the /mymapreduce4/in directory on HDFS, then upload the goods_click file from the local /data/mapreduce4 directory into HDFS's /mymapreduce4/in directory.

  • hadoop fs -mkdir -p /mymapreduce4/in  
  • hadoop fs -put /data/mapreduce4/goods_click /mymapreduce4/in  

5. Create a new Java Project named mapreduce4.

In the mapreduce4 project, create a package named mapreduce.

In the mapreduce package, create a class named MyAverage.

6. Add the jar files the project depends on: right-click mapreduce4 and create a folder named hadoop2lib to hold the project's jars.

Copy the jar files from the hadoop2lib directory under /data/mapreduce4 into the hadoop2lib folder of the mapreduce4 project in Eclipse.

Select all the jar files in the hadoop2lib folder and add them to the Build Path.

7. Write the Java code and describe its design.

Mapper code

    public static class Map extends Mapper<Object, Text, Text, IntWritable> {
        private static Text newKey = new Text();
        // Implement the map function
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Convert each line of the input text file to a String
            String line = value.toString();
            System.out.println(line);
            String[] arr = line.split("\t");
            newKey.set(arr[0]);
            int click = Integer.parseInt(arr[1]);
            context.write(newKey, new IntWritable(click));
        }
    }

With Hadoop's default input format, the map function receives one line of text at a time. It splits the line with split(), converts the click-count field to an IntWritable to use as the value, sets the product-category field as the key, and emits the key/value pair.
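The map body above assumes every line is well formed. If the input might contain a header row or malformed lines, a slightly more defensive variant (a hypothetical hardening sketch reusing the same newKey field, not part of the original experiment code) could skip them:

    // Hypothetical defensive variant of the map body: skip lines that do not
    // have exactly two tab-separated fields or whose second field is not numeric.
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] arr = value.toString().split("\t");
        if (arr.length != 2) {
            return; // skip blank or malformed lines
        }
        try {
            int click = Integer.parseInt(arr[1].trim());
            newKey.set(arr[0]);
            context.write(newKey, new IntWritable(click));
        } catch (NumberFormatException e) {
            // skip a header row or any other non-numeric click count
        }
    }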

Reducer code

    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        // Implement the reduce function
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int num = 0;
            int count = 0;
            for (IntWritable val : values) {
                num += val.get();  // accumulate the sum in num
                count++;           // count the number of records in count
            }
            int avg = num / count; // compute the (integer) average
            context.write(key, new IntWritable(avg));
        }
    }

The map output <key, value> pairs are grouped by the shuffle into <key, values> pairs, which are handed to reduce. The reducer copies the input key directly to the output key, iterates over values with a for loop to accumulate the sum num and the record count count, divides num by count to get the average avg, sets avg as the value, and emits the resulting <key, value> pair. Note that this Reduce class must not also be registered as a combiner: an average of partial averages is not, in general, the overall average (a combiner-friendly redesign is sketched after the complete code below).
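Because num / count is integer division, fractional parts are discarded (for category 52090, (13 + 10) / 2 = 11). If fractional averages are wanted, a hypothetical variant could emit a DoubleWritable instead; the job setup would then also need job.setMapOutputValueClass(IntWritable.class) and job.setOutputValueClass(DoubleWritable.class):

    import org.apache.hadoop.io.DoubleWritable;

    // Hypothetical variant: emit a fractional average instead of a truncated one.
    public static class PreciseReduce
            extends Reducer<Text, IntWritable, Text, DoubleWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            long sum = 0;
            long count = 0;
            for (IntWritable val : values) {
                sum += val.get();
                count++;
            }
            // Divide as doubles, so 211 / 9 yields 23.44... rather than 23.
            context.write(key, new DoubleWritable((double) sum / count));
        }
    }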

Complete code

    package mapreduce;

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    public class MyAverage {
        public static class Map extends Mapper<Object, Text, Text, IntWritable> {
            private static Text newKey = new Text();
            public void map(Object key, Text value, Context context)
                    throws IOException, InterruptedException {
                String line = value.toString();
                System.out.println(line);
                String[] arr = line.split("\t");
                newKey.set(arr[0]);
                int click = Integer.parseInt(arr[1]);
                context.write(newKey, new IntWritable(click));
            }
        }

        public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
            public void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                int num = 0;
                int count = 0;
                for (IntWritable val : values) {
                    num += val.get();
                    count++;
                }
                int avg = num / count;
                context.write(key, new IntWritable(avg));
            }
        }

        public static void main(String[] args)
                throws IOException, ClassNotFoundException, InterruptedException {
            Configuration conf = new Configuration();
            System.out.println("start");
            Job job = Job.getInstance(conf, "MyAverage");
            job.setJarByClass(MyAverage.class);
            job.setMapperClass(Map.class);
            job.setReducerClass(Reduce.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            Path in = new Path("hdfs://localhost:9000/mymapreduce4/in/goods_click");
            Path out = new Path("hdfs://localhost:9000/mymapreduce4/out");
            FileInputFormat.addInputPath(job, in);
            FileOutputFormat.setOutputPath(job, out);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
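As noted after the Reducer code, Reduce must not be registered as a combiner, because averaging partial averages is incorrect. If a combiner is wanted for large inputs, one common workaround is to carry (sum, count) pairs through the shuffle so that partial aggregates merge associatively. The following is a hypothetical sketch, with the pair encoded as a comma-separated Text value; the job would then use setMapOutputValueClass(Text.class) and setCombinerClass(PairCombine.class):

    // Hypothetical combiner-friendly redesign: carry (sum, count) pairs as Text
    // values so partial aggregates can be merged safely before the reducer.
    public static class PairMap extends Mapper<Object, Text, Text, Text> {
        private final Text outKey = new Text();
        private final Text outVal = new Text();
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] arr = value.toString().split("\t");
            outKey.set(arr[0]);
            outVal.set(arr[1] + ",1"); // (clicks, count = 1)
            context.write(outKey, outVal);
        }
    }

    public static class PairCombine extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            long sum = 0, count = 0;
            for (Text val : values) {
                String[] sc = val.toString().split(",");
                sum += Long.parseLong(sc[0]);
                count += Long.parseLong(sc[1]);
            }
            context.write(key, new Text(sum + "," + count)); // merged partial aggregate
        }
    }

    public static class PairReduce extends Reducer<Text, Text, Text, IntWritable> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            long sum = 0, count = 0;
            for (Text val : values) {
                String[] sc = val.toString().split(",");
                sum += Long.parseLong(sc[0]);
                count += Long.parseLong(sc[1]);
            }
            context.write(key, new IntWritable((int) (sum / count)));
        }
    }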

8. In the MyAverage class file, right-click and choose Run As => Run on Hadoop to submit the MapReduce job to Hadoop.

9. After the job finishes, open a terminal and inspect the results under /mymapreduce4/out on HDFS. The contents of part-r-00000 should match the expected result table above.

  • hadoop fs -ls /mymapreduce4/out  
  • hadoop fs -cat /mymapreduce4/out/part-r-00000  