
When a resilient distributed dataset (RDD) is created from a text file or collection (or from another RDD), do we need to call "cache" or "persist" explicitly to store the RDD data into memory? Or is the RDD data stored in a distributed way in the memory by default?

val textFile = sc.textFile("/user/emp.txt")


As per my understanding, after the above step, textFile is an RDD and is available in the memory of all or some of the nodes.

If so, why do we need to call "cache" or "persist" on the textFile RDD?

1 Answer


Spark evaluates lazily: nothing happens until a result is required. To answer the question quickly: after val textFile = sc.textFile("/user/emp.txt") is issued, nothing happens to the data. Only a HadoopRDD is constructed, with the file as its source.

Let's say we transform that data a bit:

val wordsRDD = textFile.flatMap(line => line.split("\\W"))

Again, nothing happens to the data. There is now a new RDD, wordsRDD, that holds a reference to textFile and a function to be applied when needed.

Only when an action such as wordsRDD.count is called on an RDD is the chain of RDDs, called the lineage, executed. That is, the data, broken down into partitions, is loaded by the Spark cluster's executors, the flatMap function is applied, and the result is computed.
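To see the laziness described above for yourself, here is a minimal sketch. It assumes a local master and a small in-memory collection in place of /user/emp.txt; the object name, the run helper and the linesSeen accumulator are made up for illustration and are not part of the original question.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LazyEvaluationSketch {
  // Returns (lines processed before the action, word count, lines processed after the action).
  def run(sc: SparkContext): (Long, Long, Long) = {
    // Counts how many input lines the flatMap function has actually processed.
    val linesSeen = sc.longAccumulator("linesSeen")

    // Assumption: an in-memory collection stands in for the text file.
    val textFile = sc.parallelize(Seq("hello spark", "hello lazy world"))
    val wordsRDD = textFile.flatMap { line =>
      linesSeen.add(1)
      line.split("\\W")
    }

    // Only the lineage has been recorded at this point; no line has been processed.
    val before: Long = linesSeen.value

    // The action runs the whole chain: load partitions, apply flatMap, count.
    val words: Long = wordsRDD.count()
    val after: Long = linesSeen.value

    (before, words, after)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: local mode with two threads, only for experimenting.
    val conf = new SparkConf().setAppName("lazy-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val (before, words, after) = run(sc)
      println(s"lines processed before action: $before")
      println(s"word count: $words, lines processed after action: $after")
    } finally {
      sc.stop()
    }
  }
}
```

The "before" value should still be zero, because defining textFile and wordsRDD only records the lineage. Accumulator updates made inside transformations can be applied more than once if a task is retried, so treat the counter as a debugging aid rather than an exact measure.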

On a linear lineage, like the one in this example, cache() is not needed. The data will be loaded to the executors, all the transformations will be applied and finally the count will be computed, all in memory - if the data fits in memory.

cache is useful when the lineage of the RDD branches out. Let's say you want to count the positive and the negative words among the words of the previous example.

You could do it like this:

val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count()

val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count()

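Here, each branch triggers a reload of the data and a rerun of the flatMap. Adding an explicit cache statement ensures that the work already done is preserved and reused. Note that cache itself is lazy too: it only marks the RDD, and the data is stored when the first action runs. The job will look like this: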

val textFile = sc.textFile("/user/emp.txt")

val wordsRDD = textFile.flatMap(line => line.split("\\W"))

wordsRDD.cache()

val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count()

val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count()
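A rough way to check the difference is to count how often the flatMap function runs with and without cache. The sketch below assumes local mode and an in-memory collection; isPositive and isNegative are hypothetical stand-ins (the job above does not define them), and splitCalls is a helper invented for this example.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CacheBranchSketch {
  // Hypothetical stand-ins for the isPositive / isNegative helpers used in the job above.
  def isPositive(word: String): Boolean = Set("good", "great").contains(word)
  def isNegative(word: String): Boolean = Set("bad", "awful").contains(word)

  // Runs both branches and returns how many times the flatMap function was invoked.
  def splitCalls(sc: SparkContext, useCache: Boolean): Long = {
    val calls = sc.longAccumulator("splitCalls")

    // Assumption: an in-memory collection stands in for /user/emp.txt.
    val textFile = sc.parallelize(Seq("good bad great", "awful good"))
    val wordsRDD = textFile.flatMap { line =>
      calls.add(1)
      line.split("\\W")
    }

    // cache() only marks the RDD; the first action below materializes it.
    if (useCache) wordsRDD.cache()

    val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count()
    val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count()
    println(s"cache=$useCache positive=$positiveWordsCount negative=$negativeWordsCount")

    // Release the cached blocks once both branches are done (a no-op if nothing was cached).
    wordsRDD.unpersist()
    calls.value
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("cache-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      println(s"flatMap calls without cache: ${splitCalls(sc, useCache = false)}")
      println(s"flatMap calls with cache:    ${splitCalls(sc, useCache = true)}")
    } finally {
      sc.stop()
    }
  }
}
```

Without cache, each count walks the full lineage, so every line is split once per action. With cache, the second count should read the words from memory instead, provided the cached partitions fit and are not evicted.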

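For that reason, cache is sometimes said to 'break the lineage': it creates a point that later processing can reuse instead of recomputing everything from the source. Spark still keeps the lineage, so a lost cached partition can be recomputed.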

Rule of thumb: Use cache when the lineage of your RDD branches out or when an RDD is used multiple times, such as in a loop.
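For the loop case, and since the question also asks about persist, here is a small sketch. It assumes local mode and made-up input; countsByLength is a hypothetical helper. For RDDs, cache() is shorthand for persist(StorageLevel.MEMORY_ONLY), while persist lets you pick another storage level.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object PersistLoopSketch {
  // Counts the words of each requested length, reusing one persisted RDD across iterations.
  def countsByLength(sc: SparkContext, lengths: Seq[Int]): Map[Int, Long] = {
    // Assumption: an in-memory collection stands in for a real input file.
    val wordsRDD = sc
      .parallelize(Seq("to be or not to be", "that is the question"))
      .flatMap(line => line.split("\\W"))

    // MEMORY_AND_DISK spills partitions to disk when they do not fit in memory.
    wordsRDD.persist(StorageLevel.MEMORY_AND_DISK)
    try {
      // Each iteration runs its own action; after the first one the words come from storage.
      lengths.map(n => n -> wordsRDD.filter(_.length == n).count()).toMap
    } finally {
      // Free the stored partitions once the loop no longer needs them.
      wordsRDD.unpersist()
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("persist-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      println(countsByLength(sc, Seq(2, 3, 4)))
    } finally {
      sc.stop()
    }
  }
}
```

Calling unpersist when the RDD is no longer needed hands the storage back to Spark; otherwise the cached blocks stay around until they are evicted or the application ends.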
