Big Data with Hadoop

Practical 9:

Movie Tag Analysis using MapReduce

Objective:

Implement a MapReduce program to process the MovieLens tags.csv dataset, extract all user-generated tags for each movie, and output a consolidated list of tags for every movie.


Step 1: Input Data Preparation

Sample tags.csv (header included):

userId,movieId,tag,timestamp
1,1,toy story,964981247
1,3,grumpy,964981247
2,1,pixar,964981247
2,3,comedic,964981247
6,1,animation,964981247

Upload to HDFS:

hdfs dfs -mkdir /movielens_input
hdfs dfs -put tags.csv /movielens_input/tags.csv

Step 2: Mapper Class

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
 
public class MovieTagMapper extends Mapper<LongWritable, Text, Text, Text> {
 
    private boolean skipHeader = true;
 
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
 
        String line = value.toString();
 
        // Skip header
        if (skipHeader) {
            skipHeader = false;
            return;
        }
 
        String[] parts = line.split(",", 4); // Limit to 4 splits to handle tags with commas
        if (parts.length >= 3) {
            String movieId = parts[1].trim(); // Index 1 is movieId
            String tag = parts[2].trim();     // Index 2 is tag
 
            if (!movieId.isEmpty() && !tag.isEmpty()) {
                context.write(new Text(movieId), new Text(tag));
            } else {
                System.err.println("Skipping record with empty movieId or tag: " + line);
            }
        } else {
            System.err.println("Skipping malformed record: " + line);
        }
    }
}

Step 3: Reducer Class

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
 
public class MovieTagReducer extends Reducer<Text, Text, Text, Text> {
 
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
 
        Set<String> uniqueTags = new HashSet<>();
        for (Text tag : values) {
            uniqueTags.add(tag.toString());
        }
 
        StringBuilder sb = new StringBuilder();
        boolean first = true;
        for (String tag : uniqueTags) {
            if (!first) sb.append(", ");
            sb.append(tag);
            first = false;
        }
 
        context.write(key, new Text(sb.toString()));
    }
}

Step 4: Driver Class

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
 
public class MovieTagDriver {
 
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
 
        if (otherArgs.length < 2) {
            System.err.println("Usage: movietags <input> <output>");
            System.exit(2);
        }
 
        Job job = Job.getInstance(conf, "Movie Tags Analysis");
        job.setJarByClass(MovieTagDriver.class);
        job.setMapperClass(MovieTagMapper.class);
        job.setReducerClass(MovieTagReducer.class);
 
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
 
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
 
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
 
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Step 5: Compile & Package

mkdir -p classes
javac -cp "$(hadoop classpath)" -d classes MovieTagMapper.java MovieTagReducer.java MovieTagDriver.java
jar -cvf movietags.jar -C classes/ .

Step 6: Run the Job on Hadoop

hdfs dfs -rm -r -f /movielens_output   # -f suppresses the error if the directory does not exist yet
hadoop jar movietags.jar MovieTagDriver /movielens_input/tags.csv /movielens_output

Step 7: Verify the Output

hdfs dfs -cat /movielens_output/part-r-00000

Expected Output:

1             animation, toy story, pixar
3             comedic, grumpy

Note: Order of tags may vary due to HashSet.


Conclusion:

You’ve successfully written a MapReduce program to consolidate user-generated tags for movies. This practical demonstrates:

·         Parsing CSV data

·         Skipping headers

·         Aggregating text values per key

·         Preparing data features for recommender systems