Big Data with Hadoop

Practical 2: Word Frequency Count using MapReduce

Objective:
To implement and execute a MapReduce program that calculates the frequency of each word in a given text file, demonstrating the core concepts of distributed data processing.

Introduction:
The Word Count example is the "Hello World" of MapReduce programming. It effectively illustrates how the Map and Reduce functions work together to process large datasets in a distributed manner. In this practical, you will write a Java-based MapReduce application to count the occurrences of every word in an input text file. This will solidify your understanding of the MapReduce data flow, key-value pairs, and job configuration.

Step-by-Step Guide:

1. Prepare Your Development Environment:

  • Create a new Java project. If using Maven, add the Hadoop client dependencies (e.g., hadoop-client, hadoop-common); a sample declaration is sketched after this list.
  • Ensure your JAVA_HOME and Hadoop environment variables are correctly set as per Practical 1.
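
For reference, a minimal Maven dependency declaration looks like the sketch below; the version number is only an assumption and should be changed to match the Hadoop release you installed in Practical 1:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <!-- Assumption: replace 3.3.6 with your installed Hadoop version -->
    <version>3.3.6</version>
</dependency>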

2. Input Data Preparation:

  • Create a sample text file (e.g., input.txt) with some sentences, for example:

hello hadoop world
hadoop is big data
big data world

  • Upload this file to HDFS:

hdfs dfs -mkdir /input
hdfs dfs -put input.txt /input/input.txt
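
To confirm the upload, you can list the directory and read the file back from HDFS:

hdfs dfs -ls /input
hdfs dfs -cat /input/input.txt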

3. Crafting the Mapper Class:

  • The Mapper takes input key-value pairs (byte offset, line of text) and emits intermediate key-value pairs (word, 1).

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordFrequencyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken().toLowerCase()); // Convert to lowercase
            context.write(word, one); // Emit (word, 1)
        }
    }
}
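
As a quick illustration of the Mapper's behaviour, the first line of the sample input from Step 2 ("hello hadoop world") would produce the following intermediate (word, 1) pairs, one per whitespace-separated token:

(hello, 1)
(hadoop, 1)
(world, 1)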

4. Developing the Reducer Class:

  • The Reducer aggregates intermediate key-value pairs to emit final key-value pairs (word, total count).

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordFrequencyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result); // Emit (word, total_count)
    }
}
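
Between the map and reduce phases, the framework shuffles and sorts the intermediate pairs so that all counts for the same word reach a single reduce call. For the sample input from Step 2, the word "hadoop" appears twice, so the corresponding reduce call receives and emits:

reduce("hadoop", [1, 1])  ->  (hadoop, 2)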

5. Configuring and Submitting the MapReduce Job:

  • The Driver class sets up the job configuration, specifies Mapper and Reducer classes, and handles input/output paths.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordFrequencyDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: wordfrequency <in> <out>");
            System.exit(2);
        }

        Job job = Job.getInstance(conf, "word frequency count");
        job.setJarByClass(WordFrequencyDriver.class);
        job.setMapperClass(WordFrequencyMapper.class);
        job.setCombinerClass(WordFrequencyReducer.class); // Optional: valid here because summing counts is associative and commutative
        job.setReducerClass(WordFrequencyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0])); // Input path on HDFS
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); // Output path on HDFS (must not already exist)
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
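
Note that setOutputKeyClass and setOutputValueClass describe the Reducer's (and therefore the job's) output types; the Mapper's output types default to the same classes, which is why no extra declaration is needed here. If your Mapper ever emits different types than the Reducer, you would declare them explicitly, for example:

job.setMapOutputKeyClass(Text.class);          // key type emitted by the Mapper
job.setMapOutputValueClass(IntWritable.class); // value type emitted by the Mapper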

6. Compiling and Packaging Your Application:

  • Compile your Java code into a .jar file.
  • If using Maven: mvn clean package
  • Otherwise, manually:

mkdir -p classes
javac -cp "$(hadoop classpath)" -d classes WordFrequencyMapper.java WordFrequencyReducer.java WordFrequencyDriver.java
jar cvf wordfrequency.jar -C classes/ .
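
You can verify that the archive contains all three classes before submitting it:

jar tf wordfrequency.jar

The listing should include WordFrequencyMapper.class, WordFrequencyReducer.class, and WordFrequencyDriver.class.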

7. Running the Job on Hadoop:

  • Execute the JAR file on Hadoop. Remove any previous output directory first:

hdfs dfs -rm -r /output
hadoop jar wordfrequency.jar WordFrequencyDriver /input/input.txt /output
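
After the job reports completion, a successful run leaves an empty _SUCCESS marker next to the result file(s) in the output directory, which you can confirm with:

hdfs dfs -ls /output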

8. Verifying the Output:

  • View the output stored in HDFS:

hdfs dfs -cat /output/part-r-00000
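
For the sample input from Step 2, the output should resemble the following, with each line containing a word and its total count separated by a tab, sorted by key:

big	2
data	2
hadoop	2
hello	1
is	1
world	2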

Conclusion:
You have successfully implemented, compiled, and run your first MapReduce application to calculate word frequencies. This practical has demonstrated the end-to-end process of developing a distributed data processing job, from writing Mapper and Reducer logic to deploying and verifying results on a Hadoop cluster. This foundational understanding is critical for tackling more complex Big Data problems.