[Arch] SparkSQL Internals – Part 2: SparkSQL Data Flow

The SparkSQL paper provides a very nice figure of the SparkSQL data flow.

[Figure: SparkSQL data flow]

I have worked with Apache Pig for more than a year, so I thought it would be better to put them all side by side. I created a new figure that shows the data flows of Hive, Pig, and SparkSQL.

[Figure: Data-flow comparison of SparkSQL, Hive, and Pig]

  • I only know a little about Hive, but I included its data flow for readers who are familiar with it.
  • Newer Hive versions have a cost-based optimizer, Calcite, but I did not include it because it is still under development and it is a plugin that (hopefully) can be plugged into any system in the Hadoop ecosystem.
  • Pig and Hive can also run on Spark, but they cannot expose all of their capabilities there. So in this figure, I assume Pig and Hive are running on Hadoop MapReduce.

So, all the systems above have these stages in common: Parser, LogicalPlan, Optimizer, PhysicalPlan, and Execution Plan.

OK, now that you have the big picture of SparkSQL, I will group the components in the first figure into three processes and dive into each one:

  • DataFrame/SQL creation, manipulation, and plan generation
  • Optimization
  • Submission

[Figure: SparkSQL data flow, grouped into three processes]

I wrote a very simple SparkSQL program, which you can easily find in the SparkSQL documentation, and traced its flow with a debugger. The figures below illustrate the explanation with some functions and pseudo code. Before going into details, please take a look at my previous post to understand some of the components in SQLContext.

  1. DataFrame/SQL Creation, Manipulation and Plan Generation.

[Figure: DataFrame/SQL creation and manipulation]

There are many ways to create a DataFrame. When you create a new DataFrame, an unresolved LogicalPlan is created, and then a new QueryExecution is built around it. In this QueryExecution, all the plans are defined but not executed, because they are lazy vals. So we get:

  • analyzed Plan
  • withCachedData Plan
  • optimized Plan
  • sparkPlan
  • executedPlan

If DATAFRAME_EAGER_ANALYSIS is enabled, the unresolved plan is analyzed by the Analyzer eagerly, so analysis errors are thrown as soon as they occur.
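
As far as I can tell, this flag corresponds to the SQLConf key spark.sql.eagerAnalysis (the key name is my assumption from the config constant, so double-check it against your Spark version). A minimal sketch of toggling it:

// Assumption: "spark.sql.eagerAnalysis" is the key behind DATAFRAME_EAGER_ANALYSIS.
sqlContext.setConf("spark.sql.eagerAnalysis", "false")
val bad = sqlContext.sql("SELECT no_such_column FROM people")  // no error yet: analysis is deferred
// bad.collect()  // the AnalysisException would only surface here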

When you manipulate a DataFrame, a new DataFrame is returned, depending on the function you apply to it.
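
For illustration, here is a hedged example (reusing the people temp table registered in the snippet at the end of this post). Each transformation only wraps a new logical plan in a new DataFrame, and nothing runs until an action is called:

// Every call below returns a new DataFrame backed by a new, still-lazy logical plan.
val peopleDF = sqlContext.table("people")
val teens    = peopleDF.filter(peopleDF("age") >= 13).filter(peopleDF("age") <= 19)
val names    = teens.select("name")
// Nothing has executed yet; only an action such as names.show() produces an RDD and submits a job.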

  2. Optimization

When an action is called, the DataFrame we created needs to return an RDD to the Spark-Core module. Spark-Core will do the rest.

It is better to show the code here than another figure. Below is the small and simple part of the code that relates to this section.

If analyzed has already been evaluated, SparkSQL skips it and checks whether it needs to return a cached-data plan (a kind of logical plan with caching) or not. Then it optimizes the logical plan, generates several physical plans, picks one of them through the cost model (currently only used for choosing join algorithms), and finally returns an RDD.

This RDD is actually the head of a DAG of RDDs (each RDD links back to its parents, so the lineage forms a linked list of RDDs).

// Excerpt from QueryExecution (an inner class of SQLContext); every step is a lazy val,
// so nothing runs until toRdd (or one of the intermediate plans) is accessed.
def assertAnalyzed(): Unit = checkAnalysis(analyzed)
// Resolve attributes, relations, and types in the unresolved logical plan.
lazy val analyzed: LogicalPlan = analyzer(logical)
// Swap in cached (in-memory) data for matching sub-plans.
lazy val withCachedData: LogicalPlan = {
    assertAnalyzed()
    cacheManager.useCachedData(analyzed)
}
// Apply rule-based optimizations to the logical plan.
lazy val optimizedPlan: LogicalPlan = optimizer(withCachedData)
// Ask the planner for physical plans and take the first one.
lazy val sparkPlan: SparkPlan = {
    SparkPlan.currentContext.set(self)
    planner(optimizedPlan).next()
}
// Final preparation (e.g. adding exchange operators) before execution.
lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
// Execute the physical plan: this is the RDD handed to Spark-Core.
lazy val toRdd: RDD[Row] = executedPlan.execute()
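
From user code, these stages are reachable through the DataFrame itself. A hedged sketch (field names as I saw them while debugging Spark 1.3, reusing the teenagers DataFrame from the snippet at the end of this post):

// Each lazy val is evaluated on first access only.
teenagers.queryExecution.analyzed       // resolved logical plan
teenagers.queryExecution.optimizedPlan  // after the Optimizer
teenagers.queryExecution.executedPlan   // physical plan that will be executed
// Or simply print all of the plans at once:
teenagers.explain(true)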
  3. Job Scheduling and Submission

We have the DAG of RDDs; now Spark-Core does the rest. Go back to Spark Job Submission Breakdown.
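
Concretely, a job is only scheduled once an action forces toRdd to be evaluated. A small example (using the op and teenagers values from the snippet below):

// The action evaluates executedPlan.execute() and hands the resulting RDD DAG to the DAGScheduler.
op.collect().foreach(println)
// An action on the DataFrame itself, e.g. teenagers.count(), triggers a job in the same way.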

Some thoughts to share:

    • The LogicalPlan is a TreeNode, from which I can extract a lot of information. This is good news for the work-sharing optimization. So, I need to postpone all actions until all the optimizations on the LogicalPlan are finished.
    • This is the simple SQL query I used to trace the flow. The figure below is a snapshot of the logical plan of that query, for those who want to see its “shape”. The generated RDD DAG is shown below it as well.
case class Person(name: String, age: Int)  // needed for the map below
import sqlContext.implicits._              // brings in toDF()
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
val peopleC = people.toDF()
peopleC.registerTempTable("people")
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
val op = teenagers.map(t => "Name: " + t(0))

[Figure: Snapshot of the logical plan of the query above]
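
The RDD lineage below can be reproduced by printing the debug string of the final RDD:

println(op.toDebugString)   // prints the lineage (DAG) of RDDs shown below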

(1) MapPartitionsRDD[12] at map at DataFrame.scala:776 []
 |  MapPartitionsRDD[11] at map at DataFrame.scala:889 []
 |  MapPartitionsRDD[10] at mapPartitions at basicOperators.scala:40 []
 |  MapPartitionsRDD[9] at mapPartitions at basicOperators.scala:55 []
 |  MapPartitionsRDD[8] at mapPartitions at ExistingRDD.scala:35 []
 |  MapPartitionsRDD[3] at map at SimpleApp.scala:24 []
 |  MapPartitionsRDD[2] at map at SimpleApp.scala:24 []
 |  examples/src/main/resources/people.txt MapPartitionsRDD[1] at textFile at SimpleApp.scala:24 []
 |  examples/src/main/resources/people.txt HadoopRDD[0] at textFile at SimpleApp.scala:24 []
  • In the next posts, I will focus on the Logical Plan, the optimization process, and caching, because those are the parts I will work with the most.