[Arch] Spark job submission breakdown

I originally wrote about the Spark deploy modes in this post as well, but it grew too long, so I decided to split it into two posts. I suggest reading that one first to get a good, general view of Spark in the ecosystem. In this post, I will focus on the job submission process.

You have probably already seen this image somewhere:

Spark Scheduling and Submission Process

Questions:

  • How does the DAGScheduler split the DAG into stages?
  • How are stages split into tasks?
  • What do executors do, in a nutshell?

* As I said, SparkContext is the brain of Spark, and I will try to link things back to the previous post so you can see the SparkContext in action. All of this can be explored easily by debugging a simple Spark program in your IDE, so, for simplicity, I won't post too much source code here. If you run into any trouble while wandering around the source code, don't hesitate to email me. I only explain the functions that I think are important.

I have translated that figure into my own, mapping each step from the original onto mine. Below, I break it down and dive into each step.

spark-job-submission

1. RDD Objects:

First, we create an RDD through the SparkContext; after a list of transformations, we get a DAG. (Go back to my SparkContext post for more information on RDDs.) When does the submission happen? It happens right when an action is called.
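
To make this concrete, here is a minimal sketch of such a program (the file name and logic are just for illustration): the transformations only build up the DAG, and nothing is submitted until the collect() action is called.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LazyDagExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("lazy-dag-example").setMaster("local[*]")
    val sc   = new SparkContext(conf)

    // Transformations: nothing runs yet, Spark only records the lineage (the DAG).
    val words  = sc.textFile("input.txt").flatMap(_.split("\\s+"))
    val pairs  = words.map(word => (word, 1))
    val counts = pairs.reduceByKey(_ + _)   // introduces a shuffle dependency

    // Action: this is the call that submits the job to the DAGScheduler.
    val result = counts.collect()

    result.foreach(println)
    sc.stop()
  }
}
```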

2. DAGScheduler:

When an action is called, the DAGScheduler does its job. The hard work is done in the handleJobSubmitted function. Now, I will tell you how the stages are formed. But first, what is a stage? There is some nice information about Stage in the Spark source code:

  • A stage is a set of independent tasks all computing the same function that need to run as part of a Spark job, where all the tasks have the same shuffle dependencies. Each DAG of tasks run by the scheduler is split up into stages at the boundaries where shuffle occurs, and then the DAGScheduler runs these stages in topological order.
  • Each Stage can either be a shuffle map stage, in which case its tasks’ results are input for another stage, or a result stage, in which case its tasks directly compute the action that initiated a job.
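
A quick way to see these boundaries yourself is RDD.toDebugString, which prints the lineage with extra indentation at each shuffle. Reusing the counts RDD from the sketch above, the output looks roughly like this (the exact format varies by Spark version):

```scala
// Each additional indentation level in toDebugString marks a shuffle boundary,
// i.e. a stage boundary for the DAGScheduler.
println(counts.toDebugString)
// (2) ShuffledRDD[3] at reduceByKey ...        <- result stage
//  +-(2) MapPartitionsRDD[2] at map ...        <- shuffle map stage
//     |  MapPartitionsRDD[1] at flatMap ...
//     |  input.txt HadoopRDD[0] at textFile ...
```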

OK, let's find out what the handleJobSubmitted function does:

handleJobSubmitted:

  • creates a new stage called finalStage, whose input is the last RDD
  • creates a new job from this stage
  • checks whether the job can be run locally (for actions like first(), take()…)
  • if not, calls submitStage with this finalStage as input
    • submitStage:
      • this stage is attached to the last RDD, so we need to trace back to the upstream RDDs through the getMissingParentStages function:
      • it walks back from the final RDD to its parents, looking at each dependency:
        • shuffle dependency: creates a new shuffle map stage
        • narrow dependency: pushed onto a stack and kept in the current stage; this is how Spark pipelines as many RDDs as possible into one stage
      • it returns a list of missing parent stages.
    • After that, submitStage has a list of missing parent stages (see the simplified sketch after this list). If the list is:
      • Empty: the recursion has reached the first stage, so the stage is submitted through the submitMissingTasks function, which:
        • first figures out the indices of the partitions to compute
        • updates the hash maps that keep track of stage submissions, job submissions…
        • posts an event to announce the stage submission
        • serializes and broadcasts the stage
        • creates one task per partition, of the appropriate kind: ShuffleMapTask or ResultTask
      • Not empty: recursively calls submitStage on each missing parent stage.
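
To summarize the recursion in code form, here is a very simplified, self-contained sketch of the idea behind getMissingParentStages. It is a toy model of my own, not Spark's real API (the real implementation in DAGScheduler.scala also handles caching, job bookkeeping and failures), but it shows how narrow dependencies stay inside the current stage while a shuffle dependency cuts out a new parent stage.

```scala
// A toy model of the DAGScheduler's stage-splitting walk described above.
// Node stands in for an RDD, Stage for a ShuffleMapStage; these are not Spark classes.
object StageSplitSketch {
  sealed trait Dep
  case class Narrow(parent: Node)  extends Dep   // pipelined into the same stage
  case class Shuffle(parent: Node) extends Dep   // marks a stage boundary

  case class Node(name: String, deps: List[Dep])
  case class Stage(lastNode: Node)

  def missingParentStages(finalNode: Node): List[Stage] = {
    var parents = List.empty[Stage]
    var visited = Set.empty[Node]

    def visit(node: Node): Unit =
      if (!visited(node)) {
        visited += node
        node.deps.foreach {
          case Narrow(p)  => visit(p)              // stay in the current stage
          case Shuffle(p) => parents ::= Stage(p)  // cut here: new parent stage
        }
      }

    visit(finalNode)
    parents
  }

  def main(args: Array[String]): Unit = {
    // Example lineage: textFile -> flatMap -> map -(shuffle)-> reduceByKey
    val file    = Node("textFile",    Nil)
    val flat    = Node("flatMap",     List(Narrow(file)))
    val mapped  = Node("map",         List(Narrow(flat)))
    val reduced = Node("reduceByKey", List(Shuffle(mapped)))

    // One parent stage, cut at the shuffle dependency before reduceByKey.
    println(missingParentStages(reduced))
  }
}
```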

3. TaskScheduler:

OK, the DAGScheduler has just submitted the task set to the TaskScheduler. Now the TaskScheduler does its job (a toy sketch of the relationship follows this list):

  • creates a new TaskSet, submits it, and waits for the StageCompleted event.
  • when the TaskScheduler submits a TaskSet, it also creates one TaskSetManager to track that task set.
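
As a rough mental model of that relationship, here is a toy sketch of my own (not Spark's real TaskSchedulerImpl/TaskSetManager classes, which also handle locality, speculation and retries): one manager is created per submitted task set and tracks its tasks until the stage completes.

```scala
// Toy bookkeeping only: one TaskSetManager-like object per submitted TaskSet.
case class Task(id: Long)
case class TaskSet(stageId: Int, tasks: Seq[Task])

class TaskSetManagerSketch(val taskSet: TaskSet) {
  private var finished = Set.empty[Long]
  def taskFinished(taskId: Long): Unit = finished += taskId
  def isComplete: Boolean = finished.size == taskSet.tasks.size
}

class TaskSchedulerSketch {
  private var managers = Map.empty[Int, TaskSetManagerSketch]

  // Called by the DAGScheduler once per stage: track the task set with a manager.
  def submitTasks(taskSet: TaskSet): Unit =
    managers += taskSet.stageId -> new TaskSetManagerSketch(taskSet)

  // The "StageCompleted" condition in this toy model: all tasks have finished.
  def isStageComplete(stageId: Int): Boolean =
    managers.get(stageId).exists(_.isComplete)
}
```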

4. Worker:

Spark calls the worker-side process an Executor. The backend receives the worker list from the cluster manager and then launches tasks on the Executors. A BlockManager at each Executor helps it deal with shuffle data and cached RDDs. A new TaskRunner is created at the Executor for each task and submitted to its thread pool, so each task runs on one thread (a schematic sketch follows the list below):

  • deserializes the broadcast stage it received from the TaskScheduler
  • initializes some variables for tracking status
  • runs the task; depending on the type of task, we have:
    • ShuffleMapTask: iterates over its partition of the RDD, computes it, and writes the data out through the shuffle writer.
    • ResultTask: works in a similar way, but uses the MapOutputTrackerWorker to locate and fetch the map output it needs.
  • after the task finishes, the result is sent back to the driver or saved to disk.
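
The executor side can be pictured as a plain thread pool: each launched task is wrapped in a runnable (Spark's TaskRunner), deserialized, run, and its result reported back. The sketch below is only a schematic of that loop with made-up helper names, not Spark's Executor class.

```scala
import java.util.concurrent.Executors

// Schematic of the executor loop described above. deserializeTask and sendToDriver
// are hypothetical stand-ins; the real Executor/TaskRunner also deal with memory,
// metrics, the shuffle writer and the BlockManager.
object ExecutorSketch {
  final case class LaunchedTask(taskId: Long, serializedTask: Array[Byte])

  // Stand-in: turn the serialized bytes back into something runnable.
  private def deserializeTask(bytes: Array[Byte]): () => Any =
    () => s"computed ${bytes.length} bytes"

  // Stand-in: ship the result back to the driver (or write it out).
  private def sendToDriver(taskId: Long, result: Any): Unit =
    println(s"task ${taskId} finished: ${result}")

  // One thread per concurrently running task (roughly the executor's task slots).
  private val pool = Executors.newFixedThreadPool(4)

  def launchTask(task: LaunchedTask): Unit =
    pool.submit(new Runnable {
      override def run(): Unit = {
        val body   = deserializeTask(task.serializedTask) // 1. deserialize
        val result = body()                               // 2. run (ShuffleMapTask / ResultTask)
        sendToDriver(task.taskId, result)                 // 3. report the result
      }
    })
}
```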

Well, I'm done. Quite a lot of things to say, huh? That's why I wanted to include as little source code as possible, or you would get lost.

Some thoughts to share:

  • Spark provides a "run approximate job" feature. I've read a paper about this; the authors are also from AMPLab, so that's no surprise to me.
  • Thanks to the DAGSchedulerEventProcessLoop, the DAGScheduler can keep track of stage statuses and resubmit failed stages.
  • The TaskScheduler only deals with task sets after they have been formed from stages by the DAGScheduler; that's why it doesn't know anything about stages.
  • A Spark program can contain multiple DAGs, and each DAG has one action. So, inside a driver, they are submitted as jobs one by one, in order.
  • The Executor uses TaskRunners, and tasks run on multiple threads. How many tasks can run concurrently on an executor? I found the property "spark.task.cpus", which is the number of cores to allocate for each task, so I think it depends on the number of cores the executor has been given (see the configuration sketch after this list). I'll verify it.
  • Spark has a nice feature that appeared in version 1.2: dynamic resource allocation. Based on the workload, Spark requests extra resources when it needs them, or gives resources back to the cluster if they are no longer used.
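
For reference, both of these knobs live in the Spark configuration. Here is a hedged example (the values are arbitrary) showing how spark.task.cpus interacts with executor cores, together with the dynamic allocation settings:

```scala
import org.apache.spark.SparkConf

// Illustrative values only. With spark.executor.cores = 4 and spark.task.cpus = 2,
// each executor runs at most 4 / 2 = 2 tasks concurrently.
val conf = new SparkConf()
  .setAppName("config-sketch")
  .set("spark.executor.cores", "4")
  .set("spark.task.cpus", "2")
  // Dynamic resource allocation (since Spark 1.2); the external shuffle service is
  // required so executors can be removed without losing their shuffle files.
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.maxExecutors", "10")
```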

Finally, if you have any questions about the post, or even about the detailed source code, just let me know :).
