Implementing Join in Hadoop Map-Reduce

Recently, while working on a project at BluePi, we needed something like a join in a Map-Reduce job running on Hadoop. In this post I am going to show how a join can be implemented in Hadoop Map-Reduce. There are many scenarios in real-life applications built on Hadoop's Map-Reduce framework where records from more than one data source have to be combined, so the need for a join comes up regularly.

Let's define the problem statement first.

There are two data sources, ds1 and ds2. ds1 is represented by data1.txt, which contains multiple comma-separated columns. We are going to deal with just two of them (groupkey and status1). A record in data1.txt has the following layout, shown here together with a sample record:

groupkey,x1,x2,x3,x4,status1
1,"some value","some value","some value","some value","Ready"

ds2 is another CSV data store, contained in data2.txt, which also contains multiple comma-separated columns. Here too we are going to deal with two columns (groupkey and status2). A sample record looks like this:

1,"some value","some value","some value","some value","Pending"

Our aim is to produce an output data source containing the status1 and status2 columns that correspond to the same groupkey. Let's define the components that allow us to achieve this objective. We will use a Maven project to demonstrate the code, so let's create a Maven project first and add the Hadoop dependency to its pom.xml.

Now let's write some code. First we need to define a Mapper class and implement the map method inside it:

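import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class Mapper1 extends MapReduceBase implements
        Mapper<LongWritable, Text, Text, Text> {

    private String commonkey, status1, fileTag = "s1~";

    public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
        // taking one line/record at a time and parsing it into a key value pair
        String values[] = value.toString().split(",");
        commonkey = values[0].trim();
        // status1 is the sixth column of data1.txt (index 5)
        status1 = values[5].trim();
        // sending the key value pair out of the mapper
        output.collect(new Text(commonkey), new Text(fileTag + status1));
    }
}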

The map method defined above processes data1.txt and frames the initial key-value pairs:

Key (Text): commonkey
Value (Text): an identifier indicating the input source (the tag "s1~" for data1.txt) followed by status1
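As a rough illustration of the pair described above, the following Hadoop-free sketch builds the same key and tagged value from the sample record. The class name TaggedPairSketch and the method toTaggedPair are hypothetical, and the column positions (0 for groupkey, 5 for the status) are an assumption based on the six-column sample record.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

// Hypothetical helper, not part of the original job: it mirrors what a
// mapper emits for a single input line, without any Hadoop classes.
class TaggedPairSketch {

    // Builds the (groupkey, tag + status) pair for one CSV line.
    // keyIndex and statusIndex are assumed column positions; tag is e.g. "s1~".
    static Map.Entry<String, String> toTaggedPair(String line, int keyIndex,
                                                  int statusIndex, String tag) {
        String[] values = line.split(",");
        String groupKey = values[keyIndex].trim();
        String status = values[statusIndex].trim();
        return new SimpleEntry<>(groupKey, tag + status);
    }

    public static void main(String[] args) {
        String line = "1,\"some value\",\"some value\",\"some value\",\"some value\",\"Ready\"";
        Map.Entry<String, String> pair = toTaggedPair(line, 0, 5, "s1~");
        // prints: 1 -> s1~"Ready"
        System.out.println(pair.getKey() + " -> " + pair.getValue());
    }
}
```

Note that a plain split on "," keeps the surrounding quotes and would break on values that themselves contain a comma, so real CSV input may need a proper CSV parser.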
Define another Mapper that does the same for the other data source, ds2:

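import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class Mapper2 extends MapReduceBase implements
        Mapper<LongWritable, Text, Text, Text> {

    // variables to process a record of data2.txt
    private String commonkey, status2, fileTag = "s2~";

    public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
        String line = value.toString();
        String values[] = line.split(",");
        // column positions follow the sample record: groupkey first, status2 last
        commonkey = values[0].trim();
        status2 = values[5].trim();
        output.collect(new Text(commonkey), new Text(fileTag + status2));
    }
}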

It's time to implement the Reducer now:

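import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class StatusReducer extends MapReduceBase implements Reducer<Text, Text, Text, Text> {

    public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
        // Variables to aid the join process; reset for every key so that a
        // status from a previous key is never carried over
        String status1 = "status1", status2 = "status2";
        while (values.hasNext()) {
            String currValue = values.next().toString();
            String splitVals[] = currValue.split("~", 2);
            if (splitVals.length < 2) {
                continue; // value without a source tag, nothing to join
            }
            /*
             * identifying the record source that corresponds to a commonkey
             * and picking up the status accordingly
             */
            if (splitVals[0].equals("s1")) {
                status1 = splitVals[1].trim();
            } else if (splitVals[0].equals("s2")) {
                status2 = splitVals[1].trim();
            }
        }
        // emitting the joined pair once per commonkey
        output.collect(new Text(status1), new Text(status2));
    }
}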
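To see how the tagged values come together, here is a minimal Hadoop-free sketch of the same reduce-side join run in memory. It is only an illustration: the class ReduceSideJoinSketch and its shuffle and reduce methods are hypothetical stand-ins for what the framework and the Reducer do, and the placeholder values "status1" and "status2" for a missing side are borrowed from the fallback in the Reducer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory simulation of the reduce-side join; no Hadoop involved.
class ReduceSideJoinSketch {

    // Stand-in for the shuffle phase: groups tagged values by their key.
    // Each element of mapOutput is {groupkey, taggedValue}.
    static Map<String, List<String>> shuffle(List<String[]> mapOutput) {
        Map<String, List<String>> grouped = new TreeMap<>();
        for (String[] pair : mapOutput) {
            grouped.computeIfAbsent(pair[0], k -> new ArrayList<>()).add(pair[1]);
        }
        return grouped;
    }

    // Stand-in for reduce(): picks status1 and status2 out of the tagged
    // values that share one key. Returns {status1, status2}.
    static String[] reduce(List<String> taggedValues) {
        String status1 = "status1"; // placeholder when ds1 has no record for the key
        String status2 = "status2"; // placeholder when ds2 has no record for the key
        for (String tagged : taggedValues) {
            String[] parts = tagged.split("~", 2);
            if (parts.length < 2) {
                continue; // untagged value, cannot be attributed to a source
            }
            if (parts[0].equals("s1")) {
                status1 = parts[1].trim();
            } else if (parts[0].equals("s2")) {
                status2 = parts[1].trim();
            }
        }
        return new String[] { status1, status2 };
    }

    public static void main(String[] args) {
        List<String[]> mapOutput = Arrays.asList(
                new String[] { "1", "s1~Ready" },
                new String[] { "2", "s1~Ready" },
                new String[] { "1", "s2~Pending" });
        for (Map.Entry<String, List<String>> e : shuffle(mapOutput).entrySet()) {
            String[] joined = reduce(e.getValue());
            System.out.println(e.getKey() + ": " + joined[0] + "\t" + joined[1]);
        }
    }
}
```

In this sketch key 1 joins to Ready and Pending, while key 2, which has no ds2 record, keeps the placeholder for status2. If a key has several records from the same source, the last one seen wins, so this approach fits data where each groupkey appears at most once per source.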

To run the join on Hadoop we need a class that acts as the job runner. It configures the job with parameters such as the Mappers, the Reducer and the input and output paths. The following configuration needs to be set up:

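import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.lib.MultipleInputs;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Executor extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        JobConf conf = new JobConf(getConf(), Executor.class);
        conf.setJobName("SMS Reports");

        // setting key value types for mapper and reducer outputs
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);

        // specifying the custom reducer class
        conf.setReducerClass(StatusReducer.class);

        // Specifying the input directories (at runtime) and Mappers independently
        // for inputs from multiple sources
        MultipleInputs.addInputPath(conf, new Path(args[0]), TextInputFormat.class, Mapper1.class);
        MultipleInputs.addInputPath(conf, new Path(args[1]), TextInputFormat.class, Mapper2.class);

        // Specifying the output directory at runtime
        FileOutputFormat.setOutputPath(conf, new Path(args[2]));

        // runJob blocks until the job finishes and throws if it fails
        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new Executor(), args);
        System.exit(res);
    }
}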

The next step is to create a jar file, which can be done by running the following command from the root directory of your project:

 mvn clean package 

This creates the desired jar file (let's call it mapreduce-join-example.jar) in the target folder of your project. Transfer the jar to the master node of the Hadoop cluster. We also need to copy the input files to HDFS before we run the job, which can be done with the following commands:

 hadoop dfs -copyFromLocal /home/ds1/data1.txt /hdfs_home/data1
hadoop dfs -copyFromLocal /home/ds2/data2.txt /hdfs_home/data2 

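Once the files are copied, we can run the jar on the Hadoop cluster using the following command (the fully qualified class name assumes that the classes are declared in the com.bluepi.join package):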

 hadoop jar  mapreduce-join-example.jar com.bluepi.join.Executor /hdfs_home/data1 /hdfs_home/data2 /hdfs_home/output 

And that's it: your output directory should contain the resulting file with the desired results.
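For inspecting the result, the sketch below splits one line of the output file into its two statuses. It assumes the job uses Hadoop's default text output format, which writes the key and the value of each record separated by a tab. The class OutputLineSketch and the method parseOutputLine are hypothetical names used only for this illustration.

```java
// Hypothetical helper for reading the job output; not part of the job itself.
class OutputLineSketch {

    // Splits one output line into {status1, status2}, assuming the default
    // tab separator between key and value.
    static String[] parseOutputLine(String line) {
        String[] parts = line.split("\t", 2);
        if (parts.length != 2) {
            throw new IllegalArgumentException("Expected key<TAB>value but got: " + line);
        }
        return parts;
    }

    public static void main(String[] args) {
        // The quotes are still present because the mappers pass the CSV fields through as-is
        String[] statuses = parseOutputLine("\"Ready\"\t\"Pending\"");
        System.out.println("status1=" + statuses[0] + ", status2=" + statuses[1]);
    }
}
```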
