in Big Data Hadoop & Spark by (11.4k points)

My cluster: 1 master, 11 slaves; each node has 6 GB of memory.

My settings:

spark.executor.memory=4g, -Dspark.akka.frameSize=512


Here is the problem:

First, I read some data (2.19 GB) from HDFS into an RDD:

val imageBundleRDD = sc.newAPIHadoopFile(...)


Second, I do something with this RDD:

val res = imageBundleRDD.map(data => {
  val desPoints = threeDReconstruction(data._2, bg)
  (data._1, desPoints)
})


Last, output to HDFS:

res.saveAsNewAPIHadoopFile(...)


When I run my program it shows:

.....
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:24 as TID 33 on executor 9: Salve7.Hadoop (NODE_LOCAL)
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Serialized task 1.0:24 as 30618515 bytes in 210 ms
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:36 as TID 34 on executor 2: Salve11.Hadoop (NODE_LOCAL)
14/01/15 21:42:28 INFO cluster.ClusterTaskSetManager: Serialized task 1.0:36 as 30618515 bytes in 449 ms
14/01/15 21:42:28 INFO cluster.ClusterTaskSetManager: Starting task 1.0:32 as TID 35 on executor 7: Salve4.Hadoop (NODE_LOCAL)
Uncaught error from thread [spark-akka.actor.default-dispatcher-3] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[spark]
java.lang.OutOfMemoryError: Java heap space


Are there too many tasks?

1 Answer

0 votes
by (32.3k points)
edited by

A few suggestions for you:

  • If your nodes are configured with a maximum of 6g for Spark, then use spark.executor.memory=6g. Check in the Spark UI that you are actually using as much memory as possible (it shows how much memory each executor is using). Also try using more partitions: you should have 2-4 per CPU core. In my experience, increasing the number of partitions is often the easiest way to make a program more stable and faster. For huge amounts of data you may need far more than 4 per CPU; in some cases I have even used 8000 partitions. (See the configuration and repartitioning sketch after this list.)

  • Decrease the fraction of memory reserved for caching, using spark.storage.memoryFraction. If you don't use persist() or cache() in your code, this might as well be 0. By default it is 0.6, which means you only get 0.4 * 4g of memory for your heap. In my experience, reducing this fraction often makes OOMs go away.

As of Spark 1.6, you apparently no longer have to tune these values; Spark determines them automatically.

  • The same applies to the shuffle memory fraction (spark.shuffle.memoryFraction). If your job doesn't need much shuffle memory, set it to a lower value; this might cause your shuffles to spill to disk, which can have a catastrophic impact on speed. When it is a shuffle operation that is OOMing, you may need to do the opposite, i.e. set it to something large like 0.8, or make sure you allow your shuffles to spill to disk (the default since 1.0.0).

  • Be aware of memory leaks caused by accidentally closing over objects you don't need in your lambdas. The way to diagnose this is to look for the "serialized task ... as XXX bytes" lines in the logs; if XXX is more than a few KB, or more than a MB, you may have such a leak. If you really do need large objects on the executors, use broadcast variables. (See the broadcast sketch after this list.)

  • When you cache large RDDs and can sacrifice some access time, consider caching them in serialized form. (See the serialized-caching sketch after this list.)

  • Avoid Strings and heavily nested structures such as Maps and nested case classes. If possible, use primitive types and index all non-primitives, especially if you expect a lot of duplicates. Prefer WrappedArray over nested structures whenever possible, or even roll your own serialization.
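
To make the first three points concrete, here is a minimal sketch of how such settings could be applied when building the context, with the data repartitioned afterwards. The fraction values, the partition count, and the app name are illustrative placeholders, not tuned values:

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative values only -- tune them for the actual cluster and job.
val conf = new SparkConf()
  .setAppName("ThreeDReconstruction")          // hypothetical app name
  .set("spark.executor.memory", "6g")          // match the 6 GB nodes
  .set("spark.storage.memoryFraction", "0.1")  // low, since nothing is cached here
  .set("spark.shuffle.memoryFraction", "0.2")  // lower it only if the job shuffles little
val sc = new SparkContext(conf)

// 2-4 partitions per CPU core is a reasonable starting point.
val numPartitions = 264                        // hypothetical: adjust to your core count
val sample = sc.parallelize(1 to 1000000).repartition(numPartitions)
println(sample.partitions.length)              // verify the partition count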
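
For the closure/broadcast point, applied to the code in the question: capturing a large bg object in the map closure ships it with every task (which would match the ~30 MB serialized tasks in the log), while broadcasting ships it to each executor only once. A sketch, assuming bg is serializable:

// Broadcast the (assumed large) background object once, instead of capturing `bg` in every task.
val bgBroadcast = sc.broadcast(bg)

val res = imageBundleRDD.map(data => {
  // Read the value from the broadcast handle inside the closure.
  val desPoints = threeDReconstruction(data._2, bgBroadcast.value)
  (data._1, desPoints)
})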
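
And for the serialized-caching point, a sketch using MEMORY_ONLY_SER, which trades some CPU on access for a much smaller heap footprint:

import org.apache.spark.storage.StorageLevel

// Keep the cached RDD in serialized form instead of as deserialized Java objects.
res.persist(StorageLevel.MEMORY_ONLY_SER)

// Kryo usually produces smaller, faster serialization than Java serialization;
// it has to be set on the SparkConf before the context is created:
// conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")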


 

Reference: http://spark.apache.org/docs/1.2.1/configuration.html

