Intellipaat Back

Explore Courses Blog Tutorials Interview Questions
0 votes
2 views
in Big Data Hadoop & Spark by (11.4k points)

Can anyone explain the difference between reducebykey,groupbykey,aggregatebykey and combinebykey? I have read the documents regarding this , but couldn't understand the exact differences.

1 Answer

0 votes
by (32.3k points)
edited by

combineByKey can be used when you are combining elements but your return type differs from your input value type. Spark combineByKey is a transformation operation on PairRDD (i.e. RDD with key/value pair). It is a wider operation as it requires shuffle in the last stage.

// Creating PairRDD studentRDD with key value pairs

val studentRDD = sc.parallelize(Array(

    ("Joseph", "Maths", 83), ("Joseph", "Physics", 74), ("Joseph", "Chemistry", 91), 

    ("Joseph", "Biology", 82), ("Jimmy", "Maths", 69), ("Jimmy", "Physics", 62), 

    ("Jimmy", "Chemistry", 97), ("Jimmy", "Biology", 80), ("Tina", "Maths", 78), 

    ("Tina", "Physics", 73), ("Tina", "Chemistry", 68), ("Tina", "Biology", 87), 

    ("Thomas", "Maths", 87), ("Thomas", "Physics", 93), ("Thomas", "Chemistry", 91), 

    ("Thomas", "Biology", 74), ("Cory", "Maths", 56), ("Cory", "Physics", 65), 

    ("Cory", "Chemistry", 71), ("Cory", "Biology", 68), ("Jackeline", "Maths", 86), 

    ("Jackeline", "Physics", 62), ("Jackeline", "Chemistry", 75), ("Jackeline", "Biology", 83), 

    ("Juan", "Maths", 63), ("Juan", "Physics", 69), ("Juan", "Chemistry", 64), 

    ("Juan", "Biology", 60)), 3)

//Defining createCombiner, mergeValue and mergeCombiner functions

def createCombiner = (tuple: (String, Int)) => 

    (tuple._2.toDouble, 1)

    

def mergeValue = (accumulator: (Double, Int), element: (String, Int)) => 

    (accumulator._1 + element._2, accumulator._2 + 1)

def mergeCombiner = (accumulator1: (Double, Int), accumulator2: (Double, Int)) => 

    (accumulator1._1 + accumulator2._1, accumulator1._2 + accumulator2._2)

// use combineByKey for finding percentage

val combRDD = studentRDD.map(t => (t._1, (t._2, t._3)))

                        .combineByKey(createCombiner, mergeValue, mergeCombiner)

                        .map(e => (e._1, e._2._1/e._2._2))

//Check the Output

combRDD.collect foreach println

// Output

// (Tina,76.5)

// (Thomas,86.25)

// (Jackeline,76.5)

// (Joseph,82.5)

// (Juan,64.0)

// (Jimmy,77.0)

// (Cory,65.0)

AggregateByKey is same like combineByKey and there is slight difference in functioning and arguments. The aggregateByKey function is used to aggregate the values for each key and adds the potential to return a different value type.

The three parameters of aggregateByKey function,

zeroValue: As we are finding maximum marks out of all subjects we should use Double.MinValue (which is also known as an accumulator)

seqOp: Sequential operation is an operation of finding maximum marks (operation at each partition level data)

combOp: Combiner operation is an operation of finding maximum marks from two values (operation on aggregated data of all partitions)

reduceByKey 

Example: 

scala> val babyNamesCSV = sc.parallelize(List(("David", 6), ("Abby", 4), ("David", 5), ("Abby", 5)))

babyNamesCSV: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:12

scala> babyNamesCSV.reduceByKey((n,c) => n + c).collect

res0: Array[(String, Int)] = Array((Abby,9), (David,11))

scala> babyNamesCSV.aggregateByKey(0)((k,v) => v.toInt+k, (v,k) => k+v).collect

res1: Array[(String, Int)] = Array((Abby,9), (David,11))

reduceByKey: Spark RDD reduceByKey function merges the values for each key using an associative reduce function. Basically reduceByKey function works only for RDDs which contains key and value pairs kind of elements(i.e RDDs having tuple or Map as a data element). It is a transformation operation which means it is lazily evaluated.

groupByKey() is just to group your dataset based on a key. It will result in data shuffling when RDD is not already partitioned. 

 When a groupByKey is called on a RDD pair the data in the partitions are shuffled over the network to form a key and list of values.

val words = Array("one", "two", "two", "three", "three", "three")

val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))

val wordCountsWithReduce = wordPairsRDD

  .reduceByKey(_ + _)

  .collect()

val wordCountsWithGroup = wordPairsRDD

  .groupByKey()

  .map(t => (t._1, t._2.sum))

  .collect()

 The reduceByKey works much better on a large dataset as compared to. That's because Spark knows it can combine output with a common key on each partition before shuffling the data.

reduceBykey

aggregateByKey

groupByKey

Uses Combiner

Uses Combiner

Do not uses Combiner

Take one parameter as function – for seqOp and combOp

Take 2 parameters as functions – one for seqOp and other for combOp

No parameters as functions. Generally followed by map or flatMap

Implicit Combiner

Explicit Combiner

No combiner

Performance is high for aggregations

Performance is high for aggregations

Relatively slow for aggregations

...