
I currently automate my PySpark scripts on clusters of EC2 instances using Spark's preconfigured ./ec2 directory. For automation and scheduling purposes, I would like to use the Boto EMR module to send scripts up to the cluster.

I was able to bootstrap and install Spark on an EMR cluster. I am also able to launch a script on EMR by using my local machine's version of pyspark and setting the master like so:

$: MASTER=spark://<insert EMR master node of cluster here> ./bin/pyspark <>

However, this requires me to run that script locally, so I cannot fully leverage Boto's ability to 1) start the cluster, 2) add the script steps, and 3) stop the cluster. I've found examples using EMR "step" commands for spark-shell (Scala), but I assume there is an easier way to do this with the Python module (pyspark).

1 Answer


Here is a great example of how it needs to be configured. Browse to "A quick example" for Python code.

However, to make things work in emr-4.7.2, a few tweaks had to be made, so here is an AWS CLI command that worked for me:

aws emr add-steps --cluster-id <Your EMR cluster id> --steps Type=spark,Name=TestJob,Args=[--deploy-mode,cluster,--master,yarn,--conf,spark.yarn.submit.waitAppCompletion=true,s3a://your-source-bucket/code/,s3a://your-source-bucket/data/data.csv,s3a://your-destination-bucket/test-output/],ActionOnFailure=CONTINUE
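Since the question asks about Boto, here is a rough Python sketch of the same step using boto3. It assumes that on EMR 4.x a Spark step is run through command-runner.jar with spark-submit as the first argument, which is my understanding of what Type=spark expands to. The helper names build_spark_step and submit_step are made up for illustration, and the script URI is whatever your own script is called.

```python
def build_spark_step(script_uri, input_uri, output_uri, name="TestJob"):
    """Build an EMR step dict that mirrors the add-steps CLI command.

    build_spark_step is a hypothetical helper, not part of boto3.
    """
    return {
        "Name": name,
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            # Assumption: EMR 4.x+ runs spark-submit through command-runner.jar.
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit",
                "--deploy-mode", "cluster",
                "--master", "yarn",
                "--conf", "spark.yarn.submit.waitAppCompletion=true",
                script_uri,   # the PySpark script on S3
                input_uri,    # becomes sys.argv[1] in the script
                output_uri,   # becomes sys.argv[2] in the script
            ],
        },
    }


def submit_step(cluster_id, step, region_name=None):
    """Add the step to a running cluster; needs boto3 and AWS credentials."""
    import boto3  # imported lazily so build_spark_step works without boto3

    emr = boto3.client("emr", region_name=region_name)
    response = emr.add_job_flow_steps(JobFlowId=cluster_id, Steps=[step])
    return response["StepIds"][0]
```

You would call build_spark_step with your own three S3 URIs and pass the result to submit_step along with the cluster id. Keeping the dict construction separate from the AWS call makes it easy to inspect the arguments before anything is submitted.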

And here are the contents of the script:

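from __future__ import print_function

import sys

from pyspark import SparkContext

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: testjob <input> <output>", file=sys.stderr)
        sys.exit(-1)

    sc = SparkContext(appName="MyTestJob")

    # Read the input file as lines of text.
    dataTextAll = sc.textFile(sys.argv[1])

    # Split each row, key by the first column, and sum the second column.
    dataRDD = (dataTextAll
               .map(lambda x: x.split(","))
               .map(lambda y: (str(y[0]), float(y[1])))
               .reduceByKey(lambda a, b: a + b))

    # Write the result to the output path.
    dataRDD.saveAsTextFile(sys.argv[2])

    sc.stop()
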

It reads the data.csv file from S3, splits every row, converts the first value to a string and the second to a float, groups by the first value, sums the values in the second column, and writes the result back to S3.
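If you want to sanity-check that logic before paying for a cluster, the same split, convert, and sum-by-key steps can be sketched in plain Python. This is only a local stand-in and not Spark; sum_by_key and the sample rows are made up for illustration.

```python
from collections import defaultdict


def sum_by_key(lines):
    """Sum the second CSV column per distinct value of the first column."""
    totals = defaultdict(float)
    for line in lines:
        fields = line.split(",")
        # Same conversions as the Spark job: key as str, value as float.
        totals[str(fields[0])] += float(fields[1])
    return dict(totals)


sample = ["a,1.5", "b,2", "a,0.5"]
print(sum_by_key(sample))  # {'a': 2.0, 'b': 2.0}
```

Like the Spark version, this raises an error on rows that have fewer than two columns or a non-numeric second column, so it is also a quick way to find bad rows in a small sample of the data.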

A few points to note:

  • I've decided to leave spark.yarn.submit.waitAppCompletion=true so that I can monitor job execution in the console.

  • Input and output paths (sys.argv[1] and sys.argv[2] respectively) are passed to the script as part of the job submission (Args section in the add-steps command).

  • Be aware that you must use the s3a:// URI scheme instead of s3n:// or s3:// for Hadoop 2.7+ when configuring your job.

  • Also, if your cluster resides in a VPC, you need to create a VPC endpoint for Amazon S3 if you intend to read from or write to S3 in your EMR jobs.
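If you would rather monitor the step from a script than from the console, a minimal polling sketch with boto3 could look like the one below. It assumes emr_client is a boto3 EMR client (boto3.client("emr")) and that you already have the step id returned when the step was added. wait_for_step is a hypothetical helper name, and the 30 second poll interval is an arbitrary choice.

```python
import time

# Step states after which EMR will not change the step any further.
TERMINAL_STATES = {"COMPLETED", "CANCELLED", "FAILED", "INTERRUPTED"}


def wait_for_step(emr_client, cluster_id, step_id, poll_seconds=30, sleep=time.sleep):
    """Poll describe_step until the step reaches a terminal state, then return it."""
    while True:
        response = emr_client.describe_step(ClusterId=cluster_id, StepId=step_id)
        state = response["Step"]["Status"]["State"]
        if state in TERMINAL_STATES:
            return state
        sleep(poll_seconds)
```

Once this returns, the cluster can be shut down from the same script with emr_client.terminate_job_flows(JobFlowIds=[cluster_id]), which covers the "stop the cluster" part of the question.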
