Big Data with Hadoop
Practical 5: Matrix Multiplication using MapReduce
Objective:
To implement and execute a MapReduce program to perform matrix multiplication (C = A × B), demonstrating how to handle complex data structures, design custom Writable classes, and pass configuration parameters within the MapReduce framework.
Introduction:
Matrix multiplication is a key operation in linear algebra and is widely used in data science, including machine learning and graph processing. Implementing it in a distributed environment like Hadoop MapReduce requires careful design to distribute computation efficiently and aggregate partial results. In this practical, students will:
- Represent matrices as input files.
- Design custom Writable classes to manage composite keys and values.
- Implement Mapper and Reducer logic to compute the product matrix.
Step-by-Step Guide
1. Prepare Your Development Environment
- Use your existing Java project setup from previous practicals, ensuring Hadoop client dependencies are included.
2. Input Data Preparation
- Multiply two matrices, A and B, where C_ij = Σ_k (A_ik × B_kj).
- Create two sample text files, matrix_A.txt and matrix_B.txt. Each line represents a non-zero element in the format: matrix_tag,row_index,col_index,value.
Example matrix_A.txt (2×2):
A,0,0,1
A,0,1,2
A,1,0,3
A,1,1,4
Example matrix_B.txt (2×2):
B,0,0,5
B,0,1,6
B,1,0,7
B,1,1,8
- Upload the files to HDFS:
hdfs dfs -mkdir /matrix_input
hdfs dfs -put matrix_A.txt /matrix_input/matrix_A.txt
hdfs dfs -put matrix_B.txt /matrix_input/matrix_B.txt
Note: For this example, Matrix A is M×N and Matrix B is N×P; here M = 2, N = 2, P = 2. These dimensions are passed to the MapReduce job as command-line arguments.
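With inputs this small, it is worth knowing the expected product before running the job, so the cluster output can be checked. The sketch below (the class name LocalMatrixCheck is ours; the arrays mirror the sample files above) computes C = A × B in plain Java with no Hadoop dependency:

```java
// Standalone sanity check: multiply the sample matrices in memory.
public class LocalMatrixCheck {
    // Plain triple loop: C[i][j] = sum over k of A[i][k] * B[k][j]
    public static int[][] multiply(int[][] a, int[][] b) {
        int m = a.length, n = b.length, p = b[0].length;
        int[][] c = new int[m][p];
        for (int i = 0; i < m; i++)
            for (int j = 0; j < p; j++)
                for (int k = 0; k < n; k++)
                    c[i][j] += a[i][k] * b[k][j];
        return c;
    }

    public static void main(String[] args) {
        int[][] a = {{1, 2}, {3, 4}};   // contents of matrix_A.txt
        int[][] b = {{5, 6}, {7, 8}};   // contents of matrix_B.txt
        for (int[] row : multiply(a, b))
            System.out.println(row[0] + "," + row[1]); // prints 19,22 then 43,50
    }
}
```

So the finished job should produce C(0,0)=19, C(0,1)=22, C(1,0)=43, C(1,1)=50.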
3. Define Custom Writable Classes
MatrixCellWritable.java (represents a cell in the result matrix C):
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;

public class MatrixCellWritable implements WritableComparable<MatrixCellWritable> {
    public int row;
    public int col;

    public MatrixCellWritable() {}

    public MatrixCellWritable(int row, int col) {
        this.row = row;
        this.col = col;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(row);
        out.writeInt(col);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        row = in.readInt();
        col = in.readInt();
    }

    @Override
    public int compareTo(MatrixCellWritable other) {
        if (this.row != other.row) return Integer.compare(this.row, other.row);
        return Integer.compare(this.col, other.col);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        MatrixCellWritable that = (MatrixCellWritable) o;
        return row == that.row && col == that.col;
    }

    @Override
    public int hashCode() {
        return Objects.hash(row, col);
    }

    @Override
    public String toString() {
        return row + "," + col;
    }
}
MatrixValueWritable.java (represents a matrix element for multiplication):
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class MatrixValueWritable implements Writable {
    public int index;        // k
    public int value;        // A_ik or B_kj
    public String matrixTag; // "A" or "B"

    public MatrixValueWritable() {}

    public MatrixValueWritable(int index, int value, String matrixTag) {
        this.index = index;
        this.value = value;
        this.matrixTag = matrixTag;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(index);
        out.writeInt(value);
        out.writeUTF(matrixTag);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        index = in.readInt();
        value = in.readInt();
        matrixTag = in.readUTF();
    }

    @Override
    public String toString() {
        return matrixTag + "," + index + "," + value;
    }
}
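The Writable contract is simply "write fields to a binary stream, read them back in exactly the same order." The sketch below (the class name WritableRoundTrip is ours) demonstrates the same field order as MatrixValueWritable's write/readFields using only java.io, so it runs without Hadoop on the classpath:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Round-trips (index, value, matrixTag) through a byte buffer, mirroring
// MatrixValueWritable.write() and readFields().
public class WritableRoundTrip {
    public static String roundTrip(int index, int value, String tag) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buf);
            out.writeInt(index);  // same order as write(): index first,
            out.writeInt(value);  // then value,
            out.writeUTF(tag);    // then the matrix tag
            DataInputStream in = new DataInputStream(
                    new ByteArrayInputStream(buf.toByteArray()));
            // readFields() must consume the fields in the identical order
            int i = in.readInt();
            int v = in.readInt();
            String t = in.readUTF();
            return t + "," + i + "," + v; // same format as toString()
        } catch (IOException e) {
            throw new UncheckedIOException(e); // cannot happen on byte arrays
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(1, 7, "B")); // prints B,1,7
    }
}
```

If write and readFields ever disagree on field order, deserialization silently produces garbage, which is why both methods list the fields identically.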
4. Mapper Class
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MatrixMultiplicationMapper
        extends Mapper<LongWritable, Text, MatrixCellWritable, MatrixValueWritable> {
    private int numColsB;
    private int numRowsA;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        numColsB = context.getConfiguration().getInt("numColsB", -1);
        numRowsA = context.getConfiguration().getInt("numRowsA", -1);
        if (numColsB == -1 || numRowsA == -1)
            throw new IOException("Matrix dimensions not set.");
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] parts = value.toString().split(",");
        if (parts.length != 4) return; // skip malformed lines
        String matrixTag = parts[0];
        int row = Integer.parseInt(parts[1]);
        int col = Integer.parseInt(parts[2]);
        int val = Integer.parseInt(parts[3]);
        if (matrixTag.equals("A")) {
            // A_ik contributes to every cell (row, j) in row `row` of C
            for (int j = 0; j < numColsB; j++)
                context.write(new MatrixCellWritable(row, j),
                        new MatrixValueWritable(col, val, "A"));
        } else if (matrixTag.equals("B")) {
            // B_kj contributes to every cell (i, col) in column `col` of C
            for (int i = 0; i < numRowsA; i++)
                context.write(new MatrixCellWritable(i, col),
                        new MatrixValueWritable(row, val, "B"));
        }
    }
}
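The mapper's key idea is replication: each element of A is emitted once per column of B, and each element of B once per row of A, so every result cell receives all the operands it needs. The standalone trace below (the class name MapperTrace and its string format are ours, for illustration only) reproduces that emission pattern without Hadoop:

```java
import java.util.ArrayList;
import java.util.List;

// Traces which (key -> value) pairs the mapper would emit for one input line.
public class MapperTrace {
    public static List<String> mapLine(String line, int numRowsA, int numColsB) {
        String[] p = line.split(",");
        String tag = p[0];
        int row = Integer.parseInt(p[1]);
        int col = Integer.parseInt(p[2]);
        int val = Integer.parseInt(p[3]);
        List<String> out = new ArrayList<>();
        if (tag.equals("A")) {
            // A(row, col) is needed by every cell in row `row` of C
            for (int j = 0; j < numColsB; j++)
                out.add("(" + row + "," + j + ") -> A," + col + "," + val);
        } else if (tag.equals("B")) {
            // B(row, col) is needed by every cell in column `col` of C
            for (int i = 0; i < numRowsA; i++)
                out.add("(" + i + "," + col + ") -> B," + row + "," + val);
        }
        return out;
    }

    public static void main(String[] args) {
        // A(0,1)=2 fans out to both cells in row 0 of C
        mapLine("A,0,1,2", 2, 2).forEach(System.out::println);
        // prints: (0,0) -> A,1,2
        //         (0,1) -> A,1,2
    }
}
```

Note the cost: every element is emitted numColsB (or numRowsA) times, which is why this "replication join" approach suits sparse or modestly sized matrices.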
5. Reducer Class
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class MatrixMultiplicationReducer
        extends Reducer<MatrixCellWritable, MatrixValueWritable, MatrixCellWritable, IntWritable> {
    private int numColsA;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        numColsA = context.getConfiguration().getInt("numColsA", -1);
        if (numColsA == -1)
            throw new IOException("Matrix dimension N (numColsA) not set.");
    }

    @Override
    public void reduce(MatrixCellWritable key, Iterable<MatrixValueWritable> values,
                       Context context) throws IOException, InterruptedException {
        Map<Integer, Integer> matrixA = new HashMap<>();
        Map<Integer, Integer> matrixB = new HashMap<>();
        for (MatrixValueWritable val : values) {
            if (val.matrixTag.equals("A")) matrixA.put(val.index, val.value);
            else if (val.matrixTag.equals("B")) matrixB.put(val.index, val.value);
        }
        int dotProduct = 0;
        for (int k = 0; k < numColsA; k++) {
            Integer valA = matrixA.get(k);
            Integer valB = matrixB.get(k);
            if (valA != null && valB != null) dotProduct += valA * valB;
        }
        context.write(key, new IntWritable(dotProduct));
    }
}
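Each reducer call sees, for one cell C(i,j), a mix of A-tagged and B-tagged values and pairs them by the shared index k. The sketch below (the class name ReducerTrace is ours; values are encoded as "tag,k,value" strings, matching MatrixValueWritable.toString()) replays that pairing for the key (0,0) of the sample matrices, without Hadoop:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Replays the reducer's dot-product logic for a single key.
public class ReducerTrace {
    public static int dotProduct(List<String> values, int numColsA) {
        Map<Integer, Integer> a = new HashMap<>();
        Map<Integer, Integer> b = new HashMap<>();
        for (String v : values) {           // each value is "tag,k,element"
            String[] p = v.split(",");
            int k = Integer.parseInt(p[1]);
            int val = Integer.parseInt(p[2]);
            if (p[0].equals("A")) a.put(k, val);
            else b.put(k, val);
        }
        // Pair A_ik with B_kj by shared k; missing entries contribute nothing
        int sum = 0;
        for (int k = 0; k < numColsA; k++) {
            Integer va = a.get(k), vb = b.get(k);
            if (va != null && vb != null) sum += va * vb;
        }
        return sum;
    }

    public static void main(String[] args) {
        // Values arriving at key (0,0) for the sample input:
        // A_00=1, A_01=2 and B_00=5, B_10=7
        List<String> vals = List.of("A,0,1", "A,1,2", "B,0,5", "B,1,7");
        System.out.println(dotProduct(vals, 2)); // prints 19 (= 1*5 + 2*7)
    }
}
```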
6. Driver Class
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class MatrixMultiplicationDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 5) {
            System.err.println("Usage: matrixmultiplication <in> <out> <numRowsA> <numColsA> <numColsB>");
            System.exit(2);
        }
        conf.setInt("numRowsA", Integer.parseInt(otherArgs[2]));
        conf.setInt("numColsA", Integer.parseInt(otherArgs[3]));
        conf.setInt("numColsB", Integer.parseInt(otherArgs[4]));
        Job job = Job.getInstance(conf, "matrix multiplication");
        job.setJarByClass(MatrixMultiplicationDriver.class);
        job.setMapperClass(MatrixMultiplicationMapper.class);
        job.setReducerClass(MatrixMultiplicationReducer.class);
        job.setMapOutputKeyClass(MatrixCellWritable.class);
        job.setMapOutputValueClass(MatrixValueWritable.class);
        job.setOutputKeyClass(MatrixCellWritable.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
7. Compiling and Packaging
mkdir -p classes
javac -cp "$(hadoop classpath)" -d classes MatrixCellWritable.java MatrixValueWritable.java MatrixMultiplicationMapper.java MatrixMultiplicationReducer.java MatrixMultiplicationDriver.java
jar -cvf matrixmul.jar -C classes/ .
8. Running the Job on Hadoop
hdfs dfs -rm -r /matrix_output
hadoop jar matrixmul.jar MatrixMultiplicationDriver /matrix_input /matrix_output 2 2 2
View the result with:
hdfs dfs -cat /matrix_output/part-r-00000
Conclusion:
You have successfully implemented MapReduce-based matrix multiplication using custom Writable classes and job-specific parameters. This practical strengthens your understanding of distributed computation, intermediate data structures, and the design of efficient data flows.