Intellipaat Back

Explore Courses Blog Tutorials Interview Questions
0 votes
2 views
in Big Data Hadoop & Spark by (11.4k points)

When I run the parsing code with 1 GB dataset it completes without any error. But, when I attempt 25 gb of data at a time I get below errors. I'm trying to understand how can I avoid below failures. Happy to hear any suggestions or ideas.

Differnt errors,

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0

org.apache.spark.shuffle.FetchFailedException: Failed to connect to ip-xxxxxxxx

org.apache.spark.shuffle.FetchFailedException: Error in opening FileSegmentManagedBuffer{file=/mnt/yarn/nm/usercache/xxxx/appcache/application_1450751731124_8446/blockmgr-8a7b17b8-f4c3-45e7-aea8-8b0a7481be55/08/shuffle_0_224_0.data, offset=12329181, length=2104094}
 

Cluster Details:

Yarn: 8 Nodes
Total cores: 64
Memory: 500 GB
Spark Version: 1.5

Spark submit statement:

spark-submit --master yarn-cluster \
                        --conf spark.dynamicAllocation.enabled=true \
                        --conf spark.shuffle.service.enabled=true \
                        --executor-memory 4g \
                        --driver-memory 16g \
                        --num-executors 50 \
                        --deploy-mode cluster \
                        --executor-cores 1 \
                        --class my.parser \
                        myparser.jar \
                        -input xxx \
                        -output xxxx \


One of stack trace:

at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:460)
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:456)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:456)
at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:183)
at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:47)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

2 Answers

0 votes
by (32.3k points)
edited by

This error is almost guaranteed to be caused because of memory issues on your executors.

Here are some ways to address these types of problems:

1) Try to run with more partitions (do a repartition on your dataframe). Memory issues typically arise when one or more partitions contain more data than the capacity.

2) Here, you have not explicitly set spark.yarn.executor.memoryOverhead, so it will default to max(386, 0.10* executorMemory) which in your case will be 400MB, which is very low according to me. I would try to increase it to say 1GB (note that if you increase memoryOverhead to 1GB, you need to lower --executor-memory to 3GB)

Also, org.apache.spark.shuffle.FetchFailedException can occur due to timeout retrieving shuffle partitions. To fix this problem, you can set the following:

  • SET spark.reducer.maxReqsInFlight=1;  -- Only pull one file at a time to use full network bandwidth.

  • SET spark.shuffle.io.retryWait=60s;  -- Increase the time to wait while retrieving shuffle partitions before retrying. Longer times are necessary for larger files.

  • SET spark.shuffle.io.maxRetries=10;

If you want to know more about Spark, then do check out this awesome video tutorial:

 

0 votes
by (2.6k points)
I think this error is probably arising out of memory issues for your executors. However, I can propose some solutions to tackle these kinds of problems.

1) You may also increase the number of partitions (performing a repartition of your dataframe). More often than not memory errors come about because one or more partitions contain more data than a system can hold in memory.

2) Also, I have observed there is no specific mention of spark.yarn.executor.memoryOverhead, which means it will take the value of max(386, 0.10 * executorMemory), which for you is about 400MB. To me, that is quite small. Therefore, I would suggest it goes up to something like 1GB (but note that if memoryOverhead is increased to 1GB, then --executor-memory should be downgraded to 3GB).

3) Also, look for log files in the failing node(s). You will be searching for the line “Killing container.” Kaggle has helped me in the past with this message “running beyond physical memory limits” where increasing memoryOverhead tends to fix the issues.

1.4k questions

32.9k answers

507 comments

693 users

...