Big Data with Hadoop

Practical 8:

Product Sales Count by Country using MapReduce

Objective:

Implement a MapReduce program that processes sales transaction data, extracts the country for each sale, and calculates the total number of products sold per country. In the map phase, each record becomes a (country, 1) pair; in the reduce phase, all pairs sharing a country key are summed.


Step 1: Input Data Preparation

Create sales_data.txt with comma-separated fields. Each record has twelve fields; the country of sale is the eighth field (index 7), which the mapper will extract:

2023-01-01,Laptop,1200.00,Credit Card,Alice,New York,NY,USA,2022-01-01,2023-01-01,40.71,-74.00
2023-01-01,Mouse,25.00,Debit Card,Bob,London,England,UK,2022-02-01,2023-01-01,51.50,-0.12
2023-01-02,Keyboard,75.00,PayPal,Charlie,Paris,Ile-de-France,France,2022-03-01,2023-01-02,48.85,2.35
2023-01-02,Monitor,300.00,Credit Card,Alice,New York,NY,USA,2022-01-01,2023-01-02,40.71,-74.00
2023-01-03,Headphones,50.00,Credit Card,David,Berlin,Berlin,Germany,2022-04-01,2023-01-03,52.52,13.40
2023-01-03,Webcam,40.00,Debit Card,Eve,London,England,UK,2022-05-01,2023-01-03,51.50,-0.12

Upload to HDFS:

hdfs dfs -mkdir -p /sales_input
hdfs dfs -put sales_data.txt /sales_input/sales_data.txt
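
You can verify the upload before running the job:

hdfs dfs -ls /sales_input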

Step 2: Mapper Class

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
 
public class ProductSalesMapper 
        extends Mapper<LongWritable, Text, Text, IntWritable> {
 
    private static final IntWritable one = new IntWritable(1);
    // Reused across map() calls to avoid allocating a new Text per record.
    private Text country = new Text();
 
    @Override
    public void map(LongWritable key, Text value, Context context) 
            throws IOException, InterruptedException {
 
        String line = value.toString();
        String[] parts = line.split(",");
 
        // The country of sale is the eighth comma-separated field (index 7).
        final int COUNTRY_FIELD_INDEX = 7;
 
        if (parts.length > COUNTRY_FIELD_INDEX) {
            String countryName = parts[COUNTRY_FIELD_INDEX].trim();
            if (!countryName.isEmpty()) {
                // Emit one (country, 1) pair per valid sales record.
                country.set(countryName);
                context.write(country, one);
            } else {
                System.err.println("Skipping record with empty country: " + line);
            }
        } else {
            System.err.println("Skipping malformed record: " + line);
        }
    }
}
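
Field index 7 is easy to sanity-check outside Hadoop. The following standalone snippet (illustrative only; FieldIndexCheck is not part of the submitted job) applies the same split-and-index logic to the first sample record:

public class FieldIndexCheck {
    public static void main(String[] args) {
        // First record from sales_data.txt
        String line = "2023-01-01,Laptop,1200.00,Credit Card,Alice,"
                + "New York,NY,USA,2022-01-01,2023-01-01,40.71,-74.00";
        // Same parsing logic as ProductSalesMapper
        String[] parts = line.split(",");
        System.out.println(parts[7]); // prints: USA
    }
}

Across the six sample records, the mapper therefore emits: (USA, 1), (UK, 1), (France, 1), (USA, 1), (Germany, 1), (UK, 1).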

Step 3: Reducer Class

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
 
public class ProductSalesReducer 
        extends Reducer<Text, IntWritable, Text, IntWritable> {
 
    private IntWritable result = new IntWritable();
 
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) 
            throws IOException, InterruptedException {
 
        // Sum all counts emitted for this country.
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}
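
During the shuffle, the mapper's (country, 1) pairs are grouped by key, so for the sample data the reducer is invoked once per country with these value lists:

France    [1]
Germany   [1]
UK        [1, 1]
USA       [1, 1]

Summing each list produces the counts verified in Step 7.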

Step 4: Driver Class

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
 
public class ProductSalesDriver {
 
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
 
        if (otherArgs.length < 2) {
            System.err.println("Usage: productsales <input> <output>");
            System.exit(2);
        }
 
        Job job = Job.getInstance(conf, "Product Sales by Country");
        job.setJarByClass(ProductSalesDriver.class);
        job.setMapperClass(ProductSalesMapper.class);
        job.setCombinerClass(ProductSalesReducer.class); // optional; see note below
        job.setReducerClass(ProductSalesReducer.class);
 
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
 
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
 
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
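
A note on the combiner: reusing the reducer as a combiner is safe here because integer summation is associative and commutative, and the reducer's output types (Text, IntWritable) match its own input types. The combiner pre-aggregates inside each map task, so, for example, two (UK, 1) pairs produced by the same mapper travel across the network as a single (UK, 2).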

Step 5: Compile & Package

mkdir -p classes
javac -cp "$(hadoop classpath)" -d classes ProductSalesMapper.java ProductSalesReducer.java ProductSalesDriver.java
jar -cvf productsales.jar -C classes/ .
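
To confirm that all three classes were packaged, you can list the jar's contents:

jar -tf productsales.jar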

Step 6: Run the Job on Hadoop

MapReduce will not write into an output directory that already exists, so clear any previous output first:

hdfs dfs -rm -r -f /sales_output
hadoop jar productsales.jar ProductSalesDriver /sales_input/sales_data.txt /sales_output

Step 7: Verify the Output

hdfs dfs -cat /sales_output/part-r-00000
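
Besides the part-r-00000 file, the output directory contains an empty _SUCCESS marker that Hadoop writes when the job finishes cleanly:

hdfs dfs -ls /sales_output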

Expected Output (keys are sorted alphabetically; key and count are tab-separated):

France	1
Germany	1
UK	2
USA	2

Conclusion:

You’ve successfully built a MapReduce program to count product sales per country. This practical reinforces:

- Parsing specific fields from structured text
- Basic aggregation with MapReduce
- Using a Combiner to reduce the volume of intermediate data shuffled to the reducers