In the previous post we went through the basics of big data processing and Hadoop. In this post I describe the basic structure of a MapReduce program, using the example of counting words across a large number of books. I have configured a two-node cluster with a distributed file system (HDFS). Here we are using three classes:
- Mapper class
- Reducer class
- Main words count (driver) class
This is our MapReduce architecture from the previous post.

[Figure: High-level MapReduce architecture]
Mapper Class

The mapper tokenizes each line of input and emits a (word, 1) key-value pair for every word it finds. The mapper runs separately on each node, and because the reducer is also registered as a combiner (see the main class below), each node pre-aggregates its own counts. So after the map and combine steps the intermediate output might look like:

Node 1 - word hello - 23
Node 1 - word world - 13
Node 2 - word hello - 7
Node 2 - word world - 14
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class WordCountMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {

    private final IntWritable one = new IntWritable(1);
    private final Text word = new Text();

    @Override
    public void map(LongWritable key, Text value,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        // Split the line into lower-case words and emit (word, 1) for each.
        String line = value.toString();
        StringTokenizer itr = new StringTokenizer(line.toLowerCase());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            output.collect(word, one);
        }
    }
}
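For example, the input line "Hello world hello" makes the mapper emit (hello, 1), (world, 1), (hello, 1); toLowerCase() normalizes the case before tokenizing. Note that the input key type is LongWritable because the default TextInputFormat passes the byte offset of each line as the key.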
Reducer Class

During the shuffle phase all intermediate pairs with the same key are routed to one reducer, which adds the counts together. So after the reduce step you get a single total per word as the output:

word hello - 30
word world - 27
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class WordCountReducer extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        // Sum all partial counts for this word.
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
        System.out.println(key + "=" + sum);
    }
}
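Continuing the example above, after the shuffle the reducer for the key hello receives the values [23, 7] (the per-node partial sums) and emits (hello, 30), while the reducer for world receives [13, 14] and emits (world, 27).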
Main Words Count Class

The main (driver) class wires the mapper and reducer together. We also need to set the input and output directories on the distributed file system, and some configuration has to be done before running the mappers and reducers.
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class WordCount {

    public static void main(String[] args) {
        JobConf conf = new JobConf(WordCount.class);

        // Remove the output directory if it already exists;
        // Hadoop refuses to start a job whose output path exists.
        try {
            FileSystem fs = FileSystem.get(conf);
            Path outputPath = new Path("Out3");
            if (fs.exists(outputPath)) {
                fs.delete(outputPath, true); // recursive delete
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

        // Specify the input format (the default TextInputFormat is used here).
        // conf.setInputFormat(CustomFileInputFormat.class);

        // Specify output types.
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        // Specify input and output dirs on HDFS.
        FileInputFormat.addInputPath(conf, new Path("In"));
        FileOutputFormat.setOutputPath(conf, new Path("Out3"));

        // Specify a mapper.
        conf.setMapperClass(WordCountMapper.class);

        // Specify a reducer, and reuse it as a combiner so each node
        // pre-aggregates its own counts before the shuffle.
        conf.setReducerClass(WordCountReducer.class);
        conf.setCombinerClass(WordCountReducer.class);

        try {
            JobClient.runJob(conf);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
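To run the job, the three classes are packaged into a jar and submitted through the standard Hadoop command line. A minimal sketch, assuming the compiled classes are packaged as wordcount.jar (a name I have chosen for illustration) and the books have already been copied into the In directory on HDFS:

hadoop fs -put books/ In
hadoop jar wordcount.jar WordCount
hadoop fs -cat Out3/part-00000

With the old mapred API the results appear under Out3 in files named part-00000, part-00001, and so on, one per reduce task.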