
After reading some documentation, I have a question I'd like to clarify.

Take this example from Spark:

JavaSparkContext spark = new JavaSparkContext(
  new SparkConf().setJars("...").setSparkHome....);
JavaRDD<String> file = spark.textFile("hdfs://...");

// step1
JavaRDD<String> words =
  file.flatMap(new FlatMapFunction<String, String>() {
    public Iterable<String> call(String s) {
      return Arrays.asList(s.split(" "));
    }
  });

// step2 (in Spark 1.0+, mapToPair is the call that yields a JavaPairRDD)
JavaPairRDD<String, Integer> pairs =
  words.mapToPair(new PairFunction<String, String, Integer>() {
    public Tuple2<String, Integer> call(String s) {
      return new Tuple2<String, Integer>(s, 1);
    }
  });

// step3
JavaPairRDD<String, Integer> counts =
  pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
    public Integer call(Integer a, Integer b) {
      return a + b;
    }
  });

counts.saveAsTextFile("hdfs://...");


So let's say I have a 3-node cluster with node 1 running as the master, and the driver program above has been packaged into a jar (say application-test.jar). I run this code on the master node, and I believe that right after the SparkContext is created, the application-test.jar file is copied to the worker nodes (and each worker creates a directory for that application).

So now my question: are step1, step2 and step3 in the example tasks that get sent over to the workers? If so, how does a worker execute them? With something like java -cp "application-test.jar" step1 and so on?

1 Answer


A task is a command sent from the driver to an executor. It is built by serializing your Function object.

The executor deserializes the command (this is possible because it has loaded your jar) and executes it on a partition.
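The serialize-then-deserialize round trip described above can be sketched in plain Java, without Spark. This is only an illustration of the idea, not Spark's actual task format: WordFunction, SplitOnSpaces and runOnPartition are hypothetical names invented for the sketch, and a plain List stands in for a partition.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TaskSerializationSketch {
    // Hypothetical stand-in for a Spark function object; not a Spark interface.
    interface WordFunction extends Serializable {
        List<String> call(String line);
    }

    // The user's function. Its class file must be available on the "executor" side,
    // which is why the application jar has to be shipped to the workers.
    static class SplitOnSpaces implements WordFunction {
        private static final long serialVersionUID = 1L;

        @Override
        public List<String> call(String line) {
            return Arrays.asList(line.split(" "));
        }
    }

    // "Driver" side: turn the function object into bytes that can be sent over the network.
    static byte[] serialize(WordFunction f) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(f);
        } catch (IOException e) {
            throw new IllegalStateException("could not serialize function", e);
        }
        return bytes.toByteArray();
    }

    // "Executor" side: rebuild the function from bytes, then apply it to every
    // record of one partition.
    static List<String> runOnPartition(byte[] task, List<String> partition) {
        WordFunction f;
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(task))) {
            f = (WordFunction) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("could not deserialize function", e);
        }
        List<String> out = new ArrayList<>();
        for (String line : partition) {
            out.addAll(f.call(line));
        }
        return out;
    }

    public static void main(String[] args) {
        byte[] task = serialize(new SplitOnSpaces());
        System.out.println(runOnPartition(task, Arrays.asList("a b", "c")));
    }
}
```

Note that nothing like java -cp "application-test.jar" step1 is run per step: in this sketch the receiving side is an already-running JVM that only needs the class on its classpath to rebuild the object and call it.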

To see how tasks are created and scheduled, it helps to understand Spark's execution model. Briefly, a Spark application is executed in three steps:

  1. Create RDD graph

  2. Create execution plan according to the RDD graph. Stages are created in this step

  3. Generate tasks based on the plan and get them scheduled across workers

In your word-count example, the RDD graph is fairly simple. It looks something like this:

file -> lines -> words -> per-word count -> global word count -> output

Based on this graph, two stages are created. Stages are formed by pipelining as many narrow transformations as possible into one stage; a new stage begins wherever a shuffle is required. In your example, the narrow transformations end at the per-word count, so you get two stages:

file -> lines -> words -> per-word count

global word count -> output
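The stage-cutting rule can be illustrated with a small plain-Java sketch. It assumes a simplified, linear lineage in which each step is flagged by whether it needs a shuffle; Step and splitIntoStages are hypothetical names, and Spark's real scheduler works on a dependency graph rather than a flat list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class StageSplitSketch {
    // Hypothetical model of one step in the lineage; not a Spark class.
    static final class Step {
        final String name;
        final boolean needsShuffle; // true if this step's input must be regrouped by key

        Step(String name, boolean needsShuffle) {
            this.name = name;
            this.needsShuffle = needsShuffle;
        }
    }

    // Pipeline consecutive narrow steps into one stage; start a new stage at each shuffle.
    static List<List<String>> splitIntoStages(List<Step> lineage) {
        List<List<String>> stages = new ArrayList<>();
        List<String> current = new ArrayList<>();
        for (Step step : lineage) {
            if (step.needsShuffle && !current.isEmpty()) {
                stages.add(current);
                current = new ArrayList<>();
            }
            current.add(step.name);
        }
        if (!current.isEmpty()) {
            stages.add(current);
        }
        return stages;
    }

    // The word-count lineage from the answer, with the shuffle placed before the global count.
    static List<Step> wordCountLineage() {
        return Arrays.asList(
            new Step("file", false),
            new Step("lines", false),
            new Step("words", false),
            new Step("per-word count", false),
            new Step("global word count", true),
            new Step("output", false));
    }

    public static void main(String[] args) {
        System.out.println(splitIntoStages(wordCountLineage()));
    }
}
```

Running main prints [[file, lines, words, per-word count], [global word count, output]], which matches the two stages listed above.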

Once the stages are determined, Spark generates tasks from them. The first stage creates ShuffleMapTasks and the last stage creates ResultTasks, because the last stage includes an action that produces results.

The number of tasks generated depends on how your files are distributed. Suppose you have three different files on three different nodes (and each file forms a single partition): the first stage will generate 3 tasks, one per partition.

Therefore, you should not map your steps to tasks directly. A task belongs to a stage and operates on one partition.
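To make the "one task per stage per partition" idea concrete, here is a rough plain-Java simulation of the word count. It is a sketch under assumptions, not Spark code: shuffleMapTask and resultTask are hypothetical method names borrowed from the task types mentioned above, lists stand in for partitions, and keys are assigned to reduce partitions by a simple hash.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class WordCountTasksSketch {
    // Stage 1 task: runs once per input partition and does step1, step2 and a
    // local (per-partition) count in a single pass.
    static Map<String, Integer> shuffleMapTask(List<String> partition) {
        Map<String, Integer> local = new HashMap<>();
        for (String line : partition) {
            for (String word : line.split(" ")) {
                local.merge(word, 1, Integer::sum);
            }
        }
        return local;
    }

    // Stage 2 task: runs once per reduce partition and merges the counts of the
    // keys that hash to that partition.
    static Map<String, Integer> resultTask(int reduceId, int numReducers,
                                           List<Map<String, Integer>> mapOutputs) {
        Map<String, Integer> merged = new TreeMap<>();
        for (Map<String, Integer> output : mapOutputs) {
            for (Map.Entry<String, Integer> e : output.entrySet()) {
                if (Math.floorMod(e.getKey().hashCode(), numReducers) == reduceId) {
                    merged.merge(e.getKey(), e.getValue(), Integer::sum);
                }
            }
        }
        return merged;
    }

    // Runs partitions.size() stage-1 tasks, then numReducers stage-2 tasks.
    static Map<String, Integer> run(List<List<String>> partitions, int numReducers) {
        List<Map<String, Integer>> mapOutputs = new ArrayList<>();
        for (List<String> partition : partitions) {
            mapOutputs.add(shuffleMapTask(partition));
        }
        Map<String, Integer> total = new TreeMap<>();
        for (int r = 0; r < numReducers; r++) {
            total.putAll(resultTask(r, numReducers, mapOutputs));
        }
        return total;
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
            Arrays.asList("a b", "a"),
            Arrays.asList("b c"),
            Arrays.asList("a"));
        System.out.println(run(partitions, 2));
    }
}
```

With three input partitions and two reduce partitions, this runs 3 tasks in the first stage and 2 in the second. Steps 1 and 2 never appear as separate tasks; they are pipelined inside each first-stage task.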

Usually, the number of tasks run for a stage is exactly the number of partitions of the stage's final RDD. However, because RDDs (and hence ShuffleMapStages) can be shared, the number can vary depending on how RDDs and stages are shared.
