Partitioner in MapReduce
The intermediate key-value pairs produced by the map tasks are divided among the reducers by a partitioner. The number of partitions equals the number of reduce tasks in the job, so each partition is processed by exactly one reducer.
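If a job does not set a custom partitioner, Hadoop falls back on its built-in HashPartitioner, which assigns each key to a reducer by hashing it. The sketch below mirrors that default behaviour; a custom partitioner such as the one built in this tutorial is only needed when records must be grouped by some other rule (here, by age range).
import org.apache.hadoop.mapreduce.Partitioner;
// Sketch of the default hash-based partitioning: every key is mapped
// onto one of the numReduceTasks partitions by its hash code.
public class HashLikePartitioner<K, V> extends Partitioner<K, V>
{
    @Override
    public int getPartition(K key, V value, int numReduceTasks)
    {
        // Mask the sign bit so the partition index is never negative.
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}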
Implementation
Let us take some employee details of the Intellipaat company as the input table, named employee.
| Emp_id | name | age | gender | salary |
| ------ | ----- | --- | ------ | ------ |
| 6001 | aaaaa | 45 | Male | 50,000 |
| 6002 | bbbbb | 40 | Female | 50,000 |
| 6003 | ccccc | 34 | Male | 30,000 |
| 6004 | ddddd | 30 | Male | 30,000 |
| 6005 | eeeee | 20 | Male | 40,000 |
| 6006 | fffff | 25 | Female | 35,000 |
| 6007 | ggggg | 20 | Female | 15,000 |
| 6008 | hhhhh | 19 | Female | 15,000 |
| 6009 | iiiii | 22 | Male | 22,000 |
| 6010 | jjjjj | 24 | Male | 25,000 |
| 6011 | kkkk | 25 | Male | 25,000 |
| 6012 | hhhh | 28 | Male | 20,000 |
| 6013 | tttt | 18 | Female | 8,000 |
Our goal is to find the highest-salaried employee, by gender, in each age group.
This data is saved as input.txt (fields separated by tabs) in “/home/hadoop/hadoopPartitioner” and used as the input:
6001 aaaaa 45 Male 50000
6002 bbbbb 40 Female 50000
6003 ccccc 34 Male 30000
6004 ddddd 30 Male 30000
6005 eeeee 20 Male 40000
6006 fffff 25 Female 35000
6007 ggggg 20 Female 15000
6008 hhhhh 19 Female 15000
6009 iiiii 22 Male 22000
6010 jjjjj 24 Male 25000
6011 kkkk 25 Male 25000
6012 hhhh 28 Male 20000
6013 tttt 18 Female 8000
Map task:
The map task takes key-value pairs as its input.
Input: The key follows the pattern “special key + file name + line number”, for example key = #intellipaat; the value holds the data of one record.
Method:
- Read the value (the record data).
- Use the split function to extract the gender field.
- Emit the gender as the key along with the full record as the value.
String[] str = value.toString().split("\t", -3);
String gender=str[3];
context.write(new Text(gender), new Text(value));
Output: The gender/record key-value pair is sent from the map task to the partitioner task.
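For instance, for the first input record the map task emits the pair below; the whole record (tab-separated fields) is kept as the value so that the partitioner and the reducer can still read the age and salary fields.
key   = Male
value = 6001 aaaaa 45 Male 50000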
Partitioner task:
The partitioner divides the map output into smaller segments. In this scenario, the key-value pairs are split into three parts based on the age criteria.
- The collection of key-value pairs from the map task
- Key = the gender field value of the record
- Value = the whole record data for that gender
Method:
Read the age field from the record held in the value.
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
Then check the value of age with the following conditions.
if(age<=20)
{
return 0;
}
else if(age>20 && age<=30)
{
return 1 % numReduceTasks;
}
else
{
return 2 % numReduceTasks;
}
Output: The whole set of key-value pairs is segmented into three collections, one for each reducer.
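As a quick sanity check, the partition logic can be exercised on its own. The small driver below is a hypothetical test class (not part of the job) that feeds the first input record to the CaderPartitioner defined in the full example further below; with 3 reduce tasks it should print 2, because age 45 falls in the above-30 group.
import org.apache.hadoop.io.Text;
// Hypothetical standalone check for the partition logic described above.
public class PartitionerCheck
{
    public static void main(String[] args)
    {
        Intellipaat_emp.CaderPartitioner p = new Intellipaat_emp.CaderPartitioner();
        Text key = new Text("Male");
        Text value = new Text("6001\taaaaa\t45\tMale\t50000");
        // Age 45 (> 30) should land in partition 2 of 3.
        System.out.println(p.getPartition(key, value, 3));
    }
}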
Reduce task:
Three reduce tasks are executed here, because the number of partitions is equal to the number of reduce tasks.
Input: The reducer runs three times, each time with a different set of key-value pairs.
- Key = the gender field value of the record
- Value = the whole record data for that gender
Method: Read the salary field value of each record.
String [] str = val.toString().split("\t", -3);
Note: str[4] holds the salary field value.
if(Integer.parseInt(str[4])>max)
{
max=Integer.parseInt(str[4]);
}
Compare the salary with the maximum (max) variable: if str[4] is greater than the current maximum, assign str[4] to max; otherwise skip this step. Repeat these two steps for each key-value pair.
context.write(new Text(key), new IntWritable(max));
Output: We get three collections, one for each age group. Within each age group, we get:
- The maximum salary from the Male group
- The maximum salary from the Female group
For example, in the age ≤ 20 partition the records are 6005 (Male, 40000), 6007 (Female, 15000), 6008 (Female, 15000), and 6013 (Female, 8000), so its reducer emits Male 40000 and Female 15000.
The following must be specified in the job configuration:
- The job
- The input and output formats of keys and values
- The individual classes for the map, reduce, and partitioner tasks
Configuration conf = getConf();
// Create the job
Job job = new Job(conf, "max_sal");
job.setJarByClass(PartitionerExample.class);
// File input and output paths
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job, new Path(arg[1]));
// Set the Mapper class and the output format for its key-value pairs
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// Set the partitioner
job.setPartitionerClass(CaderPartitioner.class);
// Set the Reducer class
job.setReducerClass(ReduceClass.class);
// Number of reduce tasks
job.setNumReduceTasks(3);
// Input and output formats for the data
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
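Finally, the configured job is submitted and the driver waits for it to finish; the complete driver is shown in the full example below.
return job.waitForCompletion(true) ? 0 : 1;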
Example:
package Intellipaat_emp;
import java.io.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;
public class Intellipaat_emp extends Configured implements Tool
{
//Map class
public static class MapClass extends Mapper<LongWritable,Text,Text,Text>
{
public void map(LongWritable key, Text value, Context context)
{
try{
String[] str = value.toString().split("\t", -3);
String gender=str[3];
context.write(new Text(gender), new Text(value));
}
catch(Exception e)
{
System.out.println(e.getMessage());
}
}
}
//Reducer class
public static class ReduceClass extends Reducer<Text,Text,Text,IntWritable>
{
public int max = -1;
public void reduce(Text key, Iterable <Text> values, Context context) throws IOException, InterruptedException
{
max = -1;
for (Text val : values)
{
String [] str = val.toString().split("\t", -3);
if(Integer.parseInt(str[4])>max)
max=Integer.parseInt(str[4]);
}
context.write(new Text(key), new IntWritable(max));
}
}
//Partitioner class
public static class CaderPartitioner extends Partitioner<Text, Text>
{
@Override
public int getPartition(Text key, Text value, int numReduceTasks)
{
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
if(numReduceTasks == 0)
{
return 0;
}
if(age<=20)
{
return 0;
}
else if(age>20 && age<=30)
{
return 1 % numReduceTasks;
}
else
{
return 2 % numReduceTasks;
}
}
}
@Override
public int run(String[] arg) throws Exception
{
Configuration conf = getConf();
Job job = new Job(conf, "topsal");
job.setJarByClass(Intellipaat_emp.class);
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);
job.setReducerClass(ReduceClass.class);
job.setNumReduceTasks(3);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String ar[]) throws Exception
{
int res = ToolRunner.run(new Configuration(), new Intellipaat_emp(), ar);
System.exit(res);
}
}
Save the above program as Intellipaat_emp.java in “/home/hadoop/hadoopPartitioner”.
Download the jar using the following link http://mvnrepository.com/artifact/org.apache.hadoop/hadoop-core/1.2.1
Use the commands below to compile the program and package it into a jar (the compiled classes are placed in a classes/ directory):
$ mkdir classes
$ javac -classpath hadoop-core-1.2.1.jar -d classes Intellipaat_emp.java
$ jar -cvf Intellipaat_emp.jar -C classes/ .
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
The command above creates an input directory in HDFS. Copy the input file input.txt into it with the following command:
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/hadoopPartitioner/input.txt input_dir
Use the command below to run the top-salary application on the input file.
$HADOOP_HOME/bin/hadoop jar Intellipaat_emp.jar Intellipaat_emp.Intellipaat_emp input_dir/input.txt output_dir
Output:
16/07/07 12:12:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully
16/07/07 12:12:52 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=467
FILE: Number of bytes written=426777
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=480
HDFS: Number of bytes written=72
HDFS: Number of read operations=12
HDFS: Number of large read operations=0
HDFS: Number of write operations=6
Job Counters
Launched map tasks=1
Launched reduce tasks=3
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=8212
Total time spent by all reduces in occupied slots (ms)=59858
Total time spent by all map tasks (ms)=8212
Total time spent by all reduce tasks (ms)=59858
Total vcore-seconds taken by all map tasks=8212
Total vcore-seconds taken by all reduce tasks=59858
Total megabyte-seconds taken by all map tasks=8409088
Total megabyte-seconds taken by all reduce tasks=61294592
Map-Reduce Framework
Map input records=13
Map output records=13
Map output bytes=423
Map output materialized bytes=467
Input split bytes=119
Combine input records=0
Combine output records=0
Reduce input groups=6
Reduce shuffle bytes=467
Reduce input records=13
Reduce output records=6
Spilled Records=26
Shuffled Maps =3
Failed Shuffles=0
Merged Map outputs=3
GC time elapsed (ms)=224
CPU time spent (ms)=3690
Physical memory (bytes) snapshot=553816064
Virtual memory (bytes) snapshot=3441266688
Total committed heap usage (bytes)=334102528
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=361
File Output Format Counters
Bytes Written=72
The output files were generated in HDFS. Use the commands below to see the output in part-00000, part-00001, and part-00002.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
Output in part-00000 (age group up to 20):
Female 15000
Male 40000
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001
Output in part-00001 (age group 21 to 30):
Female 35000
Male 30000
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002
Output in part-00002 (age group above 30):
Female 50000
Male 50000