in Big Data Hadoop & Spark by (11.4k points)

I'd like to select a range of elements in a Spark RDD. For example, I have an RDD with a hundred elements, and I need to select elements from 60 to 80. How do I do that?

I see that RDD has a take(i: int) method, which returns the first i elements. But there is no corresponding method to take the last i elements, or i elements from the middle starting at a certain index.

1 Answer

by (32.3k points)

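See SPARK-911. The code below is efficient when the RDD is sorted by key and you need to query it more than once: it uses the RangePartitioner created by sortByKey to find the partitions that can contain keys in the requested range, and scans only those partitions.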

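import org.apache.spark.RangePartitioner

// Sort by key and cache, so the RDD carries a RangePartitioner and can be queried repeatedly.
val sorted = sc.parallelize((1 to 100).map(x => (x, x))).sortByKey().cache()

// sortByKey partitions its output with a RangePartitioner; retrieve it.
val p: RangePartitioner[Int, Int] = sorted.partitioner.get.asInstanceOf[RangePartitioner[Int, Int]]

// Inclusive key bounds of the range to select.
val (lower, upper) = (10, 20)

// Indices of the partitions that can contain keys in [lower, upper].
val range = p.getPartition(lower) to p.getPartition(upper)
println(range)

// Filter the partitions in `range`; return an empty iterator for every other partition.
val rangeFilter = (i: Int, iter: Iterator[(Int, Int)]) => {
  if (range.contains(i))
    for ((k, v) <- iter if k >= lower && k <= upper) yield (k, v)
  else
    Iterator.empty
}

for ((k, v) <- sorted.mapPartitionsWithIndex(rangeFilter, preservesPartitioning = true).collect())
  println(s"$k, $v")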
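The code above selects by key. The question asks about positions (elements 60 to 80), so here is a minimal sketch of two related alternatives, assuming a Spark version that provides `filterByRange` on sorted pair RDDs. The object name `RangeSelect` and the helpers `sliceByPosition` and `sliceByKey` are hypothetical names chosen for illustration; `zipWithIndex` and `filterByRange` are existing RDD methods.

```scala
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

// Hypothetical helper object, not part of Spark.
object RangeSelect {

  // Keep the elements whose zero-based position lies in [from, until).
  // zipWithIndex pairs every element with its position in partition order.
  def sliceByPosition[T: ClassTag](rdd: RDD[T], from: Long, until: Long): RDD[T] =
    rdd.zipWithIndex()
      .filter { case (_, idx) => idx >= from && idx < until }
      .map { case (elem, _) => elem }

  // Keep the pairs whose key lies in the inclusive range [lower, upper].
  // On an RDD partitioned by a RangePartitioner (for example after sortByKey),
  // filterByRange only scans the partitions that may contain matching keys.
  def sliceByKey(sorted: RDD[(Int, Int)], lower: Int, upper: Int): RDD[(Int, Int)] =
    sorted.filterByRange(lower, upper)
}
```

In a spark-shell session this could be used as `RangeSelect.sliceByPosition(sc.parallelize(1 to 100), 60, 80).collect()`. Note that `zipWithIndex` has to run a Spark job of its own when the RDD has more than one partition, and that the positional version reads every partition, so for repeated queries on a sorted RDD the key-based version is likely the cheaper one.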

...