Here is a great example of how it needs to be configured; browse to the "A quick example" section for the Python code.
However, to make things work on emr-4.7.2 a few tweaks had to be made, so here is an AWS CLI command that worked for me:
aws emr add-steps --cluster-id <Your EMR cluster id> --steps Type=spark,Name=TestJob,Args=[--deploy-mode,cluster,--master,yarn,--conf,spark.yarn.submit.waitAppCompletion=true,s3a://your-source-bucket/code/pythonjob.py,s3a://your-source-bucket/data/data.csv,s3a://your-destination-bucket/test-output/],ActionOnFailure=CONTINUE
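If you prefer to submit the same step from Python rather than the CLI, a boto3 call along these lines should do it. This is just a sketch: the cluster id, region, and bucket names are placeholders, and it relies on command-runner.jar (available on emr-4.x and later) to invoke spark-submit:

# Sketch: submitting the same Spark step with boto3 instead of the AWS CLI.
# The cluster id, region, and bucket names below are placeholders.
import boto3

emr = boto3.client("emr", region_name="us-east-1")  # adjust the region as needed

response = emr.add_job_flow_steps(
    JobFlowId="j-XXXXXXXXXXXXX",  # your EMR cluster id
    Steps=[
        {
            "Name": "TestJob",
            "ActionOnFailure": "CONTINUE",
            "HadoopJarStep": {
                # command-runner.jar lets a step call spark-submit on emr-4.x+
                "Jar": "command-runner.jar",
                "Args": [
                    "spark-submit",
                    "--deploy-mode", "cluster",
                    "--master", "yarn",
                    "--conf", "spark.yarn.submit.waitAppCompletion=true",
                    "s3a://your-source-bucket/code/pythonjob.py",
                    "s3a://your-source-bucket/data/data.csv",
                    "s3a://your-destination-bucket/test-output/",
                ],
            },
        }
    ],
)
print(response["StepIds"])

As far as I know, the Type=spark shorthand in the add-steps command resolves to essentially the same spark-submit invocation on the cluster.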
And here are the contents of the pythonjob.py file:
from __future__ import print_function
from pyspark import SparkContext
import sys

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: testjob <input path> <output path>", file=sys.stderr)
        exit(-1)

    sc = SparkContext(appName="MyTestJob")

    # Read the raw CSV lines from the input path
    dataTextAll = sc.textFile(sys.argv[1])

    # Split each line on the comma, build (key, value) pairs and sum the values per key
    dataRDD = (dataTextAll.map(lambda x: x.split(","))
                          .map(lambda y: (str(y[0]), float(y[1])))
                          .reduceByKey(lambda a, b: a + b))

    # Write the aggregated result to the output path
    dataRDD.saveAsTextFile(sys.argv[2])

    sc.stop()
It reads the data.csv file from S3, splits every row on the comma, converts the first value to a string and the second to a float, groups by the first value and sums the values in the second column, and writes the result back to S3.
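If you want a quick sanity check of that transformation before submitting anything, the same pipeline can be run against a couple of made-up rows in a local pyspark session (the input values below are purely illustrative):

# Local check of the same map/reduceByKey pipeline with made-up rows.
from pyspark import SparkContext

sc = SparkContext("local", "TestJobCheck")
lines = sc.parallelize(["a,1.0", "a,2.0", "b,5.0"])  # stand-in for data.csv rows
result = (lines.map(lambda x: x.split(","))
               .map(lambda y: (str(y[0]), float(y[1])))
               .reduceByKey(lambda a, b: a + b))
print(sorted(result.collect()))  # -> [('a', 3.0), ('b', 5.0)]
sc.stop()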
A few points to note:
I've decided to leave spark.yarn.submit.waitAppCompletion=true so that I can monitor job execution in the console (see the status-polling sketch after these notes).
The input and output paths (sys.argv[1] and sys.argv[2] respectively) are passed to the script as part of the job submission (the Args section of the add-steps command).
Be aware that for Hadoop 2.7+ you must use the s3a:// URI scheme instead of s3n:// or s3:// when configuring your job.
Also, if your cluster resides in a VPC, you need to create a VPC endpoint for Amazon S3 if you intend to read from or write to S3 in your EMR jobs.
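Regarding the monitoring point above: with waitAppCompletion=true the step stays in the RUNNING state until the Spark application finishes, so besides watching the console you can poll the step status programmatically. A minimal boto3 sketch, with placeholder cluster and step ids:

# Poll the state of a submitted step (the ids below are placeholders).
import boto3

emr = boto3.client("emr", region_name="us-east-1")
step = emr.describe_step(ClusterId="j-XXXXXXXXXXXXX", StepId="s-XXXXXXXXXXXXX")
print(step["Step"]["Status"]["State"])  # PENDING / RUNNING / COMPLETED / FAILED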
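And for the VPC point: creating the S3 gateway endpoint can be scripted as well. A hedged boto3 sketch, with the VPC id, route table id, and region as placeholders:

# Create a gateway VPC endpoint for S3 so the cluster can reach S3 privately.
# The VPC id, route table id, and region below are placeholders.
import boto3

ec2 = boto3.client("ec2", region_name="us-east-1")
ec2.create_vpc_endpoint(
    VpcId="vpc-xxxxxxxx",
    ServiceName="com.amazonaws.us-east-1.s3",  # match your cluster's region
    RouteTableIds=["rtb-xxxxxxxx"],
)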