[SysDeg] Worksharing Framework and its design – Part 2: Communication method

After gaining a basic understanding of Spark and SparkSQL, I came back to my system. The high-level design remains the same as I described two months ago: it is still a client-server model, but the server has changed from a Spark server to a SparkSQL server.

I spent roughly two weeks coding and testing this new system. Below is my development flow, step by step, so you can follow my reasoning and check whether I’m correct or not. The goal right now is only to tackle the communication method between client and server, no optimizations yet.

1. SparkSQL, LogicalPlan, Catalyst… Double check them again before moving on!

I knew that I would be working on SparkSQL, so I needed to check whether its logical plan is feasible to work with.

To verify it, I wrote a simple Spark application containing multiple SQL queries, then extracted the Logical Plan of each query into a separate variable to see whether I could use them independently. This is important because I will send those Logical Plans to the SparkSQL server later on. It worked quite well.
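For reference, this is roughly what the check looked like (a minimal sketch; it assumes a sqlContext and a registered “people” table like the one in section 3, and the query and variable names are only examples):

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Each DataFrame exposes its Logical Plan through queryExecution.logical,
// so the plan of each query can be kept in its own variable.
val q1 = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
val q2 = sqlContext.sql("SELECT COUNT(*) FROM people")

val plan1: LogicalPlan = q1.queryExecution.logical
val plan2: LogicalPlan = q2.queryExecution.logical

// The plans can now be inspected (and later serialized) independently of each other
println(plan1.treeString)
println(plan2.treeString)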

Question: when I receive all the Logical Plans from many clients, should I group them into one big tree and then do the optimization, or should I leave them as they are and optimize them individually?

Answer: I thought about some pros and cons of both approaches. I haven’t thought deeply about it yet, but in case I want to group all the Logical Plans into a bigger, unified tree, I can “UNION” those Logical Plans, and this is supported by SparkSQL.
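To make the “UNION” idea concrete, here is a hypothetical sketch (not something built into the server yet; it reuses the “people” table from the example in section 3). At the DataFrame level this is unionAll, which shows up as a Union node at the root of the combined logical plan:

// Hypothetical sketch of the "group into one tree" option
val teens  = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
val adults = sqlContext.sql("SELECT name FROM people WHERE age >= 20")

val unified = teens.unionAll(adults)                  // both queries combined into one DataFrame
println(unified.queryExecution.logical.treeString)    // the root of the combined tree is a Union node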

2. More or less, I always need to send the DAG!

If you remember from the previous post, the SparkSQL LogicalPlan is the best replacement for the Spark DAG. So why do I still need to send the DAG?

OK, whether it is Spark or SparkSQL, we still need to load the data from disk, and its representation is an RDD. It would not be a big deal if we only had to send the file path to the server, but users can apply transformations to that RDD before turning it into a table! That’s the point.

As described in the previous post, the problems I faced with the Spark DAG were:

  • I only received the last node of the DAG, not the whole DAG. Reason: the SparkContext at the client is attached to each RDD in the DAG and it is not serializable. That’s why I could not receive the whole DAG at the server.
  • Anonymous functions are also a problem. Each user can have many anonymous functions, so their classes have to be sent to the server as well, otherwise you will receive a “ClassNotFound” error.

I came up with a solution for the problems above:

  • To receive the whole DAG, I don’t send it all at once. I break the DAG into its RDDs and their dependencies, then send them one by one to the server. At the server, I reassemble them to get the full DAG, and each RDD is attached to the SparkContext at the server (a minimal sketch of the client-side traversal is shown after this list).
  • All the class files of the anonymous functions are sent to the server too. Then I add them to the jar file of the SparkSQL server (using this command: jar uf jar-file input-files).
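To give an idea of the client-side traversal, here is a minimal sketch (the real sending and reassembling code is more involved; collectRdds is just an illustrative helper name, not part of Spark):

import org.apache.spark.rdd.RDD

// Walk the lineage starting from the last RDD and collect every RDD in the DAG
// together with its parents, so they can be shipped to the server one by one.
// (Naive version: a parent shared by two RDDs is visited more than once.)
def collectRdds(last: RDD[_]): Seq[RDD[_]] = {
  val parents = last.dependencies.map(_.rdd)
  last +: parents.flatMap(collectRdds)
}

// Example: the last RDD of a small application
val words = sc.textFile("examples/src/main/resources/people.txt").flatMap(_.split(","))
collectRdds(words).foreach(r => println(r.toDebugString))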

After successfully receiving the full DAG, I performed some actions on it, saveAsTextFile for example, and it worked really well. So far, I haven’t found any problems with this solution.

3. RDD tracking table

Let’s take another look at this simple SparkSQL example.

import sqlContext.implicits._   // needed for toDF()
case class Person(name: String, age: Int)

val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
val peopleC = people.toDF()
peopleC.registerTempTable("people")
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
val op = teenagers.map(t => "Name: " + t(0))

So, I summarize the lifetime of a SparkSQL application, before it is executed, in four steps:

  • Load the data from disk, which returns an RDD (and possibly perform some transformations on it).
  • Turn that RDD into a DataFrame.
  • Create a table from that DataFrame.
  • Create the SQL query.

The first step has been solved. What about the rest? I need some way of tracking the RDD above:

  • Has it been turned into a DataFrame or not?
  • Is that DataFrame used in a query (as a table, for example) or not?

I’m thinking about some kind of management structure; let’s imagine it as a table. Each entry is composed of an RDD and some flags that tell us whether a DataFrame or a table has been created from it.

Whenever a DataFrame or table creation is called on an RDD, the client sends a signal to notify the server. The server then just needs to perform the same call on the associated RDD.

This tracking method must sit on both sides, client and server. Why? There may be many RDDs in one client, so we need to know which RDD made the call and let the server know about it.
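Just to make the idea concrete, here is a rough sketch of what an entry of that table could look like (purely illustrative; as I note below, I haven’t worked on it yet, and all the names here are made up):

// Purely illustrative sketch of a tracking-table entry; names are made up.
// One entry per client RDD, plus flags recording what has been derived from it.
case class RddEntry(
  rddId: Int,                 // id of the RDD on the client (RDD.id)
  hasDataFrame: Boolean,      // has toDF() been called on it?
  tableName: Option[String]   // name it was registered under as a temp table, if any
)

// The table itself could simply be a map keyed by the RDD id,
// kept in sync on both sides by the signals described above.
var trackingTable = Map.empty[Int, RddEntry]

// e.g. when the client announces that peopleRDD became the DataFrame/table "people":
// trackingTable += (peopleRDD.id -> RddEntry(peopleRDD.id, hasDataFrame = true, Some("people")))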

Two weeks, and many things to say. It’s better to have a figure so you can understand what I said more clearly. (The tracking table is just an abstract object; I haven’t worked on it yet.)

[Figure: protocol-0.0.1]

There are still many things ahead that I need to solve. A draft plan for the next steps could look something like this:

  • Make the tracking table work.
  • Think about what happens if many users have the same application name (the same jar filename, for example): what happens to their anonymous classes?
  • What will I do when I have many DAGs from many users? Try to make the SparkSQL Server execute them IN PARALLEL, so multiple ‘minor’ SparkContexts should be created alongside the main SparkContext.

If you have any comments or questions, just leave a comment below. 🙂
