Partitioner in MapReduce
The intermediate key-value pairs produced by the map tasks are partitioned by a partitioner, which decides which reducer each pair is sent to. The number of partitions is equal to the number of reducer tasks in the job.
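If no custom partitioner is set, Hadoop falls back to its default HashPartitioner, which picks the reducer purely from the hash of the map output key. Below is a minimal sketch of that default behaviour (the class name DefaultLikePartitioner is only illustrative; the Partitioner base class and getPartition signature are from the Hadoop API):
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Equivalent of Hadoop's default HashPartitioner: the partition (and hence the
// reducer) is chosen from the key's hash code, modulo the number of reduce tasks.
public class DefaultLikePartitioner extends Partitioner<Text, Text> {
    @Override
    public int getPartition(Text key, Text value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
Because hashing the gender key would simply send all Male records to one reducer and all Female records to another, the example below plugs in a custom partitioner that buckets records by age instead.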

Implementation
Let us take some employee details of the Intellipaat company as an input table named employee.
Emp_id | name | age | gender | salary
6001 | aaaaa | 45 | Male | 50,000
6002 | bbbbb | 40 | Female | 50,000
6003 | ccccc | 34 | Male | 30,000
6004 | ddddd | 30 | Male | 30,000
6005 | eeeee | 20 | Male | 40,000
6006 | fffff | 25 | Female | 35,000
6007 | ggggg | 20 | Female | 15,000
6008 | hhhhh | 19 | Female | 15,000
6009 | iiiii | 22 | Male | 22,000
6010 | jjjjj | 24 | Male | 25,000
6011 | kkkk | 25 | Male | 25,000
6012 | hhhh | 28 | Male | 20,000
6013 | tttt | 18 | Female | 8,000
Our task is to find the highest-salaried employee by gender in different age groups.
This data is saved as input.txt in the "/home/hadoop/hadoopPartitioner" directory.
6001    aaaaa   45    Male     50000
6002    bbbbb   40    Female   50000
6003    ccccc   34    Male     30000
6004    ddddd   30    Male     30000
6005    eeeee   20    Male     40000
6006    fffff   25    Female   35000
6007    ggggg   20    Female   15000
6008    hhhhh   19    Female   15000
6009    iiiii   22    Male     22000
6010    jjjjj   24    Male     25000
6011    kkkk    25    Male     25000
6012    hhhh    28    Male     20000
6013    tttt    18    Female   8000
The fields in input.txt are separated by tab characters, and the salary values carry no commas, so that the code below can split each record on "\t" and read the salary with Integer.parseInt().
Map task:
The map task takes key-value pairs as its input.
Input: Since the job uses TextInputFormat, the key is the byte offset of the line in the input file and the value is the complete employee record on that line.
For example: value = 6001 aaaaa 45 Male 50000.
Method:
- Use the split() function to separate the fields of the record and read the gender.
- value holds the complete record data.
- Emit the gender as the key and the record as the value, as in the snippet below.
String[] str = value.toString().split("\t", -3);   // split the record on tabs
String gender = str[3];                             // gender is the fourth field
context.write(new Text(gender), new Text(value));   // emit (gender, full record)
Output: The map task passes its key-value pairs, e.g. (Male, 6001 aaaaa 45 Male 50000), on to the partitioner task.
Partitioner task:
In the partition process the data is divided into smaller segments. In this scenario, the key-value pairs are divided into three partitions based on the age criteria.
- Input: the collection of key-value pairs emitted by the map task
- Key = the value of the gender field in the record
- Value = the complete record for that employee
Method
Read the age field from the value of the incoming key-value pair.
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
Then check the value of age against the following conditions.
if (age <= 20)
{
    return 0;                   // partition 0: age 20 or below
}
else if (age > 20 && age <= 30)
{
    return 1 % numReduceTasks;  // partition 1: age 21 to 30
}
else
{
    return 2 % numReduceTasks;  // partition 2: age above 30
}
The modulo by numReduceTasks keeps the partition number valid even if the job is configured with fewer than three reduce tasks.
Output: The data is segmented into three sets of key-value pairs, one per age group.
Reduce task:
We have to run three reduce tasks here, because the total number of partitions is equal to the total number of reduce tasks.
Input: The reducer executes three times, each time with a different set of key-value pairs.
- Key = gender field value
- Value = the full employee record for that gender
Method: Read the salary field value of each record.
String[] str = val.toString().split("\t", -3);
Note: str[4] holds the salary field value.
if (Integer.parseInt(str[4]) > max)
{
    max = Integer.parseInt(str[4]);
}
Compare the salary with the max variable: if str[4] is greater than the current max, assign it to max; otherwise skip this step.
Repeat step 1 and step 2 for each key-value pair, then write out the result for the key.
context.write(new Text(key), new IntWritable(max));

Output: Finally, we get three collections, one for each age group. Each collection contains, for that age group,
- the maximum salary from the Male group
- the maximum salary from the Female group
Configuration: The following should be specified in the job configuration:
- the job itself
- the input and output formats of keys and values
- the individual classes for the Map, Reduce, and Partitioner tasks
Configuration conf = getConf();
// Create Job
Job job = new Job(conf, "max_sal");
job.setJarByClass(PartitionerExample.class);
// File input and output paths
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job, new Path(arg[1]));
// Set Mapper class and output format for its key-value pairs
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// Set partitioner
job.setPartitionerClass(CaderPartitioner.class);
// Set Reducer class and input/output format for its key-value pairs
job.setReducerClass(ReduceClass.class);
// Number of reducer tasks (one per partition)
job.setNumReduceTasks(3);
// Input and output formats for the data
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

Example:
package Intellipaat_emp;

import java.io.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;

public class Intellipaat_emp extends Configured implements Tool
{
   // Map class: emits (gender, full record)
   public static class MapClass extends Mapper<LongWritable, Text, Text, Text>
   {
      public void map(LongWritable key, Text value, Context context)
      {
         try
         {
            String[] str = value.toString().split("\t", -3);
            String gender = str[3];
            context.write(new Text(gender), new Text(value));
         }
         catch (Exception e)
         {
            System.out.println(e.getMessage());
         }
      }
   }

   // Reducer class: finds the maximum salary for each gender within a partition
   public static class ReduceClass extends Reducer<Text, Text, Text, IntWritable>
   {
      public int max = -1;

      public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
      {
         max = -1;
         for (Text val : values)
         {
            String[] str = val.toString().split("\t", -3);
            if (Integer.parseInt(str[4]) > max)
               max = Integer.parseInt(str[4]);
         }
         context.write(new Text(key), new IntWritable(max));
      }
   }

   // Partitioner class: routes each record to a partition based on the age field
   public static class CaderPartitioner extends Partitioner<Text, Text>
   {
      @Override
      public int getPartition(Text key, Text value, int numReduceTasks)
      {
         String[] str = value.toString().split("\t");
         int age = Integer.parseInt(str[2]);
         if (numReduceTasks == 0)
         {
            return 0;
         }
         if (age <= 20)
         {
            return 0;
         }
         else if (age > 20 && age <= 30)
         {
            return 1 % numReduceTasks;
         }
         else
         {
            return 2 % numReduceTasks;
         }
      }
   }

   @Override
   public int run(String[] arg) throws Exception
   {
      Configuration conf = getConf();
      Job job = new Job(conf, "topsal");
      job.setJarByClass(Intellipaat_emp.class);
      FileInputFormat.setInputPaths(job, new Path(arg[0]));
      FileOutputFormat.setOutputPath(job, new Path(arg[1]));
      job.setMapperClass(MapClass.class);
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(Text.class);
      // Set partitioner
      job.setPartitionerClass(CaderPartitioner.class);
      job.setReducerClass(ReduceClass.class);
      job.setNumReduceTasks(3);
      job.setInputFormatClass(TextInputFormat.class);
      job.setOutputFormatClass(TextOutputFormat.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);
      System.exit(job.waitForCompletion(true) ? 0 : 1);
      return 0;
   }

   public static void main(String ar[]) throws Exception
   {
      int res = ToolRunner.run(new Configuration(), new Intellipaat_emp(), ar);
      System.exit(res);
   }
}
Save the above program as Intellipaat_emp.java in "/home/hadoop/hadoopPartitioner".
Download the Hadoop core jar from the following link: http://mvnrepository.com/artifact/org.apache.hadoop/hadoop-core/1.2.1
Use the commands below to compile the program and package it into a jar.
$ javac -classpath hadoop-core-1.2.1.jar -d . Intellipaat_emp.java
$ jar -cvf Intellipaat_emp.jar -C . Intellipaat_emp
Use the following commands to create an input directory in HDFS and copy input.txt into it.
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/hadoopPartitioner/input.txt input_dir
Use the command below to run the top-salary job on the input file.
$HADOOP_HOME/bin/hadoop jar Intellipaat_emp.jar Intellipaat_emp.Intellipaat_emp input_dir/input.txt output_dir
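After the job completes, the output directory holds one file per reducer; they can be listed with the standard HDFS shell command:
$HADOOP_HOME/bin/hadoop fs -ls output_dir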
Output:
16/07/07 12:12:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully
16/07/07 12:12:52 INFO mapreduce.Job: Counters: 49
File System Counters
   FILE: Number of bytes read=467
   FILE: Number of bytes written=426777
   FILE: Number of large read operations=0
   FILE: Number of write operations=0
   HDFS: Number of bytes read=480
   HDFS: Number of bytes written=72
   HDFS: Number of read operations=12
   HDFS: Number of large read operations=0
   HDFS: Number of write operations=6
Job Counters
   Launched map tasks=1
   Launched reduce tasks=3
   Data-local map tasks=1
   Total time spent by all maps in occupied slots (ms)=8212
   Total time spent by all reduces in occupied slots (ms)=59858
   Total time spent by all map tasks (ms)=8212
   Total time spent by all reduce tasks (ms)=59858
   Total vcore-seconds taken by all map tasks=8212
   Total vcore-seconds taken by all reduce tasks=59858
   Total megabyte-seconds taken by all map tasks=8409088
   Total megabyte-seconds taken by all reduce tasks=61294592
Map-Reduce Framework
   Map input records=13
   Map output records=13
   Map output bytes=423
   Map output materialized bytes=467
   Input split bytes=119
   Combine input records=0
   Combine output records=0
   Reduce input groups=6
   Reduce shuffle bytes=467
   Reduce input records=13
   Reduce output records=6
   Spilled Records=26
   Shuffled Maps =3
   Failed Shuffles=0
   Merged Map outputs=3
   GC time elapsed (ms)=224
   CPU time spent (ms)=3690
   Physical memory (bytes) snapshot=553816064
   Virtual memory (bytes) snapshot=3441266688
   Total committed heap usage (bytes)=334102528
Shuffle Errors
   BAD_ID=0
   CONNECTION=0
   IO_ERROR=0
   WRONG_LENGTH=0
   WRONG_MAP=0
   WRONG_REDUCE=0
File Input Format Counters
   Bytes Read=361
File Output Format Counters
   Bytes Written=72
HDFS holds the generated part files. Use the commands below to see the output of each reducer.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
Output in part-00000:
Female   15000
Male     40000
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001
Output in part-00001:
Female   35000
Male     30000
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002
Output in part-00002:
Female   50000
Male     50000
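As a quick cross-check outside Hadoop, the following plain-Java sketch (not part of the job; the class name SalaryCheck and the inlined records are just for illustration) recomputes the per-gender maximum salary for each age bucket from the sample table above and should print the same values as the three part files.
import java.util.*;

public class SalaryCheck
{
   public static void main(String[] args)
   {
      // Sample records from the employee table (tab-separated, same layout as input.txt)
      String[] records = {
         "6001\taaaaa\t45\tMale\t50000",   "6002\tbbbbb\t40\tFemale\t50000",
         "6003\tccccc\t34\tMale\t30000",   "6004\tddddd\t30\tMale\t30000",
         "6005\teeeee\t20\tMale\t40000",   "6006\tfffff\t25\tFemale\t35000",
         "6007\tggggg\t20\tFemale\t15000", "6008\thhhhh\t19\tFemale\t15000",
         "6009\tiiiii\t22\tMale\t22000",   "6010\tjjjjj\t24\tMale\t25000",
         "6011\tkkkk\t25\tMale\t25000",    "6012\thhhh\t28\tMale\t20000",
         "6013\ttttt\t18\tFemale\t8000"
      };
      // Group by (age bucket, gender) and keep the maximum salary, mimicking
      // what the partitioner and reducer do together in the MapReduce job.
      Map<String, Integer> max = new TreeMap<>();
      for (String rec : records)
      {
         String[] f = rec.split("\t");
         int age = Integer.parseInt(f[2]);
         int salary = Integer.parseInt(f[4]);
         int bucket = (age <= 20) ? 0 : (age <= 30) ? 1 : 2;
         max.merge("part-0000" + bucket + "  " + f[3], salary, Math::max);
      }
      max.forEach((k, v) -> System.out.println(k + "\t" + v));
   }
}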