Service Symphony

Getting Started with Spark on SBT

In this article we will write a simple application that analyses data using Spark and SBT. SBT is a build tool for Scala and can be downloaded from http://www.scala-sbt.org/. This tutorial assumes you have already downloaded SBT and made its binaries available on the path. The data we will analyse is the UK sold house prices available from http://data.gov.uk. The task is to compute some statistics on the prices of houses sold within Greater London in 2014. The data is available as a comma-separated list of records, with columns for price, date, property type, address etc. The fields we are interested in are price (field 2) and county (field 14).
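
To make the field numbering concrete, here is a minimal sketch in plain Scala (no Spark needed) that pulls the price and county out of a single record. The record is a made-up placeholder laid out like a row of the file, not a line taken from the real data, and the names FieldPositions, sample and parse are only illustrative.

```scala
// Plain Scala, no Spark required.
object FieldPositions {
  // Hypothetical record: placeholder values, with the price in field 2
  // and the county in field 14, each wrapped in double quotes.
  val sample: String = Seq(
    "{ID}", "250000", "2014-01-01 00:00", "POSTCODE", "T", "N", "F",
    "1", "", "STREET", "LOCALITY", "TOWN", "DISTRICT", "GREATER LONDON", "A"
  ).map(field => "\"" + field + "\"").mkString(",")

  // Split on commas and strip the double quotes from every field.
  def parse(line: String): Array[String] =
    line.split(",").map(_.replace("\"", ""))

  def main(args: Array[String]): Unit = {
    val fields = parse(sample)
    val price = fields(1).toDouble // field 2 (arrays are zero-based)
    val county = fields(13)        // field 14
    println(s"price=$price county=$county")
  }
}
```

Note that a plain split on commas assumes no field contains a comma inside its quotes; a record that does would shift the field positions.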

The first step is to create a directory in which to store the application source files and data,

meeraj:/Users/meeraj/Projects/spark-articles>mkdir getting-started
meeraj:/Users/meeraj/Projects/spark-articles>cd getting-started

The next step is to get the data for 2014 from the data.gov.uk website,

curl http://publicdata.landregistry.gov.uk/market-trend-data/price-paid-data/b/pp-2014.csv > pp-2014.csv

Now, let us create the SBT build script.

meeraj:/Users/meeraj/Projects/spark-articles/getting-started>vi build.sbt

Copy and paste the following to your build script and save the file,

name := "House Price Application"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.1"

The script above does the following,

  1. Sets the name of the project to “House Price Application”
  2. Sets the project version to 1.0
  3. Sets the Scala version to use to 2.10.4
  4. Declares a dependency on Spark core library, version 1.5.1
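
As a side note on item 4, the %% operator asks SBT to append the Scala binary version to the artifact name. With scalaVersion set to 2.10.4, the dependency line should therefore be equivalent to the more explicit form below. It is shown only as an illustration; you do not need both lines in the build script.

```scala
// build.sbt fragment: the same dependency with the Scala binary version spelled out
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.5.1"
```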

Now we will go ahead and write our application code,

meeraj:/Users/meeraj/Projects/spark-articles/getting-started>vi HousePriceApp.scala

Save the following contents to the above file,

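import org.apache.spark.{SparkContext, SparkConf}

object HousePriceApp extends App {
  // Run locally with four worker threads
  val conf = new SparkConf().setAppName("House Price Application").setMaster("local[4]")
  val sc = new SparkContext(conf)
  // Reference the CSV file lazily; nothing is read until an action runs
  val rawData = sc.textFile("pp-2014.csv")
  // Split each line into fields and strip the double quotes
  val rows = rawData.map(line => line.split(",").map(field => field.replace("\"", "")))
  // Keep the Greater London records and extract the price (field 2) as a Double
  val priceByCounty = rows.filter(row => "GREATER LONDON".equals(row(13))).map(row => row(1).toDouble)
  // stats is the action that triggers the computation
  println("Stats for greater London is " + priceByCounty.stats)
  sc.stop()
}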

The code does the following,

  1. Creates a standalone Scala application called HousePriceApp
  2. Creates a Spark configuration object to run in local mode (the driver and the executors all run in the same JVM), with four threads allocated for the execution of tasks
  3. Creates a Spark context, which is the entry point into the Spark API
  4. Uses the Spark context to read the local file we downloaded. This creates an RDD[String], where each element represents a line in the CSV file. Note that this doesn’t actually read the contents of the file into memory; that happens only when an action is executed on the RDD. Function calls on RDDs are either transformations (like filtering, mapping, grouping etc) or actions (like reducing, counting, collecting etc). A series of transformations lazily builds up a DAG (Directed Acyclic Graph) of operations, which is executed only when an action is performed.
  5. Splits each line into an array of Strings, and strips any quotes from each element of the array. Now the value rows is an RDD[Array[String]]
  6. The next line extracts the second field in each record, which is the price (converting it to a double), for all records whose fourteenth field is “GREATER LONDON”. The value priceByCounty is now an RDD[Double]
  7. Then we call the stats function on RDD[Double], which is available through implicit conversion, to print the number of sales, mean, standard deviation, maximum and minimum of the prices.
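
One way to see the lazy evaluation described in item 4 is to point Spark at a file that does not exist. The sketch below is only an illustration: the object name LazyEvaluationSketch, the helper probe and the path no-such-file.csv are all made up for this example. Defining the transformations should succeed, because nothing is read yet, while the action should fail once Spark actually tries to open the input.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import scala.util.Try

object LazyEvaluationSketch {
  // Returns (transformations succeeded, action succeeded) for a missing input path.
  def probe(sc: SparkContext, missingPath: String): (Boolean, Boolean) = {
    // Transformations only: the file is not opened here.
    val defined = Try(sc.textFile(missingPath).map(_.split(",")).filter(_.length > 13))
    // count() is an action, so Spark now has to read the input.
    val counted = defined.flatMap(rdd => Try(rdd.count()))
    (defined.isSuccess, counted.isSuccess)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Lazy Evaluation Sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try println(probe(sc, "no-such-file.csv"))
    finally sc.stop()
  }
}
```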

Now you can run the application with,

meeraj:/Users/meeraj/Projects/spark-articles/getting-started>sbt run

You will see the following output amid the log messages emitted by Spark and SBT,

Stats for greater London is (count: 110790, mean: 599397.452992, stdev: 1700522.309786, max: 96264933.000000, min: 100.000000)
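
The value returned by stats is a StatCounter, which can also be built directly from a handful of numbers. The sketch below uses three made-up prices (not values from the data set) so the figures are easy to check by hand; the object name StatCounterSketch is only illustrative. It also shows that stdev is the population standard deviation, with sampleStdev available separately.

```scala
import org.apache.spark.util.StatCounter

object StatCounterSketch {
  // Three made-up prices, chosen so the results can be verified by hand.
  val stats: StatCounter = StatCounter(Seq(100.0, 200.0, 300.0))

  def main(args: Array[String]): Unit = {
    println(stats)             // same layout as the line printed by the application
    println(stats.count)       // number of values
    println(stats.mean)        // arithmetic mean
    println(stats.stdev)       // population standard deviation (divides by n)
    println(stats.sampleStdev) // sample standard deviation (divides by n - 1)
  }
}
```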

Even though this is a simple application, it demonstrates the strengths of some key abstractions in Spark.

  • The RDD API is very similar to Scala collections, and supports the combinators and higher order functions for mapping, filtering and aggregating data
  • All the transformations (filtering, mapping etc) are executed lazily
  • The transformations are applied only when we perform an action, in our case calling the stats method
  • If you are running in a cluster, data is read in parallel by the different executors (Spark relies on the underlying data adapters to decide on the appropriate way of reading blocks of data in parallel across nodes)
  • The transformations are run in parallel
  • When an action is executed, partial results are computed in parallel and the combined result is brought back to the driver
  • In our example, the four threads read separate blocks of input data in parallel, split each line into an array of strings, strip the double quotes, extract field 2 of all records where field 14 is GREATER LONDON, and convert it to a double
  • However, the core API abstracts away the gory details of partitioning and distributed computing and offers a natural programming model, as if working with local Scala collections
  • At the same time, the API also gives experts finer-grained control over the degree of parallelism, such as repartitioning and coalescing partitions.
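
As a rough illustration of that last point, the sketch below sets the number of partitions explicitly and then changes it with repartition and coalesce. The object name PartitionControlSketch, the helper partitionCounts and the partition counts themselves are arbitrary choices for this example, and it uses a small in-memory range rather than the house price file. For files, textFile also accepts an optional minPartitions argument as a hint.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object PartitionControlSketch {
  // Returns the partition counts after each step: (initial, repartitioned, coalesced).
  def partitionCounts(sc: SparkContext): (Int, Int, Int) = {
    val numbers = sc.parallelize(1 to 1000, 8) // ask for 8 partitions up front
    val wider = numbers.repartition(16)        // shuffle the data into 16 partitions
    val narrower = wider.coalesce(4)           // merge down to 4 partitions without a shuffle
    (numbers.partitions.length, wider.partitions.length, narrower.partitions.length)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Partition Control Sketch").setMaster("local[4]")
    val sc = new SparkContext(conf)
    try println(partitionCounts(sc))
    finally sc.stop()
  }
}
```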

In the next edition, we will have a closer look at the RDD API from the Spark Shell.
