I have two RDDs that I want to join, and they look like this:

val rdd1:RDD[(T,U)]
val rdd2:RDD[((T,W), V)]


The keys of rdd1 are unique, and so are the tuple keys of rdd2. I'd like to join the two datasets on T so that I get the following RDD:

val rdd_joined:RDD[((T,W), (U,V))]


What's the most efficient way to achieve this? Here are a few ideas I've thought of.

Option 1:

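val m = rdd1.collectAsMap
// Look up u by t and emit the target shape; records whose t is missing from rdd1 are dropped
val rdd_joined = rdd2.flatMap({case ((t,w), v) => m.get(t).map(u => ((t,w), (u,v))).toList})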
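
A variant of Option 1 ships the collected map to the executors as a broadcast variable instead of capturing it in the closure. The following is a minimal sketch, assuming rdd1 is small enough to fit in driver and executor memory; the object name BroadcastJoinSketch, the variable lookup, and the sample data are illustrative and not part of the question.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastJoinSketch {
  // Joins a small (t, u) RDD against a larger ((t, w), v) RDD without shuffling the large side.
  def run(sc: SparkContext): Array[((Int, String), (String, Int))] = {
    val rdd1 = sc.parallelize(Seq((1, "X"), (2, "Y"), (3, "Z")))
    val rdd2 = sc.parallelize(Seq(((1, "M"), 111), ((2, "Y"), 222), ((4, "Q"), 444)))

    // Assumption: rdd1 fits in memory on the driver and on every executor.
    val lookup = sc.broadcast(rdd1.collectAsMap())

    rdd2.flatMap { case ((t, w), v) =>
      // Emit a record only when t has a match in rdd1.
      lookup.value.get(t).map(u => ((t, w), (u, v))).iterator
    }.collect()
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("broadcast-join-sketch")
    val sc = new SparkContext(conf)
    try run(sc).foreach(println) finally sc.stop()
  }
}
```

This avoids shuffling rdd2 entirely, but it only pays off while the map stays small; for a large rdd1 a shuffle-based join is the safer choice.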

 

Option 2:

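val distinct_w = rdd2.map({case ((t,w), v) => w}).distinct
// Re-key the cartesian product by (t,w) so that its keys match those of rdd2
val rdd_joined = rdd1.cartesian(distinct_w).map({case ((t,u), w) => ((t,w), u)}).join(rdd2)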
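
For comparison, a plain shuffle join is also possible: re-key rdd2 by t, use the standard join, and restore the (t, w) key afterwards. This is a minimal sketch of that baseline, not one of the options from the question; the names RekeyJoinSketch, joinOnFirstKey, and byT are hypothetical.

```scala
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

object RekeyJoinSketch {
  // T, U, V, W follow the type parameters used in the question.
  // The pair-RDD join needs a ClassTag for the key type T.
  def joinOnFirstKey[T: ClassTag, U, V, W](
      rdd1: RDD[(T, U)],
      rdd2: RDD[((T, W), V)]): RDD[((T, W), (U, V))] = {
    // Move w into the value so both RDDs are keyed by t alone.
    val byT: RDD[(T, (W, V))] = rdd2.map { case ((t, w), v) => (t, (w, v)) }

    // Inner join on t, then rebuild the composite (t, w) key.
    byT.join(rdd1).map { case (t, ((w, v), u)) => ((t, w), (u, v)) }
  }
}
```

Unlike the cartesian approach, this never materializes (t, w) combinations that do not occur in rdd2, so its cost is bounded by one shuffle of each input.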

1 Answer


One way to join your RDDs is to create a custom partitioner and then use zipPartitions. I suggest the following approach:

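import org.apache.spark.HashPartitioner

// Partitions rdd2's (t, w) keys by t alone, so each record lands in the same
// partition index as the rdd1 record with key t.
class RDD2Partitioner(partitions: Int) extends HashPartitioner(partitions) {
  override def getPartition(key: Any): Int = key match {
    // Match on the tuple shape; the type arguments of Tuple2[Int, String] are erased at runtime
    case (t, _) => super.getPartition(t)
    case _ => super.getPartition(key)
  }
}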

val numSplits = 8

val rdd1 = sc.parallelize(Seq((1, "X"), (2, "Y"), (3, "Z"))).partitionBy(new HashPartitioner(numSplits))

val rdd2 = sc.parallelize(Seq(((1, "M"), 111), ((1, "MM"), 111), ((1, "NN"), 123), ((2, "Y"), 222), ((3, "X"), 333))).partitionBy(new RDD2Partitioner(numSplits))

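// Both RDDs have numSplits partitions and equal t values share a partition index,
// so each rdd2 partition can be joined against the matching rdd1 partition locally.
val result = rdd2.zipPartitions(rdd1)(
  (iter2, iter1) => {
    val m = iter1.toMap  // t -> u for this partition only
    for {
      ((t, w), v) <- iter2
      if m.contains(t)  // drop records whose t has no match in rdd1
    } yield ((t, w), (m(t), v))  // (u, v) order, as requested in the question
  }
).partitionBy(new HashPartitioner(numSplits))  // repartitions by the full (t, w) key, which incurs a shuffle

result.glom.collect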

...
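
To see why zipPartitions can pair the two RDDs partition by partition, it helps to model the partitioning rule without Spark. The sketch below assumes HashPartitioner maps a key to its hashCode modulo the partition count, adjusted to be non-negative, with null keys going to partition 0; the names CoPartitionSketch, hashPartition, and rdd2Partition are hypothetical helpers, not Spark APIs.

```scala
object CoPartitionSketch {
  // Plain-Scala model of the HashPartitioner rule (an assumption stated above).
  def hashPartition(key: Any, numPartitions: Int): Int =
    if (key == null) 0
    else {
      val raw = key.hashCode % numPartitions
      if (raw < 0) raw + numPartitions else raw
    }

  // Models RDD2Partitioner: tuple keys are routed by their first element only.
  def rdd2Partition(key: Any, numPartitions: Int): Int = key match {
    case (t, _) => hashPartition(t, numPartitions)
    case other  => hashPartition(other, numPartitions)
  }

  def main(args: Array[String]): Unit = {
    val numSplits = 8
    val rdd2Keys = Seq((1, "M"), (1, "MM"), (1, "NN"), (2, "Y"), (3, "X"))
    for ((t, w) <- rdd2Keys) {
      val p2 = rdd2Partition((t, w), numSplits) // where the rdd2 record goes
      val p1 = hashPartition(t, numSplits)      // where the rdd1 record for t goes
      println(s"key ($t, $w): rdd2 partition $p2, rdd1 partition $p1")
    }
  }
}
```

Because both indices are computed from t alone, every rdd2 record sits in the same partition index as its rdd1 counterpart, which is the property the partition-local map lookup relies on.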