This error is almost always caused by memory issues on your executors.
Here are some ways to address these types of problems:
1) Try running with more partitions (call repartition on your DataFrame). Memory issues typically arise when one or more partitions contain more data than an executor has memory for.
2) You have not explicitly set spark.yarn.executor.memoryOverhead, so it defaults to max(384MB, 0.10 * executorMemory), which in your case is about 400MB. In my opinion that is very low. I would try increasing it to, say, 1GB (in Spark 2.3 and later the property is named spark.executor.memoryOverhead). Note that if you increase memoryOverhead to 1GB, you need to lower --executor-memory to 3GB so that heap plus overhead still fits in roughly the same YARN container.
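For point 1, one rough way to choose how many partitions to ask for is to divide an estimate of the data size by a target partition size and round up. This is a sketch under assumptions: the 128 MB target and the `partitions_for` helper are my own illustrative choices, not Spark settings, and you have to supply the size estimate yourself.

```python
import math

# Illustrative target size per partition (an assumption, not a Spark setting).
TARGET_PARTITION_BYTES = 128 * 1024 * 1024


def partitions_for(total_bytes: int, target_bytes: int = TARGET_PARTITION_BYTES) -> int:
    """Smallest partition count that keeps the *average* partition at or under target_bytes."""
    if total_bytes <= 0:
        return 1  # always ask for at least one partition
    return math.ceil(total_bytes / target_bytes)


# Roughly 50 GB of input split into ~128 MB partitions.
print(partitions_for(50 * 1024**3))  # 400
```

In PySpark you would then pass the result to `df.repartition(n)`. Keep in mind this only bounds the average partition size; if one key is heavily skewed, a single partition can still outgrow executor memory.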
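For point 2, the arithmetic behind the default is easy to check. The sketch below assumes `--executor-memory 4g` (which is what a default of roughly 400MB implies) and mirrors the YARN defaults of a 10% factor with a 384MB floor. It ignores other pieces Spark can add to the container, such as off-heap memory, and the function names are hypothetical.

```python
from typing import Optional

# Spark-on-YARN defaults for executor memory overhead (values in MB).
OVERHEAD_FACTOR = 0.10
OVERHEAD_MIN_MB = 384


def default_overhead_mb(executor_memory_mb: int) -> int:
    """Overhead used when memoryOverhead is not set: 10% of the heap, at least 384 MB."""
    return max(int(OVERHEAD_FACTOR * executor_memory_mb), OVERHEAD_MIN_MB)


def container_size_mb(executor_memory_mb: int, overhead_mb: Optional[int] = None) -> int:
    """Memory requested from YARN per executor: heap plus overhead (other components ignored)."""
    if overhead_mb is None:
        overhead_mb = default_overhead_mb(executor_memory_mb)
    return executor_memory_mb + overhead_mb


# Current setup (assumed): 4g heap, default overhead.
print(default_overhead_mb(4096))      # 409
print(container_size_mb(4096))        # 4505
# Suggested setup: 3g heap with an explicit 1 GB overhead.
print(container_size_mb(3072, 1024))  # 4096
```

Moving from a 4g heap with the default overhead (about 4.4GB per container) to a 3g heap with 1GB overhead keeps the request at 4GB while giving the overhead more than twice as much room.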
Also, org.apache.spark.shuffle.FetchFailedException can occur when fetching shuffle partitions times out. To address this, you can set the following:
SET spark.reducer.maxReqsInFlight=1; -- Allow only one outstanding remote fetch request at a time, so each fetch can use the full network bandwidth.
SET spark.shuffle.io.retryWait=60s; -- Wait longer between shuffle fetch retries (the default is 5s). Larger files need longer waits.
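The same settings can also be supplied when the job is launched rather than with SET, which may be more reliable: some Spark versions ignore or reject runtime changes to core (non-SQL) settings made from a SQL session. The sketch below only builds `--conf key=value` arguments for spark-submit from a dict; the `to_submit_args` helper and the `your_job.py` script name are hypothetical, and the memory values are the 3g/1GB split from point 2.

```python
from typing import Dict, List

# Settings suggested above. On Spark 2.3+ the overhead key is
# spark.executor.memoryOverhead; the older YARN-specific name is kept here.
SETTINGS: Dict[str, str] = {
    "spark.executor.memory": "3g",
    "spark.yarn.executor.memoryOverhead": "1024",  # value in MB
    "spark.reducer.maxReqsInFlight": "1",
    "spark.shuffle.io.retryWait": "60s",
}


def to_submit_args(settings: Dict[str, str]) -> List[str]:
    """Turn a config dict into spark-submit style '--conf key=value' arguments (hypothetical helper)."""
    args: List[str] = []
    for key, value in settings.items():
        args.extend(["--conf", f"{key}={value}"])
    return args


# "your_job.py" is a placeholder for your own application.
print(" ".join(["spark-submit"] + to_submit_args(SETTINGS) + ["your_job.py"]))
```

From PySpark, the same key/value pairs could instead be passed through `SparkSession.builder.config(key, value)` before calling `getOrCreate()`.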