Big Data with Hadoop

Practical 5:

Matrix Multiplication using MapReduce

Objective:
To implement and execute a MapReduce program to perform matrix multiplication (C = A × B), demonstrating how to handle complex data structures, design custom Writable classes, and pass configuration parameters within the MapReduce framework.

Introduction:
Matrix multiplication is a key operation in linear algebra and is widely used in data science, including machine learning and graph processing. Implementing it in a distributed environment like Hadoop MapReduce requires careful design to distribute computation efficiently and aggregate partial results. In this practical, students will:

  • Represent matrices as input files.
  • Design custom Writable classes to manage composite keys and values.
  • Implement Mapper and Reducer logic to compute the product matrix.

Step-by-Step Guide

1. Prepare Your Development Environment

  • Use your existing Java project setup from previous practicals, ensuring Hadoop client dependencies are included.

2. Input Data Preparation

  • Multiply two matrices, A and B, where C_{ij} = Σ_k (A_{ik} × B_{kj}).
  • Create two sample text files: matrix_A.txt and matrix_B.txt. Each line represents a non-zero element in the format: matrix_tag,row_index,col_index,value.

Example matrix_A.txt (2×2):

A,0,0,1

A,0,1,2

A,1,0,3

A,1,1,4

Example matrix_B.txt (2×2):

B,0,0,5

B,0,1,6

B,1,0,7

B,1,1,8

  • Upload files to HDFS:

hdfs dfs -mkdir /matrix_input

hdfs dfs -put matrix_A.txt /matrix_input/matrix_A.txt

hdfs dfs -put matrix_B.txt /matrix_input/matrix_B.txt

Note: For this example, Matrix A is M×N, Matrix B is N×P. Here M=2, N=2, P=2. Pass these dimensions to the MapReduce job.


3. Define Custom Writable Classes

MatrixCellWritable.java (represents a cell in the result matrix C):

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

import java.util.Objects;

 

public class MatrixCellWritable implements WritableComparable<MatrixCellWritable> {

    public int row;

    public int col;

 

    public MatrixCellWritable() {}

    public MatrixCellWritable(int row, int col) { this.row = row; this.col = col; }

 

    @Override

    public void write(DataOutput out) throws IOException {

        out.writeInt(row);

        out.writeInt(col);

    }

 

    @Override

    public void readFields(DataInput in) throws IOException {

        row = in.readInt();

        col = in.readInt();

    }

 

    @Override

    public int compareTo(MatrixCellWritable other) {

        if (this.row != other.row) return Integer.compare(this.row, other.row);

        return Integer.compare(this.col, other.col);

    }

 

    @Override

    public boolean equals(Object o) {

        if (this == o) return true;

        if (o == null || getClass() != o.getClass()) return false;

        MatrixCellWritable that = (MatrixCellWritable) o;

        return row == that.row && col == that.col;

    }

 

    @Override

    public int hashCode() { return Objects.hash(row, col); }

 

    @Override

    public String toString() { return row + "," + col; }

}

MatrixValueWritable.java (represents a matrix element for multiplication):

import org.apache.hadoop.io.Writable;

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

 

public class MatrixValueWritable implements Writable {

    public int index; // k

    public int value; // A_ik or B_kj

    public String matrixTag; // "A" or "B"

 

    public MatrixValueWritable() {}

    public MatrixValueWritable(int index, int value, String matrixTag) {

        this.index = index;

        this.value = value;

        this.matrixTag = matrixTag;

    }

 

    @Override

    public void write(DataOutput out) throws IOException {

        out.writeInt(index);

        out.writeInt(value);

        out.writeUTF(matrixTag);

    }

 

    @Override

    public void readFields(DataInput in) throws IOException {

        index = in.readInt();

        value = in.readInt();

        matrixTag = in.readUTF();

    }

 

    @Override

    public String toString() { return matrixTag + "," + index + "," + value; }

}


4. Mapper Class

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

 

public class MatrixMultiplicationMapper extends Mapper<LongWritable, Text, MatrixCellWritable, MatrixValueWritable> {

 

    private int numColsB;

    private int numRowsA;

 

    @Override

    protected void setup(Context context) throws IOException, InterruptedException {

        numColsB = context.getConfiguration().getInt("numColsB", -1);

        numRowsA = context.getConfiguration().getInt("numRowsA", -1);

        if (numColsB == -1 || numRowsA == -1) throw new IOException("Matrix dimensions not set.");

    }

 

    @Override

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        String[] parts = value.toString().split(",");

        if (parts.length != 4) return;

 

        String matrixTag = parts[0];

        int row = Integer.parseInt(parts[1]);

        int col = Integer.parseInt(parts[2]);

        int val = Integer.parseInt(parts[3]);

 

        if (matrixTag.equals("A")) {

            for (int j = 0; j < numColsB; j++)

                context.write(new MatrixCellWritable(row, j), new MatrixValueWritable(col, val, "A"));

        } else if (matrixTag.equals("B")) {

            for (int i = 0; i < numRowsA; i++)

                context.write(new MatrixCellWritable(i, col), new MatrixValueWritable(row, val, "B"));

        }

    }

}


5. Reducer Class

import java.io.IOException;

import java.util.HashMap;

import java.util.Map;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.mapreduce.Reducer;

 

public class MatrixMultiplicationReducer extends Reducer<MatrixCellWritable, MatrixValueWritable, MatrixCellWritable, IntWritable> {

 

    private int numColsA;

 

    @Override

    protected void setup(Context context) throws IOException, InterruptedException {

        numColsA = context.getConfiguration().getInt("numColsA", -1);

        if (numColsA == -1) throw new IOException("Matrix dimension N (numColsA) not set.");

    }

 

    @Override

    public void reduce(MatrixCellWritable key, Iterable<MatrixValueWritable> values, Context context) throws IOException, InterruptedException {

        Map<Integer, Integer> matrixA = new HashMap<>();

        Map<Integer, Integer> matrixB = new HashMap<>();

 

        for (MatrixValueWritable val : values) {

            if (val.matrixTag.equals("A")) matrixA.put(val.index, val.value);

            else if (val.matrixTag.equals("B")) matrixB.put(val.index, val.value);

        }

 

        int dotProduct = 0;

        for (int k = 0; k < numColsA; k++) {

            Integer valA = matrixA.get(k);

            Integer valB = matrixB.get(k);

            if (valA != null && valB != null) dotProduct += valA * valB;

        }

        context.write(key, new IntWritable(dotProduct));

    }

}


6. Driver Class

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

 

public class MatrixMultiplicationDriver {

 

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        if (otherArgs.length < 5) {

            System.err.println("Usage: matrixmultiplication <in> <out> <numRowsA> <numColsA> <numColsB>");

            System.exit(2);

        }

 

        conf.setInt("numRowsA", Integer.parseInt(otherArgs[2]));

        conf.setInt("numColsA", Integer.parseInt(otherArgs[3]));

        conf.setInt("numColsB", Integer.parseInt(otherArgs[4]));

 

        Job job = Job.getInstance(conf, "matrix multiplication");

        job.setJarByClass(MatrixMultiplicationDriver.class);

        job.setMapperClass(MatrixMultiplicationMapper.class);

        job.setReducerClass(MatrixMultiplicationReducer.class);

 

        job.setMapOutputKeyClass(MatrixCellWritable.class);

        job.setMapOutputValueClass(MatrixValueWritable.class);

 

        job.setOutputKeyClass(MatrixCellWritable.class);

        job.setOutputValueClass(IntWritable.class);

 

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

 

        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

}


7. Compiling and Packaging

mkdir -p classes

javac -cp "$(hadoop classpath)" -d classes MatrixCellWritable.java MatrixValueWritable.java MatrixMultiplicationMapper.java MatrixMultiplicationReducer.java MatrixMultiplicationDriver.java

jar cvf matrixmul.jar -C classes/ .


8. Running the Job on Hadoop

hdfs dfs -rm -r /matrix_output

hadoop jar matrixmul.jar MatrixMultiplicationDriver /matrix_input /matrix_output 2 2 2

 

Conclusion:
You have successfully implemented MapReduce-based matrix multiplication using custom Writable classes and job-specific parameters. This practical strengthens your understanding of distributed computation, intermediate data structures, and designing efficient data flows.