
Spark Dataframes – From machine readable to human fathomable code

Background

Apache Spark, an in-memory cluster compute platform that delivers over 100x the performance of traditional MapReduce for certain applications, has hit home with data analysts and developers alike on the promise of succinct, much simpler code. Continuing this tradition of speed and simplicity, Spark 1.3 introduces a new abstraction called DataFrames (so long, Schema RDDs).

DataFrames enable developers to build and run Spark programs faster: they write less code and read only the required data, thanks to the high-level operations the DataFrame API provides.

What's a DataFrame?

A DataFrame, like an RDD, is a distributed collection of rows, but with named columns. A DataFrame carries a lot of schema/structure information, which lets the Spark engine exploit the structure of the underlying data to run your Spark jobs faster.

The DataFrame concept in Spark isn't new; it is heavily borrowed from R and from Python's data analysis library, pandas. These libraries provide very simple abstractions for filtering, selecting columns, aggregation, plotting and so on.
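To make that concrete, here is a minimal sketch of those operations against the on-time performance data, using the Spark 1.3-era Java API. The column names Origin and DepDelayMinutes come from the data set used later in this article; the file name and the SQLContext setup are illustrative assumptions, not code from this example.

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import static org.apache.spark.sql.functions.avg;

// sqlCtx is assumed to be an existing SQLContext; the file name is hypothetical
DataFrame otp = sqlCtx.jsonFile("otp_2014.json");

otp.filter(otp.col("DepDelayMinutes").gt(0))   // filter rows
   .select("Origin", "DepDelayMinutes")        // project only the needed columns
   .groupBy("Origin")                          // aggregate per airport
   .agg(avg("DepDelayMinutes"))
   .show();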

The Schema RDD of Spark 1.2 has been rechristened the Spark DataFrame in 1.3, along with an API unification across Java, Scala and Python. From 1.3 onwards, a DataFrame is technically no longer a Schema RDD: the inheritance relationship has been removed from its class hierarchy. More importantly, the Spark SQL module sheds its alpha/experimental tag in 1.3, guaranteeing API stability across the entire 1.3 stack.

Example

Enough talk; let's look at an example. The goal is to find the average gate departure delay at every airport in the US for the year 2014. We will first calculate this average the traditional way, and then use a DataFrame on the same data set, joining it to two look-up tables: the GeoCode and Carrier files. The public data set we have picked is the on-time performance data for all US airlines for 2014.

Traditionally, you would write your MapReduce-style code as shown in Listing 1. Without an understanding of the metadata of the underlying dataset, this esoteric code is a little unwieldy to read.


Listing 1

scala> // column 14 = Origin, column 32 = DepDelayMinutes; the regex is a CSV-aware split
scala> val delays = deptDelays_2014.map { x =>
     |   val cols = x.split(",(?=([^\"]*\"[^\"]*\")*[^\"]*$)")
     |   (cols(14), cols(32).toDouble)
     | }
scala> delays.mapValues((_, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)).mapValues { case (sum, count) => sum / count }.collectAsMap()
res29: scala.collection.Map[String,Double] = Map("LGA" -> 13.85575588183916, "OME" -> 7.442953020134228, "ORF" -> 13.401497938231087, "SLC" -> 8.274310461894189, "AZO" -> 8.56232427366448, "BQN" -> 15.299703264094955, "SUN" -> 11.503620273531778, "EWR" -> 16.740703068140963, "PNS" -> 10.681928001932834, "STL" -> 14.069748674198754, "AVP" -> 17.43314651721377, "GEG" -> 8.873259355961705, "IAH" -> 14.513608037290533, "DHN" -> 7.834855938158819, "GUC" -> 22.937888198757765, "HYA" -> 3.925, "LWS" -> 3.499261447562777, "BZN" -> 14.819143310417768, "RST" -> 10.848408710217756, "TPA" -> 13.669393898749647, "BTV" -> 18.7355476673428, "HOU" -> 13.982479718102105, "SNA" -> 11.619507073938909, "BMI" -> 13.972002635046113, "LNK" -> 22.61370202228642, "EGE" -> 24.9670510708402, "CMH" -> 15.344372072...

Now contrast this with Listing 2, which uses a DataFrame. Note the average delay at LGA in the output above: about 13.86 minutes; the DataFrame version arrives at the same figure.

Listing 2

String query2 = "SELECT avg(CAST(DepDelayMinutes as float)) as delay, o.Origin, a.Description, g.Latitude, g.Longitude FROM OTP o,  GEOCODE g, AIRPORT_L a  "
					+ "WHERE  g.locationID = o.Origin AND a.Code = o.Origin group by  o.Origin, a.Description, g.Latitude, g.Longitude  ";
			
			DataFrame df = getDataFrameForaYear(sqlCtx, year);
			df.registerTempTable("OTP");
			df.printSchema();		
			DataFrame dfGeo = getGeoCodeDF(sqlCtx);
			dfGeo.registerTempTable("GEOCODE");
			dfGeo.printSchema();
			
			DataFrame dfAirPrt = getAirportDF(sqlCtx);
			dfAirPrt.registerTempTable("AIRPORT_L");
			dfAirPrt.printSchema();
	
			sqlCtx.sql(query2).explain(true);
			
			Column sortCol = new Column("delay");
			DataFrame dfPers = sqlCtx.sql(query2).sort(sortCol).persist();
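The helper methods in Listing 2 (getDataFrameForaYear, getGeoCodeDF, getAirportDF) are not shown in this article. Judging from the CsvRelation entries in the query plans below, they likely load the CSV files through the spark-csv package; a minimal sketch of one of them, under that assumption, might look like this:

import java.util.HashMap;
import java.util.Map;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

// Assumed implementation, not this article's actual code
static DataFrame getGeoCodeDF(SQLContext sqlCtx) {
    Map<String, String> options = new HashMap<String, String>();
    options.put("path", "Airport_Codes_mapped_to_Latitude_Longitude.csv");
    options.put("header", "true");
    // spark-csv registers itself as the data source "com.databricks.spark.csv"
    return sqlCtx.load("com.databricks.spark.csv", options);
}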

As Listing 2 shows, DataFrames work with SQL-like syntax and the result is very lucid. Spark SQL is the lingua franca for data analysis and BI, but it is more than just SQL: you can mix and match Spark SQL with a regular Spark program, and it has rich language bindings in Scala, Python and Java, just like the rest of the Spark stack. Things that don't lend themselves to SQL can be moved into custom code. In addition, through its query optimization framework, Catalyst, Spark tries to optimize the query (e.g. predicate push-down) to speed up the data processing pipeline.
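For example, the result of sqlCtx.sql() is just another DataFrame, so custom logic can be layered on top of the SQL in ordinary code. The 15-minute threshold below is an arbitrary illustration, not something from the article:

// Hypothetical continuation of Listing 2: refine the SQL result with
// DataFrame operations and inspect the Catalyst plans.
DataFrame delays = sqlCtx.sql(query2);
DataFrame worst = delays.filter(delays.col("delay").gt(15.0))   // custom logic outside SQL
                        .sort(delays.col("delay").desc());

worst.explain(true);   // prints the parsed, analyzed, optimized and physical plans
worst.show(20);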

 

Phases of query planning in Spark SQL. Rounded rectangles represent Catalyst trees

Catalyst takes advantage of Scala's powerful language features, such as pattern matching and runtime metaprogramming, to let developers concisely specify complex relational optimizations. Most analysts prefer Spark SQL as the gateway to the data stored in HDFS. Here are the different plans for the above query.

 

Listing 3

== Parsed Logical Plan ==
'Aggregate ['o.Origin,'a.Description,'g.Latitude,'g.Longitude], 
  [AVG(CAST('DepDelayMinutes, FloatType)) 
   AS delay#225,'o.Origin,'a.Description,'g.Latitude,'g.Longitude]
 'Filter (('g.locationID = 'o.Origin) && ('a.Code = 'o.Origin))
  'Join Inner, None
   'Join Inner, None
    'UnresolvedRelation [OTP], Some(o)
    'UnresolvedRelation [GEOCODE], Some(g)
   'UnresolvedRelation [AIRPORT_L], Some(a)

== Analyzed Logical Plan ==
Aggregate [Origin#14,Description#224,Latitude#221,Longitude#222], 
	[AVG(CAST(CAST(DepDelayMinutes#32, FloatType), DoubleType)) AS delay#225,
	Origin#14,Description#224,Latitude#221,Longitude#222]
 Filter ((locationID#220 = Origin#14) && (Code#223 = Origin#14))
  Join Inner, None
   Join Inner, None
    Subquery o
     Subquery OTP
      Union
       Relation[Year#0,Quarter#1,Month#2,..........]
CsvRelation(On_Time_On_Time_Performance_2014_2.csv,true,,,",\,PERMISSIVE,null)
    Subquery g
     Subquery GEOCODE
      Relation[locationID#220,Latitude#221,Longitude#222] 
       CsvRelation(Airport_Codes_mapped_to_Latitude_Longitude.csv,true,,,",\,PERMISSIVE,null)
   Subquery a
    Subquery AIRPORT_L
     Relation[Code#223,Description#224] CsvRelation(/L_AIRPORT.csv_,true,,,",\,PERMISSIVE,null)

== Optimized Logical Plan ==
Aggregate [Origin#14,Description#224,Latitude#221,Longitude#222], 
  [AVG(CAST(CAST(DepDelayMinutes#32, FloatType), DoubleType)) AS delay#225,Origin#14,Description#224,Latitude#221,Longitude#222]
 Project [Longitude#222,DepDelayMinutes#32,Origin#14,Description#224,Latitude#221]
  Join Inner, Some((Code#223 = Origin#14))
   Project [Origin#14,DepDelayMinutes#32,Latitude#221,Longitude#222]
    Join Inner, Some((locationID#220 = Origin#14))
     Union
      Project [Origin#14,DepDelayMinutes#32]
== Physical Plan ==
Aggregate false, [Origin#14,Description#224,Latitude#221,Longitude#222], 
		[(CAST(SUM(PartialSum#229), DoubleType) / CAST(SUM(PartialCount#230L), DoubleType)) AS delay#225,
		Origin#14,Description#224,Latitude#221,Longitude#222]
 Aggregate true, [Origin#14,Description#224,Latitude#221,Longitude#222], [Origin#14,Description#224,
 	Latitude#221,Longitude#222,COUNT(CAST(CAST(DepDelayMinutes#32, FloatType), DoubleType)) AS PartialCount#230L,
 		SUM(CAST(CAST(DepDelayMinutes#32, FloatType), DoubleType)) AS PartialSum#229]
  Project [Longitude#222,DepDelayMinutes#32,Origin#14,Description#224,Latitude#221]
   ShuffledHashJoin [Origin#14], [Code#223], BuildRight
    Project [Origin#14,DepDelayMinutes#32,Latitude#221,Longitude#222]
     ShuffledHashJoin [Origin#14], [locationID#220], BuildRight
      Exchange (HashPartitioning [Origin#14], 200)
       Union [Project [Origin#14,DepDelayMinutes#32]
 PhysicalRDD 

 

Here is the JSON representation, generated from the DataFrame, for the airports LGA and FAT.

{
"abbrev":"LGA",
"parentState":"LGA",
"airport":"New York, NY: LaGuardia",
"lat":"40.7772",
"lon":"-73.8725",
"avg":"13.86"
},
{
"abbrev":"FAT",
"parentState":"FAT",
"airport":"Fresno, CA: Fresno Yosemite International",
"lat":"36.7761",
"lon":"-119.7181",
"avg":"13.91"
},
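The article does not show how this JSON was produced; one plausible route is DataFrame.toJSON, which emits one JSON string per row (the field names above, such as abbrev and avg, would still need to be set via column aliases):

import org.apache.spark.api.java.JavaRDD;

// dfPers is the sorted, persisted DataFrame from Listing 2
JavaRDD<String> json = dfPers.toJSON().toJavaRDD();

for (String row : json.take(2)) {              // e.g. the LGA and FAT rows
    System.out.println(row);
}
json.saveAsTextFile("delays_by_airport_json"); // hypothetical output path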

Feeding the JSON data for all the airports to HighMaps gives you a complete perspective on which airport to pick the next time you fly out.

 

Here is the output, represented using HighCharts. To view a bigger map, click here.

 
