Word Count Hadoop
anujd64
Posted on March 13, 2024
Video version of this article: https://www.youtube.com/watch?v=wTkffAYsCBw
Credits: @UnboxingBigData
Code
Open Eclipse and create a new Java project.
Right-click on the project and click on Build Path > Configure Build Path...
Click on the Libraries tab and click on Add External JARs...
Navigate to the Hadoop common folder (in my case it is home/hadoop-3.3.6/share/hadoop/common)
and select all the JAR files.
Click on Add External JARs... again.
Navigate to the Hadoop mapreduce folder (in my case it is home/hadoop-3.3.6/share/hadoop/mapreduce)
and select all the JAR files.
Now the Libraries tab will look like this:
Click Apply and Close.
Now create the three classes; the code for them is given at the end of this article. I simply copy-paste the classes into the src folder of our Eclipse project.
Now we will export a JAR for our project: right-click on the project and select Export.
Select the export destination:
Select the main class for the project:
You will find the JAR at the selected location:
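Alternatively, if you prefer the terminal to Eclipse's export wizard, here is a minimal build sketch, assuming the three .java files sit in the current directory and hadoop is on your PATH (the classes output directory is just an illustrative name):
mkdir -p classes
javac -classpath "$(hadoop classpath)" -d classes WC_Runner.java WC_Mapper.java WC_Reducer.java
jar cfe WordCount.jar WC_Runner -C classes .
The -e flag records WC_Runner as the Main-Class in the JAR manifest, which is the same thing the Eclipse wizard does when you pick the main class.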
Create an input.txt file like so:
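Any plain text works here; as an illustration, a few arbitrary lines such as (this sample content is my own example):
hello world
hello hadoop
world of hadoop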
Now just execute the following commands one by one:
cd Desktop
start-all.sh
hadoop fs -mkdir /wc_input
hadoop fs -put input.txt /wc_input
hadoop jar WordCount.jar /wc_input/input.txt /wc_output/
hadoop fs -cat /wc_output/part-00000
stop-all.sh
What do these commands do?
- The -mkdir command creates the folder /wc_input in HDFS (Hadoop's distributed file system).
- The -put command copies the input.txt file into the folder we just created.
- The jar command runs the code in the exported JAR file to perform MapReduce on input.txt and saves the output in the /wc_output directory.
- The -cat command prints the contents of the output produced by the MapReduce operation.
- To delete a directory in HDFS, use:
hadoop fs -rm -r <DIRECTORY_PATH>
Your terminal should look like this:
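For reference, with the sample input.txt above, the -cat command would print something like the following (key and count are tab-separated; exact values depend on your input):
hadoop	2
hello	2
of	1
world	2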
Code for the classes:
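All three classes use the classic org.apache.hadoop.mapred API. It is older than the org.apache.hadoop.mapreduce API but still ships with Hadoop 3.x, so the code below runs unmodified.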
WC_Runner.java
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
public class WC_Runner {
    public static void main(String[] args) throws IOException {
        // Configure the job using the classic (org.apache.hadoop.mapred) API
        JobConf conf = new JobConf(WC_Runner.class);
        conf.setJobName("WordCount");
        // Output types emitted by the mapper and reducer: word -> count
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(WC_Mapper.class);
        // The reducer doubles as the combiner (see the note below)
        conf.setCombinerClass(WC_Reducer.class);
        conf.setReducerClass(WC_Reducer.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        // Input and output paths come from the command line (args[0] and args[1])
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
    }
}
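A quick note on the runner: WC_Reducer is registered as both the combiner and the reducer. That is safe here because summing counts is associative and commutative, so partial sums computed on the map side merge correctly in the final reduce.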
WC_Reducer.java
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
public class WC_Reducer extends MapReduceBase implements Reducer<Text,IntWritable,Text,IntWritable> {
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text,IntWritable> output,
                       Reporter reporter) throws IOException {
        // Sum all the partial counts for this word
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        // Emit the word with its total count
        output.collect(key, new IntWritable(sum));
    }
}
WC_Mapper.java
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
public class WC_Mapper extends MapReduceBase implements Mapper<LongWritable,Text,Text,IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    public void map(LongWritable key, Text value, OutputCollector<Text,IntWritable> output,
                    Reporter reporter) throws IOException {
        // Split the line on whitespace and emit (word, 1) for each token
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            output.collect(word, one);
        }
    }
}