[SysDeg] Worksharing Framework and its design – Part 3: Prototype for the first version

Long time no see! After one month playing with caching in Spark, I learned many valuable lessons (which will be covered in other blog posts about the Cache Manager and Block Manager of Spark). Our team came back to the design of the system, the SparkSQL Server. To be honest, I spent too much time on the details instead of focusing on the system design. Many things have happened, and they have changed the initial design.

The goal is to make the design general and extensible so that other people can contribute to it as well. During my internship, I will take care of the simplest form of worksharing, which is sharing scans. But there are many variants of worksharing, and we also want our system to support those variants in the future.

I’ll talk about the components and their functions on both the client side and the server side. The figure below describes the design of the SparkSQL Server.

SparkSQL Server and its components

To make it easy to understand, I will explain the system with the following three examples:

*Note: the optimization will mostly be done on the DAGs, and SparkSQL queries are also translated into Spark DAGs, so I will use simple Spark applications without SQL in these examples.

User1:

val input = sc.textFile("A")
val errors = input.filter(_.contains("ERROR"))
errors.saveAsTextFile("error")

User2:

val input = sc.textFile("A")
val warnings = input.filter(_.contains("WARNING"))
warnings.saveAsTextFile("warn")

User3:

val input = sc.textFile("B")
val info = input.filter(_.contains("INFO"))
info.saveAsTextFile("info")

 

  • Client side:
    • We will modify the spark-submit command so that it sends the application not to the cluster manager but to the SparkSQL Server, using the option --master sparksql-server.
    • Each client starts its own driver with its own SparkContext. The client can then generate its DAG and its DataFrame creation information, and send the DAG, the DataFrame creation information, and the SQL query to the SparkSQL Server.
    • The client needs to send the jar file, because it contains all the user-defined functions and classes that are needed to reassemble the DAG on the server side. It also needs to send whatever information the SparkSQL Server requires to reconstruct the DAG and the query. Don’t forget that the query has already been optimized by Catalyst on the client side.
    • Open questions:
      • How does the user obtain feedback about job progress? Scheduling plays an important role here, so can we expose the queueing delay on the SparkSQL Server, so that the user can decide whether to run the job themselves or let the SparkSQL Server do it?
      • How does the user kill the application (before/after it is executed by the SparkSQL Server)?
      • How and where are application logs stored for debugging (before/after the application is executed by the SparkSQL Server)?
    • So, in this step:
      • Input: Users’ applications (jar files)
      • Output: jar files, DAGs, DataFrame creation information, and SQL queries sent to the SparkSQL Server. In these examples, we have DAG1, DAG2, and DAG3.
  • Server side:
    • DAG Listener and DAG Queue
      • The DAG Listener accepts the clients’ connections and receives the DAGs, queries, and other information they send, then passes them to the DAG Queue, which accepts the clients’ submissions in FIFO order. In this component, the full DAG of each user is built from the initial DAG, the DataFrame creation information, and the query. The DAG Queue has a fixed-size window: once the window is full, it sends a batch of DAGs (and their queries) to the next component. The window size is a fixed constant for now; it will change when we take care of scheduling. A small sketch of this windowing appears after the component list.
      • Input: DAGs and queries from clients
      • Output: a batch of DAGs, queries, and DataFrame creation information, emitted once the DAG Queue’s window is full
      • In the example, let’s assume the window size of the DAG Queue is 3, so the output is DAG1, DAG2, DAG3. If a fourth user submits a job, it will be batched with the next two jobs.
    • DAG Selector
      • This component is a kind of pre-scheduler: it uses the constraints attached to each query to fulfill the user’s requirements, for example a job submitted with a deadline. Right now, it just uses a simple FIFO strategy.
      • Input: Batch of DAGs
      • Output: Batch of DAGs ordered by the scheduling strategy (FIFO at the moment).
      • In the example, the input and the output are the same, since we use the FIFO strategy.
    • Worksharing Detector
      • This component detects which DAGs have an opportunity for worksharing and which do not. Remember that worksharing here is not only sharing scans; it can also cover other variants such as sharing a group-by or a join. It uses a rule-based mechanism to detect worksharing opportunities. At the moment, I will take care of sharing scans: from the DataFrame creation information we can detect whether the DAGs read from the same input file and which attributes they use from that file, and therefore whether they can share a scan (see the detection sketch after this list).
      • Input: Batch of DAGs
      • Output: Bags of DAGs, each labeled with a type of sharing (sharing scan, sharing group-by, …) or “no sharing”.
      • In the example, we get two bags: DAG_Bag1: {DAG1, DAG2} with the label “scan sharing”, and DAG_Bag2: {DAG3} with the label “no sharing”.
    • DAG Rewrite
      • This component has several families of rewrites, and each family has several rewrite rules. It generates as many rewritten DAGs as possible from the bags of DAGs labeled as “sharing” and does nothing with the “no sharing” bags. Right now, for scan sharing, we have three families of DAG rewrites: input tagging, caching, and a hybrid of input tagging and caching. The caching solution makes the DAG rewrite easy, but caching cannot always be used. Input tagging is the technique traditional databases have used, and rewriting the DAG for it is more difficult. In some cases, our server can be smart and combine caching and input tagging. Both basic rewrites are sketched after this list.
      • Input: Bags of DAGs, each labeled with a type of sharing (sharing scan, sharing group-by, …) or “no sharing”.
      • Output: rewritten bags of DAGs for the sharing bags, and unchanged bags of DAGs for the “no sharing” bags.
      • In the example above, we get DAG_Bag1*: {Rewrite1: input tagging {DAG1_2}; Rewrite2: caching {DAG1_caching, DAG2}}.
    • Cost-based Optimizer (CBO)
      • The CBO receives the rewritten and not-rewritten bags of DAGs. The rewritten bags contain DAGs that were rewritten for worksharing. The CBO uses a cost model to evaluate which rewritten DAG has the lowest cost (a tiny sketch of this step appears after the list). It does nothing with the not-rewritten bags, as they have no sharing opportunity at all.
      • Input: not-rewritten/rewritten bags of DAGs
      • Output: not-rewritten bags of DAGs, plus the lowest-cost DAGs from the rewritten bags.
      • In the example above, DAG_Bag1* has two rewrites: rewrite 1 with input tagging (or labelling) and rewrite 2 with caching. The CBO chooses the one with the lowest cost and sends it to the scheduler.
    • Scheduler
      • As its name suggests, it does the scheduling. Right now, we just use FIFO for all DAGs. For scan sharing, if a rewrite involves caching, the order is reorganized so that the DAG that populates the cache is executed before the ones that read from it.
      • Input: not-rewritten bags of DAGs and the lowest-cost DAGs from the rewritten bags.
      • Output: DAGs submitted for execution according to the scheduling strategy (FIFO for now).
      • In the example above, if we choose input tagging for DAG1 and DAG2, they become one compound job that is submitted together with DAG3 in FIFO order. If we choose caching for DAG1 and DAG2, the execution order is DAG1, DAG2, DAG3, because DAG1 needs to cache the input before DAG2 reads it.
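
To make the DAG Queue’s windowing concrete, here is a minimal sketch in plain Scala of how a fixed-size FIFO window could batch incoming jobs. The DagQueue class and its fields are hypothetical names for illustration only, not the real server code.

import scala.collection.mutable

// Hypothetical sketch of the DAG Queue's fixed-size window: jobs are collected
// in FIFO order and flushed as a batch once the window size is reached.
class DagQueue[T](windowSize: Int, onBatch: Seq[T] => Unit) {
  private val buffer = mutable.Queue.empty[T]

  def enqueue(job: T): Unit = synchronized {
    buffer.enqueue(job)
    if (buffer.size >= windowSize) {
      // Flush exactly one window worth of jobs, oldest first.
      val batch = Seq.fill(windowSize)(buffer.dequeue())
      onBatch(batch)
    }
  }
}

// With a window of 3, DAG1..DAG3 are flushed together; a fourth submission
// waits until two more jobs arrive.
val queue = new DagQueue[String](3, batch => println(s"batch: $batch"))
Seq("DAG1", "DAG2", "DAG3", "DAG4").foreach(queue.enqueue)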
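
For the Worksharing Detector, the scan-sharing rule can be sketched as follows: group the DAGs of a batch by their input path (taken from the DataFrame/RDD creation information) and label every group with more than one DAG as a scan-sharing bag. JobInfo, Bag, and detectScanSharing are hypothetical names used only for illustration.

// Hypothetical sketch of the rule-based scan-sharing detection.
case class JobInfo(dagId: String, inputPath: String)
case class Bag(label: String, dags: Seq[JobInfo])

def detectScanSharing(batch: Seq[JobInfo]): Seq[Bag] =
  batch.groupBy(_.inputPath).values.toSeq.map { group =>
    if (group.size > 1) Bag("scan sharing", group) // e.g. {DAG1, DAG2} on file A
    else Bag("no sharing", group)                  // e.g. {DAG3} on file B
  }

// With the three example jobs above:
val bags = detectScanSharing(Seq(
  JobInfo("DAG1", "A"), JobInfo("DAG2", "A"), JobInfo("DAG3", "B")))
bags.foreach(println)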
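
For the DAG Rewrite step, here is what the two basic rewrites of DAG1 and DAG2 could look like if written out as plain Spark code. This is only an illustration of the idea, assuming a shared SparkContext sc on the server; the real rewrite operates on the DAGs themselves, and the output path "tagged-output" is hypothetical.

// Caching rewrite (DAG1_caching, DAG2): the shared input is cached by the first
// job and read from the cache by the second one.
val shared = sc.textFile("A").cache()
shared.filter(_.contains("ERROR")).saveAsTextFile("error")  // DAG1, materializes the cache
shared.filter(_.contains("WARNING")).saveAsTextFile("warn") // DAG2, reads the cached blocks

// Input-tagging rewrite (DAG1_2): one compound job scans "A" once and tags each
// line with the query (or queries) it belongs to.
val tagged = sc.textFile("A").flatMap { line =>
  Seq("error" -> line.contains("ERROR"), "warn" -> line.contains("WARNING"))
    .collect { case (tag, true) => (tag, line) }
}
// In this simple sketch the tagged records go to a single place; splitting them
// back into the per-user outputs ("error", "warn") would need a multiple-output
// format or a post-processing step.
tagged.saveAsTextFile("tagged-output")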
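
Finally, for the CBO, the decision itself is just a minimum over estimated costs. The Rewrite case class and the cost numbers below are purely illustrative; the real cost model is still to be defined.

// Hypothetical sketch of the CBO choice: keep the cheapest rewrite of a bag.
case class Rewrite(name: String, estimatedCost: Double)

def chooseCheapest(rewrites: Seq[Rewrite]): Rewrite = rewrites.minBy(_.estimatedCost)

// For DAG_Bag1*, the server keeps whichever of the two rewrites is cheaper.
val best = chooseCheapest(Seq(
  Rewrite("input tagging (DAG1_2)", 10.0),        // illustrative cost
  Rewrite("caching (DAG1_caching, DAG2)", 7.5)))  // illustrative cost
println(best)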

As I said in a post at the beginning of this series, this design can change if we find a better one! Again, this design aims to be as generic and extensible as possible, because our SparkSQL Server does not serve only the scan-sharing purpose.
