
Let's assume for the following that only one Spark job is running at every point in time.

What I get so far

Here is what I understand what happens in Spark:

  1. When a SparkContext is created, each worker node starts an executor. Executors are separate processes (JVMs) that connect back to the driver program. Each executor has the driver program's jar. Quitting the driver shuts down the executors. Each executor can hold some partitions.
  2. When a job is executed, an execution plan is created according to the lineage graph.
  3. The job is split into stages, where each stage contains as many neighbouring (in the lineage graph) transformations and actions as possible, but no shuffles. Thus stages are separated by shuffles (see the sketch after this list).
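
To make the stage split concrete, here is a minimal word-count sketch (assuming a spark-shell session where sc already exists; the input path is hypothetical): the narrow transformations are pipelined into one stage, and the shuffle required by reduceByKey starts a second one.

// Hypothetical word count; `sc` comes from spark-shell, the path is just an example.
val lines  = sc.textFile("hdfs:///tmp/input.txt")
val words  = lines.flatMap(_.split(" ")).filter(_.nonEmpty)   // narrow, pipelined
val counts = words.map(w => (w, 1)).reduceByKey(_ + _)        // reduceByKey needs a shuffle
counts.count()   // the action triggers the job: stage 0 (map side) + stage 1 (reduce side)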

[image 1]

I understand that

  • A task is a command sent from the driver to an executor by serializing the Function object.
  • The executor deserializes (with the driver jar) the command (task) and executes it on a partition.

but

Question(s)

How do I split the stage into those tasks?

Specifically:

  1. Are the tasks determined by the transformations and actions, or can multiple transformations/actions be in one task?
  2. Are the tasks determined by the partitions (e.g. one task per stage per partition)?
  3. Are the tasks determined by the nodes (e.g. one task per stage per node)?

What I think (only partial answer, even if right)

In https://0x0fff.com/spark-architecture-shuffle, the shuffle is explained with the image

[image: shuffle diagram from the 0x0fff article]

and I get the impression that the rule is

each stage is split into #number-of-partitions tasks, with no regard for the number of nodes

For my first image I'd say that I'd have 3 map tasks and 3 reduce tasks.

For the image from 0x0fff, I'd say there are 8 map tasks and 3 reduce tasks (assuming that there are only three orange and three dark green files).
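
One way to check that rule empirically (a sketch only; the path and partition counts below are made up to mirror the 8-map/3-reduce example):

// `sc` from spark-shell; the input path is hypothetical.
val mapSide    = sc.textFile("hdfs:///tmp/shuffle-input")              // e.g. 8 splits -> 8 map tasks
val reduceSide = mapSide.map(line => (line, 1)).reduceByKey(_ + _, 3)  // 3 partitions -> 3 reduce tasks
println(mapSide.getNumPartitions)     // tasks in the map-side stage
println(reduceSide.getNumPartitions)  // tasks in the reduce-side stage
reduceSide.count()                    // running the action shows both stages and their task counts in the Spark UI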

1 Answer


In Apache Spark, a stage is a physical unit of execution, i.e. a step in the physical execution plan. It is a set of parallel tasks, one task per partition. In other words, each job gets divided into smaller sets of tasks, and these sets are what you call stages. Stages generally depend on each other, and the model is very similar to the map and reduce stages in MapReduce.

Basically, a Spark job is a computation that is sliced into stages. Each stage can be uniquely identified by its id: whenever the DAGScheduler creates a stage, it increments an internal counter, nextStageId, which also helps to track the number of stage submissions.
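
A quick way to see those stages for a given lineage is RDD.toDebugString, which prints the lineage with a new indentation level at each shuffle boundary. A small sketch, assuming a spark-shell session:

val pairs   = sc.parallelize(1 to 100).map(i => (i % 10, i))
val reduced = pairs.reduceByKey(_ + _)
// The ShuffledRDD appears on its own level, with the parent (map-side) RDDs indented below it,
// which is exactly where the DAGScheduler will cut the job into two stages.
println(reduced.toDebugString)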

Talking about your problem:

A separate task needs to be launched for each partition of data in each stage. Consider that each partition will likely reside in a distinct physical location, e.g. a block in HDFS or a directory/volume of a local file system.

Note that the submission of stages is driven by the DAGScheduler.

In your case I think that there are two related things that confuse you:

1. What determines the content of a task?

2. What determines the number of tasks to be executed?

Spark's engine "glues" together simple operations on consecutive RDDs, for example:

rdd1 = sc.textFile( ... )
rdd2 = rdd1.filter( ... )
rdd3 = rdd2.map( ... )
rdd3RowCount = rdd3.count

so when rdd3 is computed, Spark will generate one task per partition of rdd1, and when the action runs, each task will execute both the filter and the map per line to produce rdd3.
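
For illustration, here is one runnable version of that snippet (the lambdas and the path are placeholders, not taken from the question):

val rdd1 = sc.textFile("hdfs:///tmp/input.txt")   // hypothetical path
val rdd2 = rdd1.filter(line => line.nonEmpty)     // narrow: no shuffle
val rdd3 = rdd2.map(line => line.toUpperCase)     // narrow: no shuffle
val rdd3RowCount = rdd3.count()                   // one stage: a task per partition of rdd1,
                                                  // each task runs filter + map + count on its own partition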

The number of partitions determines the number of tasks. Every RDD comes with a defined number of partitions. For a source RDD that is read from HDFS (using sc.textFile( ... ), for example), the number of partitions is the number of splits generated by the input format. Some operations on RDDs may result in an RDD with a different number of partitions:

rdd2 = rdd1.repartition( 1000 ) will result in rdd2 having 1000 partitions ( regardless of how many partitions rdd1 had ).
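
This is easy to verify with getNumPartitions (a sketch; the path is hypothetical):

val rdd1 = sc.textFile("hdfs:///tmp/input.txt")
println(rdd1.getNumPartitions)   // e.g. the number of HDFS splits
val rdd2 = rdd1.repartition(1000)
println(rdd2.getNumPartitions)   // 1000, regardless of rdd1's partition count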

Another example is joins:

rdd3 = rdd1.join( rdd2  , numPartitions = 1000 ) will result in rdd3 having 1000 partitions ( independent of partitions number of rdd1 and rdd2 ).
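
A small sketch of the join case (the key/value contents are made up):

val rdd1 = sc.parallelize(Seq((1, "a"), (2, "b")), numSlices = 4)
val rdd2 = sc.parallelize(Seq((1, "x"), (3, "y")), numSlices = 8)
val rdd3 = rdd1.join(rdd2, numPartitions = 1000)
println(rdd3.getNumPartitions)   // 1000, independent of rdd1's and rdd2's partition counts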

Most of the operations that change the number of partitions involve a shuffle. For example:

rdd2 = rdd1.repartition( 1000 ) 

what actually happens is that the task on each partition of rdd1 needs to produce an end-output that can be read by the following stage, so that rdd2 ends up with exactly 1000 partitions (by performing hash or sort partitioning). Tasks on this side are sometimes referred to as "map (side) tasks". A task that later runs on rdd2 acts on one partition of rdd2 and has to figure out how to read and combine the map-side outputs relevant to that partition. Tasks on this side are sometimes referred to as "reduce (side) tasks".
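
To make the "hash or sort" part concrete, here is a sketch with an explicit HashPartitioner, one way map-side output gets routed to reduce-side partitions (the key/value RDD and path are assumptions):

import org.apache.spark.HashPartitioner

val pairs = sc.textFile("hdfs:///tmp/input.txt").map(line => (line, 1))
// Map-side tasks bucket each record by hash(key) modulo 1000 and write shuffle output;
// each of the 1000 reduce-side tasks then fetches and merges the buckets for its own partition.
val shuffled = pairs.partitionBy(new HashPartitioner(1000))
shuffled.count()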

The two questions are related: the number of tasks in a stage is the number of partitions common to the consecutive RDDs that are "glued" together in that stage, and the number of partitions of an RDD is free to change between stages.

The number of concurrent task slots is numExecutors * executorCores. These slots can also be occupied by tasks from different, non-dependent stages.
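
As a rough sketch (assuming static allocation; the properties below may be unset depending on the cluster manager and deploy mode, so the fallback values are just placeholders):

val conf          = sc.getConf
val numExecutors  = conf.get("spark.executor.instances", "2").toInt
val executorCores = conf.get("spark.executor.cores", "1").toInt
println(s"concurrent task slots: ${numExecutors * executorCores}")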

