Big Data with Hadoop

Practical 3:

Maximum Annual Temperature using MapReduce

Objective: To implement and execute a MapReduce program to identify the maximum temperature recorded for each year from a given dataset, reinforcing the concepts of data parsing, aggregation, and custom data processing logic.

Introduction:
Finding the maximum or minimum value in a dataset is a common aggregation task in data analysis. This practical will guide you through creating a MapReduce job that processes a dataset of temperature readings, extracts the year and temperature from each record, and then determines the highest temperature for every year. This exercise will deepen your understanding of how to define custom input parsing within the Mapper and how the Reducer aggregates values based on keys.
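As a preview of the overall data flow, suppose the Mappers emit the (year, temperature) pairs below (the readings are chosen purely for illustration); the shuffle phase groups values by year before they reach the Reducer:

Map output:     (1900, 25), (1900, 30), (1900, 20), (1901, 28), (1901, 35)
After shuffle:  (1900, [25, 30, 20]), (1901, [28, 35])
Reduce output:  (1900, 30), (1901, 35)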

Step-by-Step Guide:

1.     Prepare Your Development Environment:

o    Continue using your existing Java project setup from Practical 2, ensuring Hadoop client dependencies are included.

2.     Input Data Preparation:

o    Create a sample text file (e.g., temperature_data.txt) with temperature records. Each line should contain the year and a temperature. For simplicity, let’s assume a format where the year is the first four characters of each line and the temperature (an integer) is the last space-separated token, with 9999 indicating a missing or invalid temperature.

Example temperature_data.txt:

1900-01-01 10:00 25
1900-01-02 14:00 30
1901-07-15 11:00 28
1901-07-16 15:00 35
1900-01-03 09:00 20
1901-08-01 12:00 9999

Note that the last record’s value of 9999 marks a missing temperature. Keep explanatory comments out of the data file itself; the Mapper would otherwise try to parse them as part of the record.

o    Upload this file to HDFS:

hdfs dfs -mkdir -p /temperature_input
hdfs dfs -put temperature_data.txt /temperature_input/temperature_data.txt
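To confirm the upload, you can list the directory and preview the file using the standard HDFS shell commands:

hdfs dfs -ls /temperature_input
hdfs dfs -cat /temperature_input/temperature_data.txt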

3.     Crafting the Mapper Class:

o    The Mapper will parse each line, extract the year and the temperature, and emit (year, temperature) as key-value pairs. It should filter out invalid temperature readings.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final int MISSING_TEMPERATURE = 9999; // Standard code for a missing reading

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        if (line.length() < 4) {
            return; // Too short to contain a year
        }
        String year = line.substring(0, 4); // The year is the first 4 characters

        // Extract the temperature, assumed to be the last whitespace-separated
        // token on the line. Adjust this parsing to match your actual data format.
        int airTemperature;
        try {
            String[] parts = line.split("\\s+");
            if (parts.length > 2) {
                airTemperature = Integer.parseInt(parts[parts.length - 1].trim());
            } else {
                return; // Line does not match the expected format
            }
        } catch (NumberFormatException e) {
            return; // The last token is not a valid integer
        }

        if (airTemperature != MISSING_TEMPERATURE) {
            context.write(new Text(year), new IntWritable(airTemperature));
        }
    }
}

o    Note on Parsing: Splitting the line on whitespace and taking the last token is a deliberately simple parsing example. In a real scenario, you would need more robust parsing logic based on the exact format of your input data; one stricter variant is sketched below.
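As an illustration, here is a hypothetical variant of the Mapper that validates each record against a regular expression before emitting anything. The class name RegexMaxTemperatureMapper and the pattern are assumptions based on the YYYY-MM-DD HH:MM TEMP sample format above, not part of the practical:

import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical stricter Mapper: only well-formed records are parsed at all.
public class RegexMaxTemperatureMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

    // Matches lines such as "1900-01-01 10:00 25" (negative readings allowed).
    // Group 1 captures the year, group 2 the temperature.
    private static final Pattern RECORD =
            Pattern.compile("^(\\d{4})-\\d{2}-\\d{2} \\d{2}:\\d{2}\\s+(-?\\d+)$");

    private static final int MISSING_TEMPERATURE = 9999;

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        Matcher m = RECORD.matcher(value.toString().trim());
        if (!m.matches()) {
            return; // Silently skip malformed records
        }
        int airTemperature = Integer.parseInt(m.group(2));
        if (airTemperature != MISSING_TEMPERATURE) {
            context.write(new Text(m.group(1)), new IntWritable(airTemperature));
        }
    }
}

Rejecting non-matching lines up front removes the need for a try/catch, since the captured groups are guaranteed to be digit sequences before parsing.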

4.     Developing the Reducer Class:

o    The Reducer will receive (year, list_of_temperatures). It will iterate through the list to find the maximum temperature for that year and emit (year, max_temperature).

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int maxTemperature = Integer.MIN_VALUE; // Initialize with the smallest possible integer
        for (IntWritable value : values) {
            maxTemperature = Math.max(maxTemperature, value.get());
        }
        context.write(key, new IntWritable(maxTemperature)); // Emit (year, max_temp_for_that_year)
    }
}

5.     Configuring and Submitting the MapReduce Job:

o    This class will set up the job, specify the Mapper and Reducer, and define input/output paths, similar to Practical 2.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class MaxTemperatureDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: maxtemperature <in> <out>");
            System.exit(2);
        }

        Job job = Job.getInstance(conf, "max temperature");
        job.setJarByClass(MaxTemperatureDriver.class);
        job.setMapperClass(MaxTemperatureMapper.class);
        job.setCombinerClass(MaxTemperatureReducer.class); // Optional optimization: local max
        job.setReducerClass(MaxTemperatureReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

o    Note on Combiner: We can optionally use the MaxTemperatureReducer as a Combiner. A Combiner runs locally on the Mapper’s output before shuffling, reducing the amount of data transferred to the Reducers. Since finding the maximum is an associative and commutative operation, the same Reducer logic can be applied as a Combiner for optimization.
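To make this concrete with the sample data: if the node processing the 1901 records emits (1901, 28) and (1901, 35), the Combiner collapses them locally into a single pair, so only one record for that year leaves the node. The Reducer then takes the maximum of the per-node maxima, which equals the overall maximum:

Mapper output on one node:    (1901, 28), (1901, 35)
Combiner output (local max):  (1901, 35)
Final Reducer output:         (1901, 35)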

6.     Compiling and Packaging Your Application:

o    Compile your Java code into a .jar file, as demonstrated in Practical 2.

# Example for manual compilation and JAR creation (adjust paths)
mkdir -p classes
javac -cp "$(hadoop classpath)" -d classes MaxTemperatureMapper.java MaxTemperatureReducer.java MaxTemperatureDriver.java
jar cvf maxtemp.jar -C classes/ .
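As a quick sanity check, you can list the archive’s table of contents to confirm that all three classes were packaged:

jar tf maxtemp.jar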

7.     Running the Job on Hadoop:

o    Execute the JAR file on your Hadoop cluster.

hdfs dfs -rm -r /max_temperature_output # Remove the output directory if it already exists
hadoop jar maxtemp.jar MaxTemperatureDriver /temperature_input/temperature_data.txt /max_temperature_output

o    Replace maxtemp.jar with the actual name of your compiled JAR file and adjust input/output paths as necessary.

8.     Verifying the Output:

o    After the job completes successfully, view the output stored in HDFS:

hdfs dfs -cat /max_temperature_output/part-r-00000

o    You should see output similar to this (order may vary):

1900    30
1901    35
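You can also list the output directory itself; a successfully completed job writes an empty _SUCCESS marker file alongside the part-r-* files:

hdfs dfs -ls /max_temperature_output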

Conclusion:
You have successfully implemented a MapReduce program to find the maximum temperature for each year from a dataset. This practical demonstrated crucial steps in custom data processing, including parsing specific fields within the Mapper and performing aggregation logic in the Reducer. You’ve also seen how a Combiner can optimize job performance for certain types of operations.