I have two RDDs that I want to join, and they look like this:

val rdd1:RDD[(T,U)]
val rdd2:RDD[((T,W), V)]

It happens to be the case that the key values of rdd1 are unique and also that the tuple-key values of rdd2 are unique. I'd like to join the two data sets so that I get the following rdd:

val rdd_joined:RDD[((T,W), (U,V))]

What's the most efficient way to achieve this? Here are a few ideas I've thought of.

Option 1:

val m = rdd1.collectAsMap
val rdd_joined = rdd2.flatMap { case ((t, w), v) => m.get(t).map(u => ((t, w), (u, v))) }
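
Option 1 amounts to a map-side join. A minimal sketch of that idea using a broadcast variable is shown here, assuming rdd1 is small enough to fit in memory on the driver and on every executor. The object name BroadcastJoinSketch, the helper broadcastJoin, and the local SparkContext settings are hypothetical and only for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

object BroadcastJoinSketch {
  // Hypothetical helper: ships rdd1 to every executor as a read-only map,
  // then joins rdd2 against it without shuffling rdd2.
  // Assumption: rdd1 fits in driver and executor memory.
  def broadcastJoin[T: ClassTag, U: ClassTag, W: ClassTag, V: ClassTag](
      sc: SparkContext,
      rdd1: RDD[(T, U)],
      rdd2: RDD[((T, W), V)]): RDD[((T, W), (U, V))] = {
    val lookup = sc.broadcast(rdd1.collectAsMap())
    rdd2.flatMap { case ((t, w), v) =>
      // Rows of rdd2 whose t has no match in rdd1 are dropped (inner join).
      lookup.value.get(t).iterator.map(u => ((t, w), (u, v)))
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("broadcast-join-sketch")
    val sc = new SparkContext(conf)
    try {
      val rdd1 = sc.parallelize(Seq((1, "X"), (2, "Y"), (3, "Z")))
      val rdd2 = sc.parallelize(Seq(((1, "M"), 111), ((2, "Y"), 222), ((4, "Q"), 444)))
      broadcastJoin(sc, rdd1, rdd2).collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

Compared with capturing the collected map directly in the closure, a broadcast variable is sent to each executor once rather than with every task.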

Option 2:

val distinct_w = rdd2.map({case ((t,w), _) => w}).distinct
val rdd_joined = rdd1.cartesian(distinct_w).map({case ((t,u), w) => ((t,w), u)}).join(rdd2)


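One way to join your RDDs is to create a custom partitioner and then use zipPartitions. The partitioner places each rdd2 record according to the first element of its tuple key only, so records with the same t end up in the same partition index in both RDDs and can be joined locally. I suggest the following approach: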

import org.apache.spark.HashPartitioner

// Partitions rdd2 by the first element of its tuple key so that it lines up with rdd1.
class RDD2Partitioner(partitions: Int) extends HashPartitioner(partitions) {
  override def getPartition(key: Any): Int = key match {
    case (t, _) => super.getPartition(t)   // matching on the tuple shape avoids a type-erasure warning
    case _      => super.getPartition(key)
  }
}

val numSplits = 8

val rdd1 = sc.parallelize(Seq((1, "X"), (2, "Y"), (3, "Z")))
  .partitionBy(new HashPartitioner(numSplits))

val rdd2 = sc.parallelize(Seq(((1, "M"), 111), ((1, "MM"), 111), ((1, "NN"), 123), ((2, "Y"), 222), ((3, "X"), 333)))
  .partitionBy(new RDD2Partitioner(numSplits))

// Partition i of both RDDs holds the same values of t, so each pair of partitions is joined locally.
val result = rdd2.zipPartitions(rdd1)(
  (iter2, iter1) => {
    val m = iter1.toMap
    for {
      ((t, w), v) <- iter2
      if m.contains(t)                     // drop rdd2 rows with no match in rdd1
    } yield ((t, w), (m(t), v))            // (U, V) order, as requested in the question
  }
).partitionBy(new HashPartitioner(numSplits))

result.glom.collect
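
For comparison, a plainer alternative is to re-key rdd2 by t alone, use the built-in join, and then restore the composite key. The following is a minimal sketch under the assumption of a local SparkContext and sample data similar to the above; the names RekeyJoinSketch and joinByFirstKey are hypothetical. Unless the inputs are already co-partitioned, this version shuffles data, so it is not necessarily faster than the zipPartitions approach.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

object RekeyJoinSketch {
  // Hypothetical helper: joins rdd1 (keyed by T) with rdd2 (keyed by (T, W)).
  def joinByFirstKey[T: ClassTag, U: ClassTag, W: ClassTag, V: ClassTag](
      rdd1: RDD[(T, U)],
      rdd2: RDD[((T, W), V)]): RDD[((T, W), (U, V))] =
    rdd2
      .map { case ((t, w), v) => (t, (w, v)) }            // re-key by t alone
      .join(rdd1)                                          // yields (t, ((w, v), u))
      .map { case (t, ((w, v), u)) => ((t, w), (u, v)) }   // restore the composite key

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("rekey-join-sketch")
    val sc = new SparkContext(conf)
    try {
      val rdd1 = sc.parallelize(Seq((1, "X"), (2, "Y"), (3, "Z")))
      val rdd2 = sc.parallelize(Seq(((1, "M"), 111), ((1, "MM"), 111), ((2, "Y"), 222)))
      joinByFirstKey(rdd1, rdd2).collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

Because the join is an inner join, rdd2 records whose t is absent from rdd1 do not appear in the output, which matches the filtering done in the zipPartitions version.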