
Mapreduce Partitioner


A partitioner partitions the intermediate key-value pairs produced by the map tasks. The number of partitions equals the number of reducer tasks in the job: each partition is processed by exactly one reducer.
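When no custom partitioner is set, Hadoop uses its default hash partitioner, which picks a partition from the key's hash code. Below is a minimal sketch of that logic for illustration only; the class name here is made up, while Hadoop's real implementation is HashPartitioner in org.apache.hadoop.mapreduce.lib.partition.

import org.apache.hadoop.mapreduce.Partitioner;

//Illustrative sketch: hash the key, mask off the sign bit, and
//take the remainder modulo the number of reduce tasks.
public class HashLikePartitioner<K, V> extends Partitioner<K, V>
{
  @Override
  public int getPartition(K key, V value, int numReduceTasks)
  {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}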

Implementation

Let us take some employee details from the Intellipaat company as an input table named employee.

Emp_id name age gender salary
6001 aaaaa 45 Male 50,000
6002 bbbbb 40 Female 50,000
6003 ccccc 34 Male 30,000
6004 ddddd 30 Male 30,000
6005 eeeee 20 Male 40,000
6006 fffff 25 Female 35,000
6007 ggggg 20 Female 15,000
6008 hhhhh 19 Female 15,000
6009 iiiii 22 Male 22,000
6010 jjjjj 24 Male 25,000
6011 kkkk 25 Male 25,000
6012 hhhh 28 Male 20,000
6013 tttt 18 Female 8,000

The task is to find the highest-salaried employee by gender in different age groups.

The data is saved as input.txt in "/home/hadoop/hadoopPartitioner". The salary values are stored as plain integers (no commas) so that Integer.parseInt can read them:

6001 aaaaa 45 Male 50000
6002 bbbbb 40 Female 50000
6003 ccccc 34 Male 30000
6004 ddddd 30 Male 30000
6005 eeeee 20 Male 40000
6006 fffff 25 Female 35000
6007 ggggg 20 Female 15000
6008 hhhhh 19 Female 15000
6009 iiiii 22 Male 22000
6010 jjjjj 24 Male 25000
6011 kkkk 25 Male 25000
6012 hhhh 28 Male 20000
6013 tttt 18 Female 8000

Map task:

The map task takes key-value pairs as input.

Input: The key follows a pattern such as "special key + filename + line number" (for example, key = #intellipaat), and the value holds the data of that record line (for example, value = 6001 \t aaaaa \t 45 \t Male \t 50000).

Method:

  • Read the value (the record data).
  • Use the split function to separate the gender field from the record.
  • Send the gender as the output key, with the whole record as the value, as shown below.

String[] str = value.toString().split("\t", -3);

String gender = str[3];

context.write(new Text(gender), new Text(value));

Output: The key-value pairs from the map task are sent to the partitioner task as its input.

Partitioner task: In the partition process, the data is divided into smaller segments. In this scenario, the key-value pairs are divided into three parts based on the age criteria.

  • Input: the collection of key-value pairs
  • Key = the value of the gender field in the record
  • Value = the whole record data for that gender

Method

Read the age field from the value of the incoming key-value pair:

String[] str = value.toString().split("\t");

int age = Integer.parseInt(str[2]);

Check the value of age against the following conditions:

  • Age <= 20
  • Age > 20 and Age <= 30
  • Age > 30

if(age <= 20)
{
    return 0;
}
else if(age > 20 && age <= 30)
{
    return 1 % numReduceTasks;
}
else
{
    return 2 % numReduceTasks;
}

Output: The whole key-value data is segmented into three collections of key-value pairs.

Reduce task: We have to execute three reduce tasks here, because the total number of partitions equals the total number of reduce tasks.

Input: The reducer executes three times, each time with a different collection of key-value pairs.

  • Key = the gender field value in the record
  • Value = the whole record data for that gender

Method: Read the salary field value from each record:

String[] str = val.toString().split("\t", -3);

Note: str[4] holds the salary field value.

if(Integer.parseInt(str[4]) > max)
{
    max = Integer.parseInt(str[4]);
}

Check the salary against the max variable: if str[4] holds a new maximum, assign str[4] to max; otherwise skip this step.

Repeat steps 1 and 2 for each key-value pair, then write out the result:

context.write(new Text(key), new IntWritable(max));

Output: Finally, we get three collections of key-value pairs, one per age group. With respect to each age group, the collection contains:

  1. The maximum salary from the Male group
  2. The maximum salary from the Female group

The following should be specified in the job configuration:

  • The job
  • Input and output formats of keys and values
  • Individual classes for the Map, Reduce, and Partitioner tasks

Configuration conf = getConf();

//Create Job
Job job = new Job(conf, "max_sal");
job.setJarByClass(PartitionerExample.class);

//File Input and Output paths
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job, new Path(arg[1]));

//Set Mapper class and Output format for key-value pair.
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

//Set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);

//Set Reducer class and Input/Output format for key-value pair.
job.setReducerClass(ReduceClass.class);

//Number of Reducer tasks.
job.setNumReduceTasks(3);

//Input and Output format for data
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

Example:

package Intellipaat_emp;

import java.io.*;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;

import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;

import org.apache.hadoop.util.*;

public class Intellipaat_emp extends Configured implements Tool
{
//Map class

public static class MapClass extends Mapper<LongWritable,Text,Text,Text>
{
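//The map reads a tab-separated record, extracts the gender field (index 3),
//and emits (gender, whole record) pairs.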
public void map(LongWritable key, Text value, Context context)
{
try{
String[] str = value.toString().split("\t", -3);
String gender=str[3];
context.write(new Text(gender), new Text(value));
}
catch(Exception e)
{
System.out.println(e.getMessage());
}
}
}

//Reducer class

public static class ReduceClass extends Reducer<Text,Text,Text,IntWritable>
{
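//For each gender key in this partition, scan all records and keep the
//maximum salary (field index 4).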
public int max = -1;
public void reduce(Text key, Iterable <Text> values, Context context) throws IOException, InterruptedException
{
max = -1;

for (Text val : values)
{
String[] str = val.toString().split("\t", -3);
if(Integer.parseInt(str[4])>max)
max=Integer.parseInt(str[4]);
}

context.write(new Text(key), new IntWritable(max));
}
}

//Partitioner class

public static class CaderPartitioner extends
Partitioner < Text, Text >
{
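//Route each record to a reduce task by age group (field index 2):
//partition 0 for age <= 20, 1 for ages 21-30, 2 for over 30.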
@Override
public int getPartition(Text key, Text value, int numReduceTasks)
{
String[] str = value.toString().split(“\t”);
int age = Integer.parseInt(str[2]);

if(numReduceTasks == 0)
{
return 0;
}

if(age<=20)
{
return 0;
}
else if(age>20 && age<=30)
{
return 1 % numReduceTasks;
}
else
{
return 2 % numReduceTasks;
}
}
}

@Override
public int run(String[] arg) throws Exception
{
Configuration conf = getConf();

Job job = new Job(conf, "topsal");
job.setJarByClass(Intellipaat_emp.class);

FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));

job.setMapperClass(MapClass.class);

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

//set partitioner statement

job.setPartitionerClass(CaderPartitioner.class);
job.setReducerClass(ReduceClass.class);
job.setNumReduceTasks(3);
job.setInputFormatClass(TextInputFormat.class);

job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

return job.waitForCompletion(true) ? 0 : 1;
}

public static void main(String ar[]) throws Exception
{
int res = ToolRunner.run(new Configuration(), new Intellipaat_emp(), ar);
System.exit(res);
}
}

Save the above program as Intellipaat_emp.java in "/home/hadoop/hadoopPartitioner".

Download the Hadoop core jar from the following link: http://mvnrepository.com/artifact/org.apache.hadoop/hadoop-core/1.2.1

Use the commands below to compile the program and package it into a jar:

$ javac -classpath hadoop-core-1.2.1.jar -d . Intellipaat_emp.java
$ jar -cvf Intellipaat_emp.jar Intellipaat_emp

Create an input directory in HDFS using this command:

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Copy the input file input.txt into the input directory:

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/hadoopPartitioner/input.txt input_dir

Run the top-salary application on the input files using the command below:

$HADOOP_HOME/bin/hadoop jar Intellipaat_emp.jar Intellipaat_emp.Intellipaat_emp input_dir/input.txt output_dir
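Optionally, you can list the output directory to confirm that three part files were created, one per reduce task:

$HADOOP_HOME/bin/hadoop fs -ls output_dir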
 Output:

16/07/07 12:12:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully
16/07/07 12:12:52 INFO mapreduce.Job: Counters: 49

File System Counters
 
   FILE: Number of bytes read=467
   FILE: Number of bytes written=426777
   FILE: Number of read operations=0
   FILE: Number of large read operations=0
   FILE: Number of write operations=0
              
   HDFS: Number of bytes read=480
   HDFS: Number of bytes written=72
   HDFS: Number of read operations=12
   HDFS: Number of large read operations=0
   HDFS: Number of write operations=6
              
Job Counters
 
   Launched map tasks=1
   Launched reduce tasks=3
              
   Data-local map tasks=1
              
   Total time spent by all maps in occupied slots (ms)=8212
   Total time spent by all reduces in occupied slots (ms)=59858
   Total time spent by all map tasks (ms)=8212
   Total time spent by all reduce tasks (ms)=59858
              
   Total vcore-seconds taken by all map tasks=8212
   Total vcore-seconds taken by all reduce tasks=59858
              
   Total megabyte-seconds taken by all map tasks=8409088
   Total megabyte-seconds taken by all reduce tasks=61294592
              
Map-Reduce Framework
 
   Map input records=13
   Map output records=13
   Map output bytes=423
   Map output materialized bytes=467
              
   Input split bytes=119
              
   Combine input records=0
   Combine output records=0
              
   Reduce input groups=6
   Reduce shuffle bytes=467
   Reduce input records=13
   Reduce output records=6
              
   Spilled Records=26
   Shuffled Maps =3
   Failed Shuffles=0
   Merged Map outputs=3
   GC time elapsed (ms)=224
   CPU time spent (ms)=3690
              
   Physical memory (bytes) snapshot=553816064
   Virtual memory (bytes) snapshot=3441266688
              
   Total committed heap usage (bytes)=334102528
              
Shuffle Errors
 
   BAD_ID=0
   CONNECTION=0
   IO_ERROR=0
              
   WRONG_LENGTH=0
   WRONG_MAP=0
   WRONG_REDUCE=0
              
File Input Format Counters
 
   Bytes Read=361
              
File Output Format Counters
  Bytes Written=72
 
HDFS generated the output files. Using the commands below, we can see the output of each partition.

In part-00000:

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

Output in part-00000

Female   15000
Male     40000
In part-00001

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001
Output in part-00001

Female   35000
Male    31000
In part-00002

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002
Output in part-00002

Female  51000
Male   50000

For a better understanding of the broader framework, see the blog "Hadoop MapReduce – What it Refers To?".

"0 Responses on Mapreduce Partitioner"

Leave a Message

Your email address will not be published.

Training in Cities

Bangalore, Hyderabad, Chennai, Delhi, Kolkata, UK, London, Chicago, San Francisco, Dallas, Washington, New York, Orlando, Boston

100% Secure Payments. All major credit & debit cards accepted Or Pay by Paypal.

top

Sales Offer

  • To avail this offer, enroll before 06th December 2016.
  • This offer cannot be combined with any other offer.
  • This offer is valid on selected courses only.
  • Please use coupon codes mentioned below to avail the offer
offer-june

Sign Up or Login to view the Free Mapreduce Partitioner.