In Apache Spark, a stage is a physical unit of execution: a step in the physical execution plan. It is a set of parallel tasks, one task per partition. In other words, each job is divided into smaller sets of tasks called stages. Stages generally depend on one another, and they are very similar to the map and reduce stages in MapReduce.
Basically, a Spark job is a computation that is sliced into stages. Each stage can be uniquely identified by its id: whenever the DAGScheduler creates a stage, it increments its internal counter nextStageId, which also helps track the number of stage submissions.
Talking about your problem:
A separate task does need to be launched for each partition of data, for each stage. Consider that each partition will likely reside in a distinct physical location, e.g. a block in HDFS or a directory/volume of a local file system.
Note that the submission of Stages is driven by the DAG Scheduler.
In your case I think that there are two related things that confuse you:
1. What determines the content of a task?
2. What determines the number of tasks to be executed?
Spark's engine "glues" together simple operations on consecutive RDDs, for example:
rdd1 = sc.textFile( ... )
rdd2 = rdd1.filter( ... )
rdd3 = rdd2.map( ... )
rdd3RowCount = rdd3.count()
so when rdd3 is computed, Spark will generate a task per partition of rdd1, and when the count action triggers execution, each task will apply both the filter and the map to every line of its partition to produce rdd3.
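For instance, a runnable version of that pipeline might look like the sketch below; the input path and the filter/map logic are placeholder assumptions, not part of the original example, and sc is the usual SparkContext:

rdd1 = sc.textFile("hdfs:///data/events.log")      # source RDD, one partition per input split (path is hypothetical)
rdd2 = rdd1.filter(lambda line: "ERROR" in line)   # narrow transformation, keeps rdd1's partitioning
rdd3 = rdd2.map(lambda line: line.split("\t"))     # narrow transformation, keeps rdd1's partitioning
rdd3RowCount = rdd3.count()                        # action: one task per partition of rdd1, each running filter + map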
The number of partitions determines the number of tasks. Every RDD has a defined number of partitions. For a source RDD read from HDFS (using sc.textFile( ... ), for example), the number of partitions is the number of splits generated by the input format. Some operations on RDDs may result in an RDD with a different number of partitions:
rdd2 = rdd1.repartition( 1000 ) will result in rdd2 having 1000 partitions ( regardless of how many partitions rdd1 had ).
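As a quick sanity check (the input path here is just an illustrative assumption), you can verify the partition counts:

rdd1 = sc.textFile("hdfs:///data/events.log")   # hypothetical path
print(rdd1.getNumPartitions())                  # number of input splits
rdd2 = rdd1.repartition(1000)
print(rdd2.getNumPartitions())                  # 1000, whatever rdd1 had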
Another example is joins:
rdd3 = rdd1.join( rdd2 , numPartitions = 1000 ) will result in rdd3 having 1000 partitions ( independent of partitions number of rdd1 and rdd2 ).
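A small sketch with made-up key/value data illustrates the same point for joins:

rdd1 = sc.parallelize([(i, "a") for i in range(100)])   # toy pair RDDs, purely illustrative
rdd2 = sc.parallelize([(i, "b") for i in range(100)])
rdd3 = rdd1.join(rdd2, numPartitions=1000)
print(rdd3.getNumPartitions())                          # 1000, independent of rdd1 and rdd2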
Most of the operations that change the number of partitions involve a shuffle. For example:
rdd2 = rdd1.repartition( 1000 )
What actually happens is that the task on each partition of rdd1 needs to produce an end output that can be read by the following stage, so that rdd2 ends up with exactly 1000 partitions (by hashing or sorting the data). Tasks on this side are sometimes referred to as "map (side) tasks". A task that later runs on rdd2 will act on one partition of rdd2 and has to figure out how to read and combine the map-side outputs relevant to that partition. Analogously to the map-side tasks, tasks on this side are referred to as "reduce (side) tasks".
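To make the two sides concrete, here is a sketch (again with a hypothetical input path) of how that repartition splits the job into two stages:

rdd1 = sc.textFile("hdfs:///data/events.log")   # hypothetical path
rdd2 = rdd1.repartition(1000)
rdd2.count()
# Stage 1: map-side tasks, one per partition of rdd1, each writing its output
#          bucketed into 1000 shuffle pieces (by hashing).
# Stage 2: reduce-side tasks, one per partition of rdd2 (1000 of them), each
#          fetching and combining the map-side outputs for its partition.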
The two questions are related: the number of tasks in a stage is the number of partitions common to the consecutive RDDs that are glued together in it. Also, the number of partitions of an RDD is free to change between stages.
The number of concurrent task slots is numExecutors * executorCores. These slots can also be occupied by tasks from different, non-dependent stages.
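For example, with a hypothetical configuration like the one below, 4 executors with 2 cores each give 8 concurrent task slots (the specific values are only an illustration):

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("slots-example")
        .set("spark.executor.instances", "4")   # numExecutors (illustrative value)
        .set("spark.executor.cores", "2"))      # executorCores (illustrative value)
sc = SparkContext(conf=conf)
# 4 * 2 = 8 tasks can run at the same time, possibly drawn from different,
# non-dependent stages.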