[MapReduce] WordCount 代码实例及具体执行过程说明

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {
        public WordCount(){}

        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
                Configuration conf = new Configuration();
                // 调用格式 hadoop  jar  .jar   <input dir path 1 >   <input dir path 2 >  ...  <output dir path>
                // 输入可能有多个文件夹
                String[] otherArgs =  (new GenericOptionsParser(conf,args)).getRemainingArgs();

                if(otherArgs.length < 2) {
                        System.err.println("Usage: wordcount <in> [<in>...] <out>");
                        System.exit(2);
                }
                // 创建一个job 实例
                Job job = Job.getInstance(conf, "word count");
                job.setJarByClass(WordCount.class);

                // 指定继承了Mapper并实现了map方法的类
                job.setMapperClass(WordCount.TokenizerMapper.class);

                // 指定合并操作(实际上就是本地执行的Reduce操作), 可选
                job.setCombinerClass(WordCount.TokenizerReducer.class);

                //  指定分区操作 Map在根据分区类将不同的key映射到相应的分区 默认根据key值,哈希分区
                // 需要该分区的Reduce 会根据JobTracker 对Map的监控 ,当Map结束后到相应的分区通过http取数据
                // 可选 ,默认哈希分区
                // job.setPartitionerClass(xxx.class);

                // 排序将根据Key数据类型的内部Comparator方法自动排序

                //  指定分组条件, 满足同样条件的key值将被分到同一个组,排序在最前的key值 将作为该组的key
                //  个人理解是reduce端拉取数据后的归并操作 从 <key,value1>, <key, value2 >... => <key, value-list>
                // 默认key 完全相同为一组
                // job.setOutputKeyComparatorClass(XXX.class);

                // 指定Reduce操作
                job.setReducerClass(WordCount.TokenizerReducer.class);
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(IntWritable.class);
                for(int i =0 ;i <otherArgs.length-1;i++){
                        FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
                }
                FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length-1]));
                System.exit(job.waitForCompletion(true)?0:1);

        }

        public  static  class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{
                private final  IntWritable one = new IntWritable(1);
                private Text word = new Text();
                public TokenizerMapper(){}
                public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
                        String[] words = value.toString().split(" ");
                        for(String word_tmp : words){
                                this.word.set(word_tmp);
                                context.write(word, one);
                        }

                }
        }

        public static class TokenizerReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
                private IntWritable result = new IntWritable(0);
                public TokenizerReducer(){}
                public void reduce(Text key, Iterable<IntWritable> values,  Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
                        int sum = 0;
                        for(IntWritable val: values){
                                sum+=val.get();
                        }
                        result.set(sum);
                        context.write(key, result);
                }

        }

}

Hadoop的shuffle过程就是从map端输出到reduce端输入之间的过程

Map端

Map端输出要经过分区->排序->(合并->) 然后各分区Merge到一起 等待Reduce 取走各分区的数据

(1)分区 Partition

默认分区是哈希分区,但存在数据倾斜问题,即有些key值的数据非常多,因此会影响Map效率,因此可以通过自定义分区类平衡数据分布。 哪个key到哪个Reducer的分配过程,是由Partitioner规定的 。也就是说一个map其本地有着多个分区的数据,不同分区的数据会被对应不同的reducer拉取。

(2)排序 Sort

默认根据key的数据类型的内置Comparator排序,自己实现的数据类型需要自定义Comparator函数

(3)合并 Combine

map操作后会产生大量的<key,value>键值对,并且可能存在重复的键值对,而这会影响传递数据的效率(copy过程),因此 在map端可以通过本地的合并操作(可以看作本地的一次reduce), 合并同样key的键值对

Reduce端

(1)Reduce端的copy过程

reduce端可能从n个map的结果中获取数据,而这些map的执行速度不尽相同,当其中一个map运行结束时,reduce就会从JobTracker中获取该信息。map运行结束后TaskTracker会得到消息,进而将消息汇报给JobTracker,reduce定时从JobTracker获取该信息,reduce端默认有5个数据复制线程从map端复制数据。

(2) 排序 分组(归并)

reduce端从不同的map拉取的数据,数据肯定需要经过再一次排序才能保证其有效性。由于分区策略的原因,同一分区内的key值可能不同或不满足我们处理数据的需求, 因此我们需要对数据进行分组,我个人理解为其实就是大数据技术原理课程讲的归并操作,即相同key值或满足相同条件的key值 合并为一个<key, value-list>,其中key值为排序的第一个key值

我来评几句
登录后评论

已发表评论数()

相关站点

热门文章