Friday, January 4, 2013

Hadoop for dependent data splits / Using Distributed Cache in Hadoop Map Reduce


Hadoop is known for processing independent slices of data, but what about dependent data? Suppose that to calculate each employee’s salary we also need their location and grade details, and those details live in separate files. While processing each line of the employee file we then need to look up information from the location and grade files. How do we achieve this? Use the distributed cache.

Distributed Cache
                Hadoop has the concept of a distributed cache that all task trackers (nodes) can access. When some common data has to be available to every task, we place it in the distributed cache. Hadoop then copies each cached file to the nodes that run the job’s tasks, so every task reads a local copy. Reference data needed for lookups is therefore placed in the distributed cache before the job starts. The main point to note is that the distributed files should be small. As a rough guide, a file distributed on a medium-sized cluster should not exceed about 100 MB (this value can vary from cluster to cluster).

Solution to our Problem
                Say we have 1 million employees across 25 locations and 71 grades. Even a crude analysis shows that the location and grade data is very small compared to the employee data. So our approach is to process the employee data in HDFS while looking up the two reference data sets from the distributed cache.
                For mock calculation purposes I’m adding a location bonus and a grade-based bonus, each an annual amount spread over twelve months, on top of the basic salary calculation defined in the previous example.
                So besides the employee.txt file, we have two more input files.
1.       location.txt, which has the location id, location name and annual bonus
2.       grade.txt, which has the grade id, grade and annual bonus

employee.txt
10007~James Wilson~L105~G06~110000~22~8
10100~Roger Williams~L103~G09~145000~20~8

location.txt
L105,Bangalore,200000
L103,Hyderabad,160000

grade.txt
G06,D3,450000
G09,F3,500000
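
The lookup idea can be tried out without a cluster. The following is a minimal plain-Java sketch, not the Hadoop mapper itself: it loads comma-separated reference lines into a HashMap and resolves the location and grade ids of the first sample employee record. The class name LookupSketch and its two helper methods are hypothetical names introduced only for this illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone sketch; not part of the Hadoop job.
class LookupSketch {

    // Builds an id -> annual amount map from lines shaped like "id,name,amount".
    static Map<String, Double> loadReference(String[] lines) {
        Map<String, Double> map = new HashMap<String, Double>();
        for (String line : lines) {
            String[] fields = line.split(",");
            if (fields.length == 3) {
                map.put(fields[0].trim(), Double.parseDouble(fields[2].trim()));
            }
        }
        return map;
    }

    // Returns the combined annual location and grade bonus for one "~"-delimited employee record.
    static double annualExtras(String employeeRecord,
                               Map<String, Double> locations,
                               Map<String, Double> grades) {
        String[] fields = employeeRecord.split("~");
        Double location = locations.get(fields[2].trim());
        Double grade = grades.get(fields[3].trim());
        if (location == null || grade == null) {
            // Fail loudly instead of hitting a NullPointerException later.
            throw new IllegalArgumentException("No reference data for record: " + employeeRecord);
        }
        return location + grade;
    }

    public static void main(String[] args) {
        Map<String, Double> locations = loadReference(new String[] {"L105,Bangalore,200000"});
        Map<String, Double> grades = loadReference(new String[] {"G06,D3,450000", "G09,F3,500000"});
        System.out.println(annualExtras("10007~James Wilson~L105~G06~110000~22~8", locations, grades));
    }
}
```

Checking for a missing id before using it is a design choice of this sketch; an employee record whose location or grade id is absent from the reference data would otherwise fail at the lookup.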

Mapper Class - SalaryCalculatorMapper.java

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class SalaryCalculatorMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, NullWritable>
{
             //Variables for Map Reduce
             private Text word = new Text();
          
           //Variables for business calculations
           private String employeeId,employeeName,locationId,gradeId;
           private Double monthlyPay,daysWorked,monthlyHolidays,currentMonthPay,locationAllowance,gradeBonus;
          
           //data structures for storing location and grade details
           private static Map<String, String> locationMap = new HashMap<String, String>();
           private static Map<String, String> gradeMap = new HashMap<String, String>();
          
           //variables for processing reference files
           private String locId,locName,locAllowance,grId,grade,grBonus;
          
           public void configure(JobConf job)
           {
                  try {
                        processFile(new File("location.txt"));
                        processFile(new File("grade.txt"));
                  } catch (FileNotFoundException e) {
                        e.printStackTrace();
                  }
            }
           public void processFile(File fFile) throws FileNotFoundException {

                  Scanner scanner = new Scanner(fFile);
                  try {
                        // first use a Scanner to get each line
                        while (scanner.hasNextLine()) {
                              // always consume the line so the loop cannot spin on an unexpected file name
                              String line = scanner.nextLine();
                              if (fFile.getName().equals("location.txt"))
                                    processLineLocation(line);
                              else if (fFile.getName().equals("grade.txt"))
                                    processLineGrade(line);
                        }
                  } finally {
                        // ensure the underlying stream is always closed
                        scanner.close();
                  }
            }

            public void processLineLocation(String aLine) {
                  // use a second Scanner to parse the content of each line

                  try {
                        Scanner scanner = new Scanner(aLine);
                        scanner.useDelimiter(",");
                        if (scanner.hasNext()) {
                              locId = scanner.next();
                              locName = scanner.next();
                              locAllowance = scanner.next();
                              //we don't need the location name for our computations, so it is not stored in the map
                             locationMap.put(locId.trim(),locAllowance.trim());
                        }
                  } catch (Exception e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                  }
            }
           
            public void processLineGrade(String aLine) {
                  // use a second Scanner to parse the content of each line
                  try {
                        Scanner scanner = new Scanner(aLine);
                        scanner.useDelimiter(",");
                        if (scanner.hasNext()) {
                              grId = scanner.next();
                              grade = scanner.next();
                              grBonus = scanner.next();
                              //we don't need the grade for our computations, so it is not stored in the map
                             gradeMap.put(grId.trim(),grBonus.trim());
                        }
                  } catch (Exception e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                  }
            }

          
           public void map(LongWritable key, Text value, OutputCollector<Text, NullWritable> output, Reporter reporter)throws IOException 
           {
            /* extracting each line and sending it for processing to process method  and
             retrieving the result back for sending to output collector*/
             String outputText=processRecord(value.toString());
               
             //casting the String to Text
                   word.set(outputText);
                   
                   //Sending key value to output collector
                   output.collect(word, NullWritable.get());
           }
          
           public String processRecord(String record)
           {
             StringBuilder outputRecord=new StringBuilder("");
             
             try {
                        Scanner scanner = new Scanner(record);
                        //setting the delimiter used in input file
                        scanner.useDelimiter("~");
                        if ( scanner.hasNext() )
                        {
                          employeeId = scanner.next();
                          employeeName=scanner.next();
                          locationId=scanner.next();
                          gradeId=scanner.next();
                         monthlyPay=Double.parseDouble(scanner.next());
                         daysWorked=Double.parseDouble(scanner.next());
                         monthlyHolidays=Double.parseDouble(scanner.next());
                          
                          //Initializing the calculated salary as 0
                          currentMonthPay=0.0;
                          
                          //Computing location allowance and grade bonus
                          locationAllowance = Double.parseDouble(locationMap.get(locationId.trim()));
                          gradeBonus = Double.parseDouble(gradeMap.get(gradeId.trim()));
                          
                          //monthly salary computations
                          if((daysWorked+monthlyHolidays)!=30)
                                currentMonthPay=(monthlyPay/30)*(daysWorked+monthlyHolidays);
                          else
                                currentMonthPay=monthlyPay;
                          
                          //total salary for a month
                          currentMonthPay = currentMonthPay+(locationAllowance/12) +(gradeBonus/12);
                          
                          //rounding using Big Decimal
                          BigDecimal bd = new BigDecimal(currentMonthPay);
                          bd = bd.setScale(2,BigDecimal.ROUND_UP);//2 -  decimal places
                          currentMonthPay = bd.doubleValue();
                          
                        //get output in the format Employee Id~Employee Name~Monthly Pay~Days Worked~Calculated Salary
                        outputRecord.append(employeeId);
                        outputRecord.append("~");
                        outputRecord.append(employeeName);
                        outputRecord.append("~");
                       outputRecord.append(monthlyPay.toString());
                        outputRecord.append("~");
                       outputRecord.append(daysWorked.toString());
                        outputRecord.append("~");
                       outputRecord.append(currentMonthPay.toString());
                       
                         
                        } 
                       
              }
               catch (Exception e) {
                              e.printStackTrace();
                        }
               
               return outputRecord.toString();
           }
}
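
To make the arithmetic in processRecord easier to follow, here is a minimal stand-alone sketch of the same calculation applied to the first sample record (monthly pay 110000, 22 days worked, 8 holidays, annual location bonus 200000, annual grade bonus 450000). The class name SalaryMathSketch and the method monthTotal are hypothetical, and RoundingMode.UP is used here as the equivalent of BigDecimal.ROUND_UP in the mapper.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Hypothetical stand-alone sketch of the arithmetic in processRecord; not part of the Hadoop job.
class SalaryMathSketch {

    static BigDecimal monthTotal(double monthlyPay, double daysWorked, double monthlyHolidays,
                                 double annualLocationBonus, double annualGradeBonus) {
        double paidDays = daysWorked + monthlyHolidays;
        // Full pay when the days add up to 30, otherwise pro-rated on a 30-day month.
        double basePay = (paidDays != 30) ? (monthlyPay / 30) * paidDays : monthlyPay;
        // Both bonuses are annual amounts, so one twelfth of each is added per month.
        double total = basePay + (annualLocationBonus / 12) + (annualGradeBonus / 12);
        // RoundingMode.UP rounds away from zero to 2 decimal places.
        return new BigDecimal(total).setScale(2, RoundingMode.UP);
    }

    public static void main(String[] args) {
        // 110000 + 200000/12 + 450000/12
        System.out.println(monthTotal(110000, 22, 8, 200000, 450000)); // prints 164166.67
    }
}
```

For this record the days worked and holidays add up to 30, so the full monthly pay applies and only the two monthly bonus shares are added.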

Driver Class – SalaryCalculator.java

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;



public class SalaryCalculator extends Configured implements Tool {

      public int run(String[] args) throws Exception {

            JobConf conf = new JobConf(getConf(), SalaryCalculator.class);
            conf.setJobName("CostCenterSummary");

            conf.setOutputKeyClass(Text.class);
            conf.setOutputValueClass(NullWritable.class);

            conf.setMapperClass(SalaryCalculatorMapper.class);
           

            FileInputFormat.addInputPath(conf, new Path(args[0]));
            FileOutputFormat.setOutputPath(conf, new Path(args[1]));

            JobClient.runJob(conf);
            return 0;
      }

      public static void main(String[] args) throws Exception {
            int res = ToolRunner.run(new Configuration(), new SalaryCalculator(),
                        args);
            System.exit(res);
      }
}

Reducer Class
You don’t need to write a reducer class if the job doesn’t need one. If no reducer class is set, Hadoop substitutes the default identity reducer at run time, and its input and output key/value types are the same as the mapper’s output key/value types. If you need different key/value types in the reducer output, you have to define a custom reducer. In this example the job is run with mapred.reduce.tasks=0, so no reduce phase runs at all and the mapper output is written directly to the output directory.

How to run the map reduce job?
                Follow these steps in order to run the job
1.       Pack these 2 classes into a jar (salarycalc.jar) and copy it to some location in LFS (/home/bejoys/samples/)
2.       Copy the input files (employee.txt, location.txt and grade.txt) into some location in LFS (/home/bejoys/samples/)
3.       Copy the employee input into HDFS
hadoop fs -copyFromLocal /home/bejoys/samples/employee.txt /userdata/bejoys/samples/salcalc/input/
4.       Run the jar
hadoop jar /home/bejoys/samples/salarycalc.jar com.bejoy.hadoop.samples.salcal.SalaryCalculator -files /home/bejoys/samples/location.txt,/home/bejoys/samples/grade.txt -D mapred.reduce.tasks=0 /userdata/bejoys/samples/salcalc/input/ /userdata/bejoys/samples/salcalc/output/
5.       Retrieve the output on to LFS
hadoop fs -getmerge /userdata/bejoys/samples/salcalc/output/ /home/bejoys/samples/salcal_output.txt
In these commands, paths under /home/bejoys/ are locations in LFS and paths under /userdata/bejoys/ are locations in HDFS.
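
The value given to the files option in the run step is easy to get wrong by hand, because a stray space after a comma breaks the list. As a small illustration, the sketch below assembles that value from a list of local paths. The class FilesOptionSketch and the method filesValue are hypothetical helpers written for this post, not part of any Hadoop API.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for assembling the value passed to -files; not a Hadoop API.
class FilesOptionSketch {

    // Joins local paths with commas only, trimming stray spaces around each path.
    static String filesValue(List<String> paths) {
        StringBuilder value = new StringBuilder();
        for (String path : paths) {
            if (value.length() > 0) {
                value.append(',');
            }
            value.append(path.trim());
        }
        return value.toString();
    }

    public static void main(String[] args) {
        // The second path deliberately starts with a space to show that it is trimmed.
        System.out.println(filesValue(Arrays.asList(
                "/home/bejoys/samples/location.txt", " /home/bejoys/samples/grade.txt")));
    }
}
```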

Points to be noted here
1.       NullWritable
The example uses map reduce purely for parallel processing, so there is no natural key/value pair here, but map reduce programming works only with key/value pairs. So we treat the whole record as the key, and since we have no value we use NullWritable.
                Unlike other Writable classes, we don’t create a new instance (new NullWritable()); we just fetch the shared instance (NullWritable.get()). This is because, unlike other Writables in Hadoop, NullWritable is a singleton.

2.       -files
This option is used at run time to place files in the distributed cache. Only small files should be placed there. Multiple files must be given as a comma-separated list, with no spaces between the file names and the commas. Because -files is a generic Hadoop option, it has to appear before the job’s own arguments (the input and output paths). The task trackers copy these files to their local file system before the tasks execute, which is why the mapper can open location.txt and grade.txt by name.
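
Returning to the NullWritable point, the singleton idea can be illustrated with a few lines of plain Java. This is only a sketch of the pattern under a hypothetical class name (EmptyMarker); it is not Hadoop’s NullWritable source code.

```java
// Hypothetical illustration of the singleton pattern; this is not Hadoop's NullWritable source.
final class EmptyMarker {
    private static final EmptyMarker INSTANCE = new EmptyMarker();

    // Private constructor: callers cannot write "new EmptyMarker()".
    private EmptyMarker() { }

    // Every caller receives the same shared instance.
    static EmptyMarker get() {
        return INSTANCE;
    }

    public static void main(String[] args) {
        System.out.println(EmptyMarker.get() == EmptyMarker.get()); // prints true
    }
}
```

Because the object carries no data, one shared instance is enough for every record, which avoids allocating a new object per map call.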
