Sunday, December 8, 2013

Simple MapReduce Program for Hadoop

Overview

In the previous post we went through the basics of big data processing and Hadoop. In this post I describe the basic structure of a MapReduce program, using the example of counting words across a large number of books. I have configured a two-node cluster with a distributed file system. We use three classes:
  1. Mapper class 
  2. Reducer class 
  3. Main word count class 
Assume I have 10 documents containing only the words “hello” and “world”. The documents are split between the two nodes; how many documents each node processes is decided by the Hadoop framework.
This is our MapReduce architecture from the previous post.
High-level MapReduce architecture.
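Before the job can run, the books have to be available on the distributed file system. A minimal sketch of copying them from a local folder into the "In" directory that the job reads later could look like the following; the class name and the local path are only placeholders for wherever your text files actually live.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class LoadBooks {

      public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);
            // "/home/user/books" is a placeholder local directory;
            // "In" is the HDFS input directory the word count job reads from
            fs.copyFromLocalFile(new Path("/home/user/books"), new Path("In"));
      }
}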
Mapper Class

The mapper reads each word in a line and emits a key-value pair of (word, 1). The mappers on each node run independently, and because the reducer class is also registered as a combiner (see the main class below), the 1s are summed locally on each node. So after the map phase the two nodes hold counts like:

Node 1: hello = 23
Node 1: world = 13
Node 2: hello = 7
Node 2: world = 14

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class WordCountMapper extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, IntWritable> {

      // reused writables so a new object is not created for every token
      private final IntWritable one = new IntWritable(1);
      private final Text word = new Text();

      @Override
      public void map(LongWritable key, Text value,
                  OutputCollector<Text, IntWritable> output, Reporter reporter)
                  throws IOException {
            // with the default TextInputFormat the key is the byte offset of the
            // line in the file and the value is the line of text itself
            String line = value.toString();
            StringTokenizer itr = new StringTokenizer(line.toLowerCase());
            while (itr.hasMoreTokens()) {
                  // emit (word, 1) for every occurrence of a word
                  word.set(itr.nextToken());
                  output.collect(word, one);
            }
      }
}
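To see what the map phase actually emits, the mapper can be exercised on its own with a throwaway OutputCollector that just prints each pair. This is only a rough local check, not how the class is normally run (the class name below is just for illustration); Reporter.NULL is the no-op reporter provided by the old mapred API. Note that every occurrence produces a separate 1; the per-node totals shown above only appear after the combiner has summed them locally.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class WordCountMapperCheck {

      public static void main(String[] args) throws Exception {
            WordCountMapper mapper = new WordCountMapper();
            // collector that simply prints every (word, 1) pair the mapper emits
            OutputCollector<Text, IntWritable> printer = new OutputCollector<Text, IntWritable>() {
                  public void collect(Text key, IntWritable value) {
                        System.out.println(key + " -> " + value);
                  }
            };
            mapper.map(new LongWritable(0), new Text("Hello world hello"), printer, Reporter.NULL);
            // prints: hello -> 1, world -> 1, hello -> 1
      }
}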

Reducer Class

The framework then shuffles the map outputs so that all counts for the same word are delivered to one reducer, which sums them. So after the reduce operation you get a single total per word:

hello = 30
world = 27

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class WordCountReducer extends MapReduceBase implements
            Reducer<Text, IntWritable, Text, IntWritable> {

      @Override
      public void reduce(Text key, Iterator<IntWritable> values,
                  OutputCollector<Text, IntWritable> output, Reporter reporter)
                  throws IOException {
            // sum all the counts received for this word
            int sum = 0;
            while (values.hasNext()) {
                  sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
            // goes to the task log, useful only for debugging
            System.out.println(key + "=" + sum);
      }
}
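The reducer can be sanity-checked the same way, by handing it an iterator of counts for one word. Using the "hello" numbers from the two nodes above (23 and 7), it should emit 30. Again, this is just an illustrative sketch with a made-up class name, not part of the actual job.

import java.util.Arrays;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class WordCountReducerCheck {

      public static void main(String[] args) throws Exception {
            WordCountReducer reducer = new WordCountReducer();
            // per-node counts for "hello" produced by the combiner on node 1 and node 2
            Iterator<IntWritable> counts =
                        Arrays.asList(new IntWritable(23), new IntWritable(7)).iterator();
            OutputCollector<Text, IntWritable> printer = new OutputCollector<Text, IntWritable>() {
                  public void collect(Text key, IntWritable value) {
                        System.out.println(key + " = " + value); // prints: hello = 30
                  }
            };
            reducer.reduce(new Text("hello"), counts, printer, Reporter.NULL);
      }
}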

Main Word Count Class

We also need to set the input and output directories on the distributed file system, and a few configuration steps have to be done before running the mappers and reducers.
      
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class WordCount {

      public static void main(String[] args) {
            JobConf conf = new JobConf(WordCount.class);

            Path inputPath = new Path("In");
            Path outputPath = new Path("Out3");

            try {
                  // the job fails if the output directory already exists,
                  // so remove it first
                  FileSystem fs = FileSystem.get(conf);
                  if (fs.exists(outputPath)) {
                        fs.delete(outputPath, true); // recursive delete
                  }
            } catch (Exception e) {
                  e.printStackTrace();
            }

            // specify input format (default TextInputFormat is used here)
      //    conf.setInputFormat(CustomFileInputFormat.class);

            // specify output types
            conf.setOutputKeyClass(Text.class);
            conf.setOutputValueClass(IntWritable.class);

            // specify input and output dirs on the distributed file system
            FileInputFormat.addInputPath(conf, inputPath);
            FileOutputFormat.setOutputPath(conf, outputPath);

            // specify a mapper
            conf.setMapperClass(WordCountMapper.class);

            // specify a reducer; the same class is also used as a combiner,
            // which pre-sums the counts on each node before the shuffle
            conf.setReducerClass(WordCountReducer.class);
            conf.setCombinerClass(WordCountReducer.class);

            try {
                  JobClient.runJob(conf);
            } catch (Exception e) {
                  e.printStackTrace();
            }
      }
}
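Once the three classes are compiled and packaged into a jar, the job is typically submitted with the hadoop jar command, with the books already copied into the "In" directory on HDFS. With a single reducer the final counts end up in a file such as Out3/part-00000, one word and its total per line.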


