Implementation of MapReduce

First Program in MapReduce

The following table shows the number of visitors to the Intellipaat.com page. It lists the monthly visitor counts and the annual average (AVG) for each of five years.

YEAR JAN FEB MAR APR MAY JUN JULY AUG SEP OCT NOV DEC AVG
2008 23 23 2 43 24 25 26 26 26 25 26 26 25
2009 26 27 28 28 28 30 31 31 31 30 30 30 29
2010 31 32 32 32 33 34 35 36 36 34 34 34 34
2014 39 38 39 39 39 41 42 43 40 39 39 38 40
2016 38 39 39 39 39 41 41 41 00 40 40 39 45

We use the MapReduce framework to analyze this data, for example to find the years with the highest or lowest number of visitors. The program below reports the years whose annual average exceeds 30.

Input data: The data above is saved as Intellipaat_visitors.txt and used as the input.
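To make the record layout concrete, here is a minimal sketch, independent of Hadoop, of how one line of the input can be split into the year and the trailing AVG value. It assumes the fields in the file are separated by tabs (the table above is displayed with spaces), and the class and method names are hypothetical, chosen only for this illustration.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.StringTokenizer;

// Hypothetical helper for illustration; not part of the tutorial's program.
class RecordParserSketch {

    // Splits one tab-separated record and returns the pair (year, AVG).
    static Map.Entry<String, Integer> parse(String line) {
        StringTokenizer tokens = new StringTokenizer(line, "\t");
        String year = tokens.nextToken();   // first field: the year
        String last = null;
        while (tokens.hasMoreTokens()) {    // walk to the final field
            last = tokens.nextToken();
        }
        return new SimpleEntry<>(year, Integer.parseInt(last));
    }

    public static void main(String[] args) {
        String row = "2010\t31\t32\t32\t32\t33\t34\t35\t36\t36\t34\t34\t34\t34";
        Map.Entry<String, Integer> record = parse(row);
        // prints "2010 -> 34"
        System.out.println(record.getKey() + " -> " + record.getValue());
    }
}
```

Only the first and last fields are read, so the twelve monthly values in between are skipped without being parsed.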

Example program using the MapReduce framework

<em>package hadoop;</em>
<em>import java.util.*;</em>
<em>import java.io.IOException;</em>
<em>import org.apache.hadoop.fs.Path;</em>
<em>import org.apache.hadoop.conf.*;</em>
<em>import org.apache.hadoop.io.*;</em>
<em>import org.apache.hadoop.mapred.*;</em>
<em>import org.apache.hadoop.util.*;</em>
<em>public class Intellipaat_visitors</em>
<em>{</em>
<em> //Mapper class</em>
<em> public static class E_EMapper extends MapReduceBase implements</em>
<em> Mapper<LongWritable, /*Input key Type */</em>
<em> Text, /*Input value Type*/</em>
<em> Text, /*Output key Type*/</em>
<em> IntWritable> /*Output value Type*/</em>
<em> {</em>
<em> //Map function</em>
<em> public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException</em>
<em> {</em>
<em> String line = value.toString();</em>
<em> String lasttoken = null;</em>
<em> // Fields are assumed to be tab-separated</em>
<em> StringTokenizer s = new StringTokenizer(line,"\t");</em>
<em> String year = s.nextToken();</em>
<em> // Advance to the last field, which holds the annual average</em>
<em> while(s.hasMoreTokens()){</em>
<em> lasttoken=s.nextToken();</em>
<em> }</em>
<em> int avgvisitors = Integer.parseInt(lasttoken);</em>
<em> output.collect(new Text(year), new IntWritable(avgvisitors));</em>
<em> }</em>
<em> }</em>

<em>//Reducer class</em>
<em> public static class E_EReduce extends MapReduceBase implements</em>
<em> Reducer< Text, IntWritable, Text, IntWritable ></em>
<em> {</em>
<em> //Reduce function</em>
<em> public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException</em>
<em> {</em>
<em> int maxavg=30; // threshold: only averages above 30 are emitted</em>
<em> int val=Integer.MIN_VALUE;</em>
<em> while (values.hasNext())</em>
<em> {</em>
<em> if((val=values.next().get())>maxavg)</em>
<em> {</em>
<em> output.collect(key, new IntWritable(val));</em>
<em> }</em>
<em> }</em>
<em> }</em>
<em> }</em>
<em> //Main function</em>
<em> public static void main(String args[])throws Exception</em>
<em> {</em>
<em> JobConf conf = new JobConf(Intellipaat_visitors.class);</em>
<em> conf.setJobName("max_visitors");</em>
<em> conf.setOutputKeyClass(Text.class);</em>
<em> conf.setOutputValueClass(IntWritable.class);</em>
<em> conf.setMapperClass(E_EMapper.class);</em>
<em> conf.setCombinerClass(E_EReduce.class);</em>
<em> conf.setReducerClass(E_EReduce.class);</em>
<em> conf.setInputFormat(TextInputFormat.class);</em>
<em> conf.setOutputFormat(TextOutputFormat.class);</em>
<em> FileInputFormat.setInputPaths(conf, new Path(args[0]));</em>
<em> FileOutputFormat.setOutputPath(conf, new Path(args[1]));</em>
<em> JobClient.runJob(conf);</em>
<em> }</em>
<em>}</em>
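The driver registers E_EReduce as both the combiner and the reducer. That is workable here because the reduce step is a pure filter: running it once on the map side and again on the reduce side gives the same result as running it once. The following is a minimal sketch of that property in plain Java. It does not use the Hadoop API, the class and method names are hypothetical, and it applies the filter to the five AVG values as a single list purely for illustration (in the real job the values are grouped per year).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration; it does not use the Hadoop API.
class CombinerCheckSketch {

    static final int THRESHOLD = 30; // same cut-off as maxavg in E_EReduce

    // Mirrors the reduce logic: keep only the values above the threshold.
    static List<Integer> filter(List<Integer> values) {
        List<Integer> kept = new ArrayList<>();
        for (int v : values) {
            if (v > THRESHOLD) {
                kept.add(v);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Integer> averages = Arrays.asList(25, 29, 34, 40, 45);
        List<Integer> once = filter(averages);          // reducer only
        List<Integer> twice = filter(filter(averages)); // combiner, then reducer
        System.out.println(once);               // prints "[34, 40, 45]"
        System.out.println(once.equals(twice)); // prints "true"
    }
}
```

A reducer that computed an average, by contrast, could not be reused as a combiner in this way, because an average of partial averages is generally not the overall average.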

Save the above program as Intellipaat_visitors.java.
Create a new directory to store the compiled Java classes, using the command below.

<em>$ mkdir visitors</em>

Download the hadoop-core-1.2.1 jar, which is needed to compile the program, from the link below.
<em>http://mvnrepository.com/artifact/org.apache.hadoop/hadoop-core/1.2.1</em>

Compile Intellipaat_visitors.java and create a jar for the program.
<em>$ javac -classpath hadoop-core-1.2.1.jar -d visitors Intellipaat_visitors.java</em>
<em>$ jar -cvf visitors.jar -C visitors/ .</em>

Create an input directory in HDFS using the command below.
<em>$HADOOP_HOME/bin/hadoop fs -mkdir input_dir</em>

Copy the input file named Intellipaat_visitors.txt into the input directory in HDFS.
<em>$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/Intellipaat_visitors.txt input_dir</em>

Run the application, reading the input from input_dir and writing the results to output_dir.
<em>$HADOOP_HOME/bin/hadoop jar visitors.jar hadoop.Intellipaat_visitors input_dir output_dir</em>

<strong>Output</strong>
INFO mapreduce.Job: Job job_1414748220717_0002 completed successfully
14/10/31 06:02:52 INFO mapreduce.Job: Counters: 49

File System Counters
<em>   FILE: Number of bytes read=61</em>
<em>   FILE: Number of bytes written=279400</em>
<em>   FILE: Number of read operations=0</em>
<em>   FILE: Number of large read operations=0</em>
<em>   FILE: Number of write operations=0</em>
<em>   HDFS: Number of bytes read=546</em>
<em>   HDFS: Number of bytes written=40</em>
<em>   HDFS: Number of read operations=9</em>
<em>   HDFS: Number of large read operations=0</em>
<em>   HDFS: Number of write operations=2</em>

Job Counters
<em>   Launched map tasks=2</em>
<em>   Launched reduce tasks=1</em>
<em>   Data-local map tasks=2</em>
<em>   Total time spent by all maps in occupied slots (ms)=146137</em>
<em>   Total time spent by all reduces in occupied slots (ms)=441</em>
<em>   Total time spent by all map tasks (ms)=14613</em>
<em>   Total time spent by all reduce tasks (ms)=44120</em>
<em>   Total vcore-seconds taken by all map tasks=146137</em>
<em>   Total vcore-seconds taken by all reduce tasks=44120</em>
<em>   Total megabyte-seconds taken by all map tasks=149644288</em>
<em>   Total megabyte-seconds taken by all reduce tasks=45178880</em>


Map-Reduce Framework
<em>   Map input records=5</em>
<em>   Map output records=5</em>
<em>   Map output bytes=45</em>
<em>   Map output materialized bytes=67</em>
<em>   Input split bytes=208</em>
<em>   Combine input records=5</em>
<em>   Combine output records=5</em>
<em>   Reduce input groups=5</em>
<em>   Reduce shuffle bytes=6</em>
<em>   Reduce input records=5</em>
<em>   Reduce output records=5</em>
<em>   Spilled Records=10</em>
<em>   Shuffled Maps =2</em>
<em>   Failed Shuffles=0</em>
<em>   Merged Map outputs=2</em>
<em>   GC time elapsed (ms)=948</em>
<em>   CPU time spent (ms)=5160</em>
<em>   Physical memory (bytes) snapshot=47749120</em>
<em>   Virtual memory (bytes) snapshot=2899349504</em>
<em>   Total committed heap usage (bytes)=277684224</em>

File Output Format Counters
<em>   Bytes Written=40</em>

Use the command below to verify the result files in the output folder.
<em>$HADOOP_HOME/bin/hadoop fs -ls output_dir/</em>

The final output of the MapReduce job is:

2010 34
2014 40
2016 45
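The three rows above can be cross-checked without a cluster. The following is a minimal sketch that simulates the map, group-by-key, and reduce steps in plain Java over the five records from the table. It assumes tab-separated input and reuses the threshold of 30 from the reducer; the class and method names are hypothetical and no Hadoop classes are involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical local simulation; no Hadoop classes are involved.
class LocalJobSketch {

    static final int THRESHOLD = 30; // same cut-off as maxavg in the reducer

    // Runs map, group-by-key, and reduce over tab-separated records.
    static List<String> run(List<String> lines) {
        // Map and shuffle: emit (year, AVG) and group the values by year.
        // A TreeMap keeps the keys sorted, as reducer input keys are.
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String line : lines) {
            String[] fields = line.split("\t");
            int avg = Integer.parseInt(fields[fields.length - 1]);
            grouped.computeIfAbsent(fields[0], k -> new ArrayList<>()).add(avg);
        }
        // Reduce: for each year, keep the values above the threshold.
        List<String> output = new ArrayList<>();
        for (Map.Entry<String, List<Integer>> group : grouped.entrySet()) {
            for (int value : group.getValue()) {
                if (value > THRESHOLD) {
                    output.add(group.getKey() + "\t" + value);
                }
            }
        }
        return output;
    }

    public static void main(String[] args) {
        String[] table = {
            "2008 23 23 2 43 24 25 26 26 26 25 26 26 25",
            "2009 26 27 28 28 28 30 31 31 31 30 30 30 29",
            "2010 31 32 32 32 33 34 35 36 36 34 34 34 34",
            "2014 39 38 39 39 39 41 42 43 40 39 39 38 40",
            "2016 38 39 39 39 39 41 41 41 00 40 40 39 45"
        };
        List<String> lines = new ArrayList<>();
        for (String row : table) {
            lines.add(row.replace(' ', '\t')); // convert to the assumed tab-separated form
        }
        for (String out : run(lines)) {
            System.out.println(out); // one "year<TAB>average" line per qualifying year
        }
    }
}
```

Run as written, the sketch prints the years 2010, 2014, and 2016 with averages 34, 40, and 45, which matches the job output shown above.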

About the Author

Technical Research Analyst - Big Data Engineering

Abhijit is a Technical Research Analyst specialising in Big Data and Azure Data Engineering. He has 4+ years of experience in the Big data domain and provides consultancy services to several Fortune 500 companies. His expertise includes breaking down highly technical concepts into easy-to-understand content.