in Big Data Hadoop & Spark by (11.4k points)

I currently automate my Apache Spark (PySpark) scripts on EC2 clusters using Spark's preconfigured ./ec2 directory. For automation and scheduling purposes, I would like to use the Boto EMR module to send scripts up to the cluster.

I was able to bootstrap and install Spark on an EMR cluster. I am also able to launch a script on EMR by using my local machine's version of pyspark and setting the master like so:

$: MASTER=spark://<insert EMR master node of cluster here> ./bin/pyspark <myscriptname.py>


However, this requires me to run that script locally, so I am not able to fully leverage Boto's ability to 1) start the cluster, 2) add the script steps, and 3) stop the cluster. I've found examples using script-runner.sh and EMR "step" commands for spark-shell (Scala), but I assume there is an easier way to do this with the Python module (pyspark).

1 Answer

by (32.3k points)

Here is a great example of how it needs to be configured. Browse to "A quick example" for the Python code.

However, in order to make things work on emr-4.7.2, a few tweaks had to be made, so here is an AWS CLI command that worked for me:

aws emr add-steps --cluster-id <Your EMR cluster id> --steps Type=spark,Name=TestJob,Args=[--deploy-mode,cluster,--master,yarn,--conf,spark.yarn.submit.waitAppCompletion=true,s3a://your-source-bucket/code/pythonjob.py,s3a://your-source-bucket/data/data.csv,s3a://your-destination-bucket/test-output/],ActionOnFailure=CONTINUE
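
If you prefer to stay in Python rather than shelling out to the AWS CLI (which is what the question is really after), the same step can be submitted with boto3. This is only a sketch, assuming boto3 is acceptable in place of the legacy boto EMR module; the cluster id, region, and S3 paths are placeholders mirroring the command above. On emr-4.x+ the CLI's Type=spark shorthand expands to a command-runner.jar step that invokes spark-submit, which is what the code spells out explicitly:

import boto3

emr = boto3.client("emr", region_name="us-east-1")  # region is a placeholder

response = emr.add_job_flow_steps(
    JobFlowId="<Your EMR cluster id>",
    Steps=[
        {
            "Name": "TestJob",
            "ActionOnFailure": "CONTINUE",
            "HadoopJarStep": {
                # command-runner.jar is what the CLI's Type=spark uses under the hood
                "Jar": "command-runner.jar",
                "Args": [
                    "spark-submit",
                    "--deploy-mode", "cluster",
                    "--master", "yarn",
                    "--conf", "spark.yarn.submit.waitAppCompletion=true",
                    "s3a://your-source-bucket/code/pythonjob.py",
                    "s3a://your-source-bucket/data/data.csv",
                    "s3a://your-destination-bucket/test-output/",
                ],
            },
        }
    ],
)

print(response["StepIds"])  # keep the step id if you want to poll its status later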

And here are the contents of the pythonjob.py file:

from __future__ import print_function

import sys

from pyspark import SparkContext

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: testjob <input path> <output path>", file=sys.stderr)
        sys.exit(-1)

    sc = SparkContext(appName="MyTestJob")

    # Read the input CSV, split each row, and sum the float values per key.
    dataTextAll = sc.textFile(sys.argv[1])
    dataRDD = dataTextAll.map(lambda x: x.split(",")) \
        .map(lambda y: (str(y[0]), float(y[1]))) \
        .reduceByKey(lambda a, b: a + b)

    # Write the aggregated results back to S3.
    dataRDD.saveAsTextFile(sys.argv[2])

    sc.stop()

It reads the data.csv file from S3, splits every row, converts the first value to a string and the second to a float, groups by the first value, sums the values in the second column, and writes the result back to S3.

A few points to note:

  • I've decided to leave spark.yarn.submit.waitAppCompletion=true so that I can monitor job execution in the console.

  • Input and output paths (sys.argv[1] and sys.argv[2] respectively) are passed to the script as part of the job submission (the Args section of the add-steps command).

  • Be aware that you must use the s3a:// URI scheme instead of s3n:// or s3:// for Hadoop 2.7+ when configuring your job.

  • Also, if your cluster resides in a VPC, you need to create a VPC Endpoint for Amazon S3 if you intend to read from or write to it in your EMR jobs.
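
Finally, coming back to the original goal of driving the whole workflow from Boto (start the cluster, add the script step, stop the cluster), here is a minimal boto3 sketch of that lifecycle. It assumes boto3 rather than the legacy boto module; the cluster name, instance types, roles, and S3 paths are placeholders, and KeepJobFlowAliveWhenNoSteps=False makes EMR terminate the cluster on its own once the step finishes:

import boto3

emr = boto3.client("emr", region_name="us-east-1")  # region is a placeholder

# Same Spark step as in the add_job_flow_steps sketch above.
spark_step = {
    "Name": "TestJob",
    "ActionOnFailure": "TERMINATE_CLUSTER",
    "HadoopJarStep": {
        "Jar": "command-runner.jar",
        "Args": [
            "spark-submit", "--deploy-mode", "cluster", "--master", "yarn",
            "s3a://your-source-bucket/code/pythonjob.py",
            "s3a://your-source-bucket/data/data.csv",
            "s3a://your-destination-bucket/test-output/",
        ],
    },
}

cluster = emr.run_job_flow(
    Name="pyspark-batch-job",                  # placeholder cluster name
    ReleaseLabel="emr-4.7.2",
    Applications=[{"Name": "Spark"}],
    Instances={
        "MasterInstanceType": "m3.xlarge",     # placeholder instance types
        "SlaveInstanceType": "m3.xlarge",
        "InstanceCount": 3,
        "KeepJobFlowAliveWhenNoSteps": False,  # terminate once all steps are done
    },
    Steps=[spark_step],
    JobFlowRole="EMR_EC2_DefaultRole",         # default EMR roles; adjust for your account
    ServiceRole="EMR_DefaultRole",
    VisibleToAllUsers=True,
)

cluster_id = cluster["JobFlowId"]
print("Started cluster", cluster_id)

# Block until the cluster has finished its work and terminated itself.
emr.get_waiter("cluster_terminated").wait(ClusterId=cluster_id)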
