
I'm running some operations in PySpark, and recently increased the number of nodes in my configuration (which is on Amazon EMR). However, even though I tripled the number of nodes (from 4 to 12), performance seems not to have changed. As such, I'd like to see if the new nodes are visible to Spark.

I'm calling the following function:

>>> sc.defaultParallelism
2


But I think this is telling me the total number of tasks distributed to each node, not the total number of cores that Spark can see.

1 Answer

sc.defaultParallelism is just a hint; depending on the configuration, it may not be related to the number of nodes at all. It is the default number of partitions used when you call an operation that takes a partition count argument but don't supply one. For example, sc.parallelize creates a new RDD from a list, and its second argument tells it how many partitions to create; the default value of that argument is sc.defaultParallelism.
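
For example, in a PySpark shell (a minimal sketch, assuming an existing SparkContext named sc as in the question), you can see that sc.parallelize without a partition count falls back to sc.defaultParallelism:

data = range(1000)

rdd_default = sc.parallelize(data)        # no partition count given, falls back to sc.defaultParallelism
rdd_explicit = sc.parallelize(data, 12)   # explicitly ask for 12 partitions

print(sc.defaultParallelism)              # e.g. 2, as in the question above
print(rdd_default.getNumPartitions())     # same value as sc.defaultParallelism
print(rdd_explicit.getNumPartitions())    # 12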

You can get the number of executors with sc.getExecutorMemoryStatus in the Scala API, but this is not exposed in the Python API.

In general, it is recommended to have around 4 times as many partitions in an RDD as you have executors. This is a good rule of thumb because, if there is variance in how long tasks take, the extra partitions even it out: some executors end up processing 5 faster tasks while others process 3 slower tasks, for example.

You don't need to be very accurate with this; a rough estimate is enough. For example, if you know you have fewer than 200 CPUs, 500 partitions will be fine.

So now you should try to create RDDs with this number of partitions:

rdd = sc.parallelize(data, 500)     # when distributing local data

rdd = sc.textFile('file.csv', 500)  # when loading data from a file (here 500 is a minimum number of partitions)

Or if you don't control the creation of the RDD, just repartition it before the computation: 

rdd = rdd.repartition(500)

You can check the number of partitions in an RDD with rdd.getNumPartitions().
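
For example, a quick sanity check in the shell (using sc.parallelize so it runs without any input file):

rdd = sc.parallelize(range(100000), 500)   # ask for 500 partitions explicitly
print(rdd.getNumPartitions())              # 500

other = sc.parallelize(range(100000))      # created with the default partition count
other = other.repartition(500)
print(other.getNumPartitions())            # 500 after repartitioning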

However, in PySpark you can still call the Scala getExecutorMemoryStatus API through PySpark's py4j bridge:

sc._jsc.sc().getExecutorMemoryStatus().size()
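
Putting it together, here is a rough sketch that sizes the partition count from the executor count using the 4x rule of thumb above. The -1 below is an assumption that the driver is included in getExecutorMemoryStatus, so treat the result as an approximation rather than an exact executor count.

# Entries reported by getExecutorMemoryStatus; assumed to include the driver, hence the -1.
num_executors = sc._jsc.sc().getExecutorMemoryStatus().size() - 1

# Roughly 4 partitions per executor, and never less than 1.
target_partitions = max(4 * num_executors, 1)

rdd = sc.textFile('file.csv', target_partitions)
print(rdd.getNumPartitions())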
