Intellipaat Back

Explore Courses Blog Tutorials Interview Questions
0 votes
2 views
in Big Data Hadoop & Spark by (11.4k points)

I'm writing to see if anyone knows how to speed up S3 write times from Spark running in EMR?

My Spark Job takes over 4 hours to complete, however the cluster is only under load during the first 1.5 hours.

enter image description here

I was curious into what Spark was doing all this time. I looked at the logs and I found many s3 mvcommands, one for each file. Then taking a look directly at S3 I see all my files are in a _temporarydirectory.

Secondary, I'm concerned with my cluster cost, it appears I need to buy 2 hours of compute for this specific task. However, I end up buying unto 5 hours. I'm curious if EMR AutoScaling can help with cost in this situation.

Some articles discuss changing the file output committer algorithm but I've had little success with that.

sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2")

Writing to the local HDFS is quick. I'm curious if issuing a hadoop command to copy the data to S3 would be faster?

enter image description here

1 Answer

0 votes
by (32.3k points)

I think what you are encountering is a problem with outputcommitter and s3. Here, the commit job applies fs.rename on the _temporary folder and since rename is not supported by S3, this means that a single request is now copying and deleting all the files from _temporary to its final destination.

sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2") only works with hadoop version > 2.7. This copies each file from _temporary on commit task and not commit job so it is distributed and works pretty fast.

If you use older version of hadoop, I would suggest you to use Spark 1.6 with it and use:

sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter")

You need to note two important things here:

  • It does not work with speculation turned on or writing in append mode

  • In Spark 2.0 it is deprecated(replaced by algorithm.version=2)

By the way I personally write with Spark to HDFS and use DISTCP jobs (specifically s3-dist-cp) in production to copy the files to S3 but this is done for several other reasons (consistency, fault tolerance) so it is not necessary. Just try to implement what I suggested and you will be able to write to S3 pretty fast.

31k questions

32.8k answers

501 comments

693 users

Browse Categories

...