import org.apache.spark.HashPartitioner
/** Partitioner that routes a composite `(key, tag)` tuple key to the same
  * partition as the bare `key` would hash to, so an RDD keyed by
  * `(Int, String)` lines up partition-for-partition with an RDD keyed by
  * `Int` (a prerequisite for the `zipPartitions` join below).
  *
  * @param partitions number of partitions (forwarded to HashPartitioner)
  */
class RDD2Partitioner(partitions: Int) extends HashPartitioner(partitions) {
  // NOTE: the original matched `case k: Tuple2[Int, String]` — generic type
  // arguments are erased at runtime, so that pattern was unchecked (it matched
  // ANY Tuple2) and `k._1` carried a hidden asInstanceOf[Int] that could throw
  // ClassCastException for tuples with a non-Int first element. Deconstructing
  // the tuple routes by the first element with no unchecked cast.
  override def getPartition(key: Any): Int = key match {
    case (first, _) => super.getPartition(first)
    case _          => super.getPartition(key)
  }
}
// Number of partitions shared by both RDDs — zipPartitions requires that
// both sides have exactly the same number of partitions.
val numSplits = 8

// rdd1: plain Int keys, hash-partitioned into numSplits partitions.
val rdd1 = sc.parallelize(Seq((1, "X"), (2, "Y"), (3, "Z"))).partitionBy(new HashPartitioner(numSplits))

// rdd2: composite (Int, String) keys, partitioned by the Int component via
// RDD2Partitioner so each partition is co-located with the matching rdd1
// partition (entries with the same Int key land in the same partition index).
val rdd2 = sc.parallelize(Seq(((1, "M"), 111), ((1, "MM"), 111), ((1, "NN"), 123), ((2, "Y"), 222), ((3, "X"), 333))).partitionBy(new RDD2Partitioner(numSplits))

// Partition-local inner join: for each rdd2 entry whose Int key appears in
// the co-located rdd1 partition, pair rdd2's value with rdd1's value.
val result = rdd2.zipPartitions(rdd1)(
  (iter2, iter1) => {
    // Materialize the (small) rdd1 partition once per task for O(1) lookups.
    val m = iter1.toMap
    iter2.flatMap { case ((t, w), u) =>
      // Single lookup; replaces the original `if m.contains(t)` guard followed
      // by the unsafe `m.get(t).get` (double lookup + Option.get anti-pattern).
      m.get(t).iterator.map(v => ((t, w), (u, v)))
    }
  }
).partitionBy(new HashPartitioner(numSplits))

result.glom.collect