2 views

I have a list of Tuples of type : (user id, name, count).

For example,

val x = sc.parallelize(List(
("a", "b", 1),
("a", "b", 1),
("c", "b", 1),
("a", "d", 1))
)

I'm attempting to reduce this collection to a type where each element name is counted.

So in above val x is converted to :

(a,ArrayBuffer((d,1), (b,2)))
(c,ArrayBuffer((b,1)))

Here is the code I am currently using :

val byKey = x.map({case (id,uri,count) => (id,uri)->count})

val grouped = byKey.groupByKey
val count = grouped.map{case ((id,uri),count) => ((id),(uri,count.sum))}
val grouped2: org.apache.spark.rdd.RDD[(String, Seq[(String, Int)])] = count.groupByKey

grouped2.foreach(println)

I'm attempting to use reduceByKey as it performs faster than groupByKey.

How can reduceByKey be implemented instead of above code to provide the same mapping ?

by (32.3k points)
edited by

val byKey = x.map({case (id,uri,count) => (id,uri)->count})

One way to use reduceByKey in this case can be done as:

val reducedByKey = byKey.reduceByKey(_ + _)

reducedByKey.collect.foreach(println)

((a,d),1)

((a,b),2)

((c,b),1)

PairRDDFunctions[K,V].reduceByKey will take an associative reduce function that can be applied to the to type V of the RDD[(K,V)]. In other words, you need a function f[V](e1:V, e2:V) : V . In this particular case with sum on Ints: (x:Int, y:Int) => x+y or  _ + _  in short underscore notation.

Enroll in the Scala course in Toronto to get professionally certified.

For the record: reduceByKey performs better than groupByKey because it attemps to apply the reduce function locally before the shuffle/reduce phase. Whereas groupByKey forces a shuffle of all elements before grouping.

by (33.1k points)

The origin data structure is RDD[(String, String, Int)], and reduceByKey can only be used if data structure is RDD[(K, V)].

For example:

val kv = x.map(e => e._1 -> e._2 -> e._3) // kv is RDD[((String, String), Int)]

val reduced = kv.reduceByKey(_ + _)       // reduced is RDD[((String, String), Int)]

val kv2 = reduced.map(e => e._1._1 -> (e._1._2 -> e._2)) // kv2 is RDD[(String, (String, Int))]

val grouped = kv2.groupByKey()            // grouped is RDD[(String, Iterable[(String, Int)])]

grouped.foreach(println)