I'm trying to understand how Spark's cache works.

Here is my naive understanding; please let me know if I'm missing something:

val rdd1 = sc.textFile("some data")
rdd1.cache() //marks rdd1 as cached
val rdd2 = rdd1.filter(...)
val rdd3 = rdd1.map(...)
rdd2.saveAsTextFile("...")
rdd3.saveAsTextFile("...")


In the above, rdd1 will be loaded from disk (e.g. HDFS) only once (when rdd2 is saved, I assume) and then read from cache (assuming there is enough RAM) when rdd3 is saved.

Now here is my question. Let's say I want to cache rdd2 and rdd3 as they will both be used later on, but I don't need rdd1 after creating them.

Basically there is duplication, isn't there? Once rdd2 and rdd3 are calculated, I don't need rdd1 anymore, so I should probably unpersist it, right? The question is when.

Will this work? (Option A)

val rdd1 = sc.textFile("some data")
rdd1.cache()   // marks rdd as cached
val rdd2 = rdd1.filter(...)
val rdd3 = rdd1.map(...)
rdd2.cache()
rdd3.cache()
rdd1.unpersist()


Does Spark add the unpersist call to the DAG, or is it done immediately? If it's done immediately, then rdd1 will basically not be cached by the time rdd2 and rdd3 are computed, right?

Should I do it this way instead (Option B)?

val rdd1 = sc.textFile("some data")
rdd1.cache()   // marks rdd as cached
val rdd2 = rdd1.filter(...)
val rdd3 = rdd1.map(...)

rdd2.cache()
rdd3.cache()

rdd2.saveAsTextFile("...")
rdd3.saveAsTextFile("...")

rdd1.unpersist()


So the question is this: Is Option A good enough? i.e. will rdd1 still load the file only once? Or do I need to go with Option B?

1 Answer

It would seem that Option B is required. The reason lies in when persist/cache and unpersist take effect. RDD transformations only build up a DAG description and execute nothing, so in Option A, by the time you call unpersist, there is still only a job description: nothing has run and nothing has been cached.

This matters because a persist or cache call only adds the RDD to a map of RDDs that are marked to be persisted during job execution; the data itself is stored when a job actually computes it. unpersist, by contrast, is not deferred: it directly tells the block manager to evict the RDD's blocks from storage and removes the RDD from the map of persistent RDDs.
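The bookkeeping described above can be observed from the driver. The following is a minimal sketch, not taken from the original answer: it assumes Spark is on the classpath, uses a local session, and substitutes sc.parallelize for sc.textFile so it needs no input file. The object and method names are made up for illustration; getPersistentRDDs and getStorageLevel are existing SparkContext and RDD methods.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

// Hypothetical demo object; the name is illustrative only.
object PersistBookkeeping {

  /** Returns (registered after cache, level after cache,
    *          registered after unpersist, level after unpersist). */
  def observe(sc: SparkContext): (Boolean, StorageLevel, Boolean, StorageLevel) = {
    // Stand-in for sc.textFile("some data") (assumption: any RDD behaves the same here).
    val rdd1 = sc.parallelize(1 to 1000)

    rdd1.cache()
    // No action has run yet, so nothing is stored; the RDD is only registered.
    val registered = sc.getPersistentRDDs.contains(rdd1.id)
    val levelAfterCache = rdd1.getStorageLevel

    rdd1.unpersist()
    // unpersist is not queued in the DAG: the registration is already gone.
    val stillRegistered = sc.getPersistentRDDs.contains(rdd1.id)
    val levelAfterUnpersist = rdd1.getStorageLevel

    (registered, levelAfterCache, stillRegistered, levelAfterUnpersist)
  }

  def main(args: Array[String]): Unit = {
    // Local master chosen purely for the sketch.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("persist-bookkeeping")
      .getOrCreate()
    println(observe(spark.sparkContext))
    spark.stop()
  }
}
```

After cache() the RDD should show up as registered with storage level MEMORY_ONLY even though no job has run, and after unpersist() it should be unregistered with storage level NONE, again without any job having run.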

So you need to call unpersist only after an action has actually run and the block manager has stored the RDD's blocks.
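One way to check the difference between the two options is to count how often the source records are computed. The sketch below is an illustration under assumptions, not the original poster's code: it replaces sc.textFile with sc.parallelize, replaces saveAsTextFile with count() so nothing is written to disk, and uses a long accumulator as a rough counter (accumulator updates made inside transformations can be applied more than once if tasks are retried). The object, method, and parameter names are hypothetical.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

// Hypothetical demo object; the name is illustrative only.
object UnpersistTiming {

  /** Runs both actions and returns how many source records were computed in total. */
  def sourceReads(sc: SparkContext, unpersistBeforeActions: Boolean): Long = {
    val reads = sc.longAccumulator("sourceReads")

    // Stand-in for sc.textFile("some data"): every computed record bumps the counter.
    val rdd1 = sc.parallelize(1 to 100, 4).map { x => reads.add(1L); x }
    rdd1.cache()

    val rdd2 = rdd1.filter(_ % 2 == 0).cache()
    val rdd3 = rdd1.map(_ * 2).cache()

    // Option A: unpersist before any action has run.
    if (unpersistBeforeActions) rdd1.unpersist()

    // Stand-ins for the two saveAsTextFile calls.
    rdd2.count()
    rdd3.count()

    // Option B: unpersist only after both actions have used the cached rdd1.
    if (!unpersistBeforeActions) rdd1.unpersist()

    // Clean up so repeated calls start from an empty cache.
    rdd2.unpersist()
    rdd3.unpersist()
    reads.sum
  }

  def main(args: Array[String]): Unit = {
    // Local master chosen purely for the sketch.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("unpersist-timing")
      .getOrCreate()
    val sc = spark.sparkContext
    println(s"Option A source reads: ${sourceReads(sc, unpersistBeforeActions = true)}")
    println(s"Option B source reads: ${sourceReads(sc, unpersistBeforeActions = false)}")
    spark.stop()
  }
}
```

On a local run without task failures and with enough memory for the cache, Option A should report twice as many source reads as Option B, since the early unpersist means rdd1 is recomputed for each action.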
