Big Data with Hadoop
Practical 3:
Maximum Annual Temperature using MapReduce
Objective: To implement and execute a MapReduce program to identify the maximum temperature
recorded for each year from a given dataset, reinforcing the concepts of data
parsing, aggregation, and custom data processing logic.
Introduction:
Finding the maximum or minimum value in a dataset is a common aggregation task
in data analysis. This practical will guide you through creating a MapReduce job that processes a dataset of temperature
readings, extracts the year and temperature from each record, and then
determines the highest temperature for every year. This exercise will deepen
your understanding of how to define custom input parsing within the Mapper and
how the Reducer aggregates values based on keys.
Step-by-Step Guide:
1. Prepare Your Development Environment:
o Continue using your existing Java project setup from Practical 2, ensuring Hadoop client dependencies are included.
2. Input Data Preparation:
o Create a sample text file (e.g., temperature_data.txt) with temperature records. Each line should contain the year and a temperature. For simplicity, let's assume a format where the year is the first four characters of the line and the temperature (an integer) is the last space-separated field, with 9999 indicating a missing or invalid temperature.
Example temperature_data.txt:
1900-01-01 10:00 25
1900-01-02 14:00 30
1901-07-15 11:00 28
1901-07-16 15:00 35
1900-01-03 09:00 20
1901-08-01 12:00 9999
(The 9999 in the last record marks a missing temperature value.)
o Upload this file to HDFS:
hdfs dfs -mkdir /temperature_input
hdfs dfs -put temperature_data.txt /temperature_input/temperature_data.txt
3. Crafting the Mapper and Reducer Classes:
o The Mapper will parse each line, extract the year and the temperature, and emit (year, temperature) as key-value pairs. It should filter out invalid temperature readings.
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final int MISSING_TEMPERATURE = 9999; // Standard code for missing temp

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String year = line.substring(0, 4); // Year is the first 4 characters

        // Extract the temperature. This parsing might need adjustment
        // based on your actual data format; here we assume it is the
        // last space-separated field on the line.
        int airTemperature;
        try {
            String[] parts = line.split(" ");
            if (parts.length > 2) {
                airTemperature = Integer.parseInt(parts[parts.length - 1].trim());
            } else {
                // Fallback: skip lines that don't match the expected format
                return;
            }
        } catch (NumberFormatException e) {
            // Skip records where the temperature isn't a valid number
            return;
        }

        if (airTemperature != MISSING_TEMPERATURE) {
            context.write(new Text(year), new IntWritable(airTemperature));
        }
    }
}
o Note on Parsing: The line.split(" ") and parts[parts.length - 1] calls above are a simple parsing example. In a real scenario, you would need robust parsing logic based on the exact format of your input data.
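The more robust parsing that the note alludes to might look like the following sketch. RecordParser is a hypothetical helper (not one of the practical's required classes) that assumes the sample format from step 2, e.g. 1900-01-01 10:00 25, and uses a regular expression instead of positional splitting:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extracts (year, temperature) from lines like
// "1900-01-01 10:00 25", rejecting anything that doesn't match exactly.
public class RecordParser {
    // Year at the start of the line, an integer temperature at the end.
    private static final Pattern RECORD =
            Pattern.compile("^(\\d{4})-\\d{2}-\\d{2}\\s+\\d{2}:\\d{2}\\s+(-?\\d+)\\s*$");

    /** Returns {year, temperature} or null if the line is malformed. */
    public static String[] parse(String line) {
        Matcher m = RECORD.matcher(line);
        if (!m.matches()) {
            return null; // Malformed record: the Mapper can simply skip it
        }
        return new String[] { m.group(1), m.group(2) };
    }

    public static void main(String[] args) {
        String[] ok = parse("1900-01-01 10:00 25");
        System.out.println(ok[0] + " " + ok[1]); // 1900 25
        System.out.println(parse("garbage line")); // null
    }
}
```

A null return makes the "skip malformed input" decision explicit in one place, instead of relying on split() happening to throw.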
o Developing the Reducer Class: The Reducer will receive (year, list_of_temperatures). It will iterate through the list to find the maximum temperature for that year and emit (year, max_temperature).
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int maxTemperature = Integer.MIN_VALUE; // Initialize with the smallest possible integer
        for (IntWritable value : values) {
            maxTemperature = Math.max(maxTemperature, value.get());
        }
        context.write(key, new IntWritable(maxTemperature)); // Emit (year, max_temp_for_that_year)
    }
}
4. Configuring and Submitting the MapReduce Job:
o This driver class will set up the job, specify the Mapper and Reducer, and define input/output paths, similar to Practical 2.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class MaxTemperatureDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: maxtemperature <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "max temperature");
        job.setJarByClass(MaxTemperatureDriver.class);
        job.setMapperClass(MaxTemperatureMapper.class);
        job.setCombinerClass(MaxTemperatureReducer.class); // Optional optimization: local max
        job.setReducerClass(MaxTemperatureReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
o Note on Combiner: We can optionally use the MaxTemperatureReducer as a Combiner. A Combiner runs locally on the Mapper's output before shuffling, reducing the amount of data transferred to the Reducers. Since finding the maximum is an associative and commutative operation, the same Reducer logic can be applied as a Combiner for optimization.
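To see why reusing the Reducer as a Combiner is safe here, consider this small standalone illustration (CombinerSafetyDemo is a hypothetical name, and the example runs without Hadoop): taking the max of per-split partial maxima gives the same answer as taking the max over all values at once.

```java
import java.util.Arrays;
import java.util.List;

// Illustration only: why the max-finding Reducer is safe to reuse as a
// Combiner. Because max is associative and commutative, combining
// per-mapper partial maxima does not change the final result.
public class CombinerSafetyDemo {
    static int max(List<Integer> values) {
        int result = Integer.MIN_VALUE;
        for (int v : values) {
            result = Math.max(result, v);
        }
        return result;
    }

    public static void main(String[] args) {
        // Temperatures for one year, as if split across two mappers
        List<Integer> split1 = Arrays.asList(25, 30);
        List<Integer> split2 = Arrays.asList(20);

        // Without a combiner: the reducer sees every value
        int direct = max(Arrays.asList(25, 30, 20));

        // With a combiner: the reducer sees one partial max per split
        int combined = max(Arrays.asList(max(split1), max(split2)));

        System.out.println(direct == combined); // true
    }
}
```

By contrast, an operation like the arithmetic mean would give a wrong answer under this scheme, which is why not every Reducer can double as a Combiner.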
5. Compiling and Packaging Your Application:
o Compile your Java code into a .jar file, as demonstrated in Practical 2.
# Example for manual compilation and JAR creation (adjust paths)
mkdir -p classes
javac -cp "$(hadoop classpath)" -d classes MaxTemperatureMapper.java MaxTemperatureReducer.java MaxTemperatureDriver.java
jar -cvf maxtemp.jar -C classes/ .
6. Running the Job on Hadoop:
o Execute the JAR file on your Hadoop cluster.
hdfs dfs -rm -r /max_temperature_output   # Remove the output directory if it exists
hadoop jar maxtemp.jar MaxTemperatureDriver /temperature_input/temperature_data.txt /max_temperature_output
o Replace maxtemp.jar with the actual name of your compiled JAR file and adjust input/output paths as necessary.
7. Verifying the Output:
o After the job completes successfully, view the output stored in HDFS:
hdfs dfs -cat /max_temperature_output/part-r-00000
o You should see output similar to this (order may vary):
1900 30
1901 35
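As an optional cross-check, the expected per-year maxima can be recomputed locally from the sample records, without Hadoop. ExpectedOutputCheck below is a hypothetical helper that applies the same parsing assumptions as the Mapper (year in the first four characters, temperature as the last field, 9999 skipped):

```java
import java.util.Map;
import java.util.TreeMap;

// Local sanity check (no Hadoop required): recompute the expected
// per-year maxima from the sample records to cross-check the job output.
public class ExpectedOutputCheck {
    static final int MISSING_TEMPERATURE = 9999;

    static Map<String, Integer> maxByYear(String[] lines) {
        Map<String, Integer> result = new TreeMap<>();
        for (String line : lines) {
            String year = line.substring(0, 4);
            int temp = Integer.parseInt(line.substring(line.lastIndexOf(' ') + 1));
            if (temp == MISSING_TEMPERATURE) continue; // Skip missing readings
            result.merge(year, temp, Math::max); // Keep the larger value per year
        }
        return result;
    }

    public static void main(String[] args) {
        String[] sample = {
            "1900-01-01 10:00 25",
            "1900-01-02 14:00 30",
            "1901-07-15 11:00 28",
            "1901-07-16 15:00 35",
            "1900-01-03 09:00 20",
            "1901-08-01 12:00 9999",
        };
        maxByYear(sample).forEach((year, max) -> System.out.println(year + "\t" + max));
        // 1900    30
        // 1901    35
    }
}
```

If this local result disagrees with the part-r-00000 contents, the discrepancy is almost certainly in the Mapper's parsing rather than in the Reducer.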
Conclusion:
You have successfully implemented a MapReduce program to find the maximum temperature for each year in a dataset. This practical demonstrated crucial steps in custom data processing, including parsing specific fields within the Mapper and performing aggregation logic in the Reducer. You've also seen how a Combiner can optimize job performance for certain types of operations.