Big Data with Hadoop
Practical 8: Product Sales Count by Country using MapReduce
Objective:
Implement a MapReduce program to process sales transaction data, extract the country for each sale, and calculate the total number of products sold per country.
Step 1: Input Data Preparation
Create sales_data.txt with comma-separated fields:
2023-01-01,Laptop,1200.00,Credit Card,Alice,New York,NY,USA,2022-01-01,2023-01-01,40.71,-74.00
2023-01-01,Mouse,25.00,Debit Card,Bob,London,England,UK,2022-02-01,2023-01-01,51.50,-0.12
2023-01-02,Keyboard,75.00,PayPal,Charlie,Paris,Ile-de-France,France,2022-03-01,2023-01-02,48.85,2.35
2023-01-02,Monitor,300.00,Credit Card,Alice,New York,NY,USA,2022-01-01,2023-01-02,40.71,-74.00
2023-01-03,Headphones,50.00,Credit Card,David,Berlin,Berlin,Germany,2022-04-01,2023-01-03,52.52,13.40
2023-01-03,Webcam,40.00,Debit Card,Eve,London,England,UK,2022-05-01,2023-01-03,51.50,-0.12
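Each record has twelve comma-separated fields, and the country sits in the 8th field (0-based index 7). A quick standalone check of that index, splitting one sample record locally (illustrative only, not part of the job):

```java
public class FieldIndexCheck {
    public static void main(String[] args) {
        // First record from sales_data.txt
        String line = "2023-01-01,Laptop,1200.00,Credit Card,Alice,"
                + "New York,NY,USA,2022-01-01,2023-01-01,40.71,-74.00";
        String[] parts = line.split(",");
        // Field 7 (0-based) holds the country
        System.out.println(parts.length + " fields, country = " + parts[7]);
        // → 12 fields, country = USA
    }
}
```

This is the same index the mapper in Step 2 relies on via COUNTRY_FIELD_INDEX.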
Upload to HDFS:
hdfs dfs -mkdir /sales_input
hdfs dfs -put sales_data.txt /sales_input/sales_data.txt
Step 2: Mapper Class
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ProductSalesMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text country = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] parts = line.split(",");

        // Country is the 8th comma-separated field (0-based index 7)
        final int COUNTRY_FIELD_INDEX = 7;

        if (parts.length > COUNTRY_FIELD_INDEX) {
            String countryName = parts[COUNTRY_FIELD_INDEX].trim();
            if (!countryName.isEmpty()) {
                country.set(countryName);
                context.write(country, one);
            } else {
                System.err.println("Skipping record with empty country: " + line);
            }
        } else {
            System.err.println("Skipping malformed record: " + line);
        }
    }
}
Step 3: Reducer Class
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ProductSalesReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}
Step 4: Driver Class
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class ProductSalesDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: productsales <input> <output>");
            System.exit(2);
        }

        Job job = Job.getInstance(conf, "Product Sales by Country");
        job.setJarByClass(ProductSalesDriver.class);
        job.setMapperClass(ProductSalesMapper.class);
        job.setCombinerClass(ProductSalesReducer.class); // optional
        job.setReducerClass(ProductSalesReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Step 5: Compile & Package
mkdir -p classes
javac -cp "$(hadoop classpath)" -d classes ProductSalesMapper.java ProductSalesReducer.java ProductSalesDriver.java
jar -cvf productsales.jar -C classes/ .
Step 6: Run the Job on Hadoop
hdfs dfs -rm -r /sales_output
hadoop jar productsales.jar ProductSalesDriver /sales_input/sales_data.txt /sales_output
Step 7: Verify the Output
hdfs dfs -cat /sales_output/part-r-00000
Expected Output:
France	1
Germany	1
UK	2
USA	2
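These counts can be sanity-checked locally without a cluster by replaying the map-then-reduce logic over the six sample records in plain Java (a sketch for verification only, not part of the submitted job):

```java
import java.util.Map;
import java.util.TreeMap;

public class LocalSalesCount {
    public static void main(String[] args) {
        String[] records = {
            "2023-01-01,Laptop,1200.00,Credit Card,Alice,New York,NY,USA,2022-01-01,2023-01-01,40.71,-74.00",
            "2023-01-01,Mouse,25.00,Debit Card,Bob,London,England,UK,2022-02-01,2023-01-01,51.50,-0.12",
            "2023-01-02,Keyboard,75.00,PayPal,Charlie,Paris,Ile-de-France,France,2022-03-01,2023-01-02,48.85,2.35",
            "2023-01-02,Monitor,300.00,Credit Card,Alice,New York,NY,USA,2022-01-01,2023-01-02,40.71,-74.00",
            "2023-01-03,Headphones,50.00,Credit Card,David,Berlin,Berlin,Germany,2022-04-01,2023-01-03,52.52,13.40",
            "2023-01-03,Webcam,40.00,Debit Card,Eve,London,England,UK,2022-05-01,2023-01-03,51.50,-0.12"
        };
        // "Map": emit (country, 1); "Reduce": sum per key.
        // TreeMap sorts keys, matching the sorted MapReduce output.
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : records) {
            counts.merge(line.split(",")[7].trim(), 1, Integer::sum);
        }
        counts.forEach((k, v) -> System.out.println(k + "\t" + v));
    }
}
```

Running this prints the same four lines as the cluster output above.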
Conclusion:
You’ve successfully built a MapReduce program to count product sales per country. This practical reinforces:
· Parsing specific fields from structured text
· Basic aggregation with MapReduce
· Using a Combiner to optimize intermediate data summation
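Reusing the reducer as a combiner is valid here only because integer summation is associative and commutative: summing per-mapper partial sums gives the same total as summing every 1 directly in the reducer. A tiny illustration of that property (standalone sketch, not part of the job):

```java
import java.util.stream.IntStream;

public class CombinerProperty {
    public static void main(String[] args) {
        // Five (country, 1) values for the same key, as emitted by mappers
        int[] ones = {1, 1, 1, 1, 1};

        // Without a combiner: the reducer sums all values in one pass
        int direct = IntStream.of(ones).sum();

        // With a combiner: each mapper pre-sums its own values,
        // then the reducer sums the partial sums
        int partialA = ones[0] + ones[1];           // combiner on mapper 1
        int partialB = ones[2] + ones[3] + ones[4]; // combiner on mapper 2
        int combined = partialA + partialB;         // final reducer

        System.out.println(direct + " == " + combined); // → 5 == 5
    }
}
```

The combiner shrinks the data shuffled between map and reduce, but it must not change the final result, which is why only operations like this qualify.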