Big Data with Hadoop

Practical 6:

Maximum Annual Electrical Consumption using MapReduce

Objective:

Implement a MapReduce program to process monthly electrical consumption data and identify the highest consumption recorded for each year. This reinforces data parsing, aggregation techniques, and MapReduce usage for real-world datasets.

Introduction:

Analyzing consumption patterns, such as electrical usage, is a common task for utility companies and smart-home systems. This practical walks you through creating a MapReduce job that processes a dataset of monthly electrical consumption figures and outputs the peak consumption for each year.

Step 1: Prepare Development Environment

Reuse the Java project setup from the previous practicals, with the Hadoop client dependencies on the classpath.
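
A quick sanity check that the Hadoop tooling is available on your PATH (both are standard Hadoop commands) is:

hadoop version
hadoop classpath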


Step 2: Input Data Preparation

Create a sample file named electrical_consumption.txt, with one record per line in the format year,month,consumption:

2022,01,1200
2022,02,1500
2022,03,1300
2023,01,1400
2023,02,1750
2023,03,1600
2022,04,1800
2024,01,1550
2024,02,1900
2024,03,1700

Upload to HDFS:

hdfs dfs -mkdir /electrical_input
hdfs dfs -put electrical_consumption.txt /electrical_input/electrical_consumption.txt
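
To confirm the upload, list the directory and read the file back from HDFS:

hdfs dfs -ls /electrical_input
hdfs dfs -cat /electrical_input/electrical_consumption.txt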

Step 3: Mapper Class

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
 
public class MaxElectricalConsumptionMapper 
        extends Mapper<LongWritable, Text, Text, IntWritable> {
 
    private Text year = new Text();
    private IntWritable consumption = new IntWritable();
 
    @Override
    public void map(LongWritable key, Text value, Context context) 
            throws IOException, InterruptedException {
 
        String line = value.toString();
        String[] parts = line.split(",");
 
        if (parts.length == 3) {
            try {
                year.set(parts[0].trim()); // First part = year
                consumption.set(Integer.parseInt(parts[2].trim())); // Third part = consumption
                context.write(year, consumption);
            } catch (NumberFormatException e) {
                System.err.println("Skipping record with invalid consumption value: " + line);
            }
        } else {
            System.err.println("Skipping malformed record (incorrect number of parts): " + line);
        }
    }
}
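
The parsing logic can be exercised outside Hadoop before you submit a job. The sketch below is a hypothetical standalone class (not part of the practical) that applies the same split/trim/parse steps to a few sample lines, including malformed ones, so you can verify what the mapper would emit or skip:

public class ParseCheck {
    public static void main(String[] args) {
        String[] samples = { "2022,01,1200", "2023,02", "2024,03,abc" };
        for (String line : samples) {
            String[] parts = line.split(",");
            if (parts.length == 3) {
                try {
                    String year = parts[0].trim();                       // would become the key
                    int consumption = Integer.parseInt(parts[2].trim()); // would become the value
                    System.out.println(year + "\t" + consumption);
                } catch (NumberFormatException e) {
                    System.out.println("skipped (bad number): " + line);
                }
            } else {
                System.out.println("skipped (wrong field count): " + line);
            }
        }
    }
}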

Step 4: Reducer Class

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
 
public class MaxElectricalConsumptionReducer 
        extends Reducer<Text, IntWritable, Text, IntWritable> {
 
    private IntWritable result = new IntWritable();
 
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
 
        int maxConsumption = Integer.MIN_VALUE;
 
        for (IntWritable val : values) {
            maxConsumption = Math.max(maxConsumption, val.get());
        }
 
        result.set(maxConsumption);
        context.write(key, result);
    }
}
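
To see the data flow concretely with the sample input: after the shuffle, the reducer for key 2022 receives all of that year's values grouped together (in no guaranteed order) and folds them with Math.max:

2022 -> [1200, 1500, 1300, 1800]  =>  (2022, 1800)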

Step 5: Driver Class

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
 
public class MaxElectricalConsumptionDriver {
 
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
 
        if (otherArgs.length < 2) {
            System.err.println("Usage: maxelectricalconsumption <input> <output>");
            System.exit(2);
        }
 
        Job job = Job.getInstance(conf, "Max Electrical Consumption");
        job.setJarByClass(MaxElectricalConsumptionDriver.class);
        job.setMapperClass(MaxElectricalConsumptionMapper.class);
        job.setCombinerClass(MaxElectricalConsumptionReducer.class); // Optional: max is associative and commutative, so the reducer doubles as a combiner and cuts shuffle traffic
        job.setReducerClass(MaxElectricalConsumptionReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
 
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
 
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Step 6: Compile & Package

Manual compilation example:

mkdir -p classes
javac -cp "$(hadoop classpath)" -d classes MaxElectricalConsumptionMapper.java MaxElectricalConsumptionReducer.java MaxElectricalConsumptionDriver.java
jar -cvf maxconsumption.jar -C classes/ .
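
Optionally, you can record the driver as the jar's entry point with the jar tool's -e flag; hadoop jar falls back to the manifest's Main-Class when no class name is given on the command line:

jar -cvfe maxconsumption.jar MaxElectricalConsumptionDriver -C classes/ .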

Step 7: Run on Hadoop

Remove any previous output directory first (the -f flag suppresses the error when none exists), since MapReduce refuses to write to an existing output path:

hdfs dfs -rm -r -f /electrical_output
hadoop jar maxconsumption.jar MaxElectricalConsumptionDriver /electrical_input/electrical_consumption.txt /electrical_output
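
For quick iteration you can also run the same jar in Hadoop's local mode against the local file system. This is a sketch assuming the standard mapreduce.framework.name and fs.defaultFS properties, which the driver's GenericOptionsParser picks up via -D:

hadoop jar maxconsumption.jar MaxElectricalConsumptionDriver \
    -D mapreduce.framework.name=local \
    -D fs.defaultFS=file:/// \
    electrical_consumption.txt local_output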

Step 8: Verify Output

hdfs dfs -cat /electrical_output/part-r-00000

Expected Output:

2022    1800
2023    1750
2024    1900
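
The key and value are separated by a tab, the default for TextOutputFormat. A successful run also leaves a _SUCCESS marker file beside the part file, which you can confirm with:

hdfs dfs -ls /electrical_output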

Conclusion:

You have successfully implemented a MapReduce program that finds the maximum annual electrical consumption. This practical strengthens your skills in data parsing, aggregation, and distributed data processing with Hadoop.