Service Symphony

Spark Vs Hadoop Map/Reduce

In this post we will have a quick look at comparing the Hadoop Map/Reduce programming model against the Spark API. We will continue the theme from the previous posts and use the sold house price data. The challenge is to get the maximum sold house price for each county.

Using Hadoop Map/Reduce

The terms Hadoop as it is now, represents a whole big data echo system that includes not just the Hadoop distributed file system, its Map/Reduce programming model and its clustering infrastructure YARN, it can also possibly include various distribution sets from commercial vendors that include higher level abstractions like HIVE, PIG and NoSQL databases, and even Spark. However, this post is really a quick comparison of the Java API for the Hadoop Map/Reduce programming model and the Spark API for transformations and actions.

There are further aspects to consider regarding Hadoop’s IO bounds as opposed to Spark’s efficient Map based model. However, I am going to stick to the succinctness, expressiveness and conciseness of applications written in two programming models. With no doubt, this comes from the Spark API being written in a functional language and the API inherently being idiomatic and expressive.

As most of you, with some knowhow of big data would know, one of the fundamental patterns underpinning efficient processing of large volumes of data is Map/Reduce. It is fundamentally increasing parallelism by getting small jobs to run in parallel across process spaces against discrete units of data and produce an intermediate result (mapping phase) and the data is subsequently collated against defined keys and passed for further processing to a reducing phase.

So to write a Hadoop M/R based application, one needs three basic components.

  1. A mapper component
  2. A reducer component
  3. A main entry point class that initialises a job that is run by Hadoop runtime that utilises the mapper and reducer

For our example, the mapper is going to extract the county and sold price of each property and return that as a key/value pair to Hadoop. Hadoop will collate all the pairs with the same key and pass the collated list with the key to the reducer, which will in turn find the maximum price and return that to Hadoop, and Hadoop will write the final collated result to the specified output directory.

I have written the application as a standalone SBT build, where Hadoop will run embedded in local mode running against the local file system.

SBT Build File

The SBT build file shown below is pretty much self-explanatory.

lazy val root = (project in file(".")).
  settings(
    name := "Max Price",
    organization := "io.cloudata",
    version := "0.1.0",
    scalaVersion := "2.11.5",
    libraryDependencies += 
      "org.apache.hadoop" % "hadoop-mapreduce-client-app" % "2.6.2",
    libraryDependencies += 
      "org.apache.hadoop" % "hadoop-common" % "2.6.2"
  )

Hadoop Application Code

I have written the application code in Scala, however, one can notice an imperative, unidiomatic and non-expressive API, doesn’t lend itself well to client code written in a functional language. I have the job, mapper and reducer all in one source file.

Import Statements

For brevity, I have defined all imports at the top.

import java.lang
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{DoubleWritable => DW}
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.io.{LongWritable => LW}
import org.apache.hadoop.mapreduce.Mapper
import org.apache.hadoop.mapreduce.Reducer

Mapper Class

The mapper class is passed one line at a time from the input by Hadoop, which it in turn parses and writes a key value pair of the county field and sold price field back to the context.

class MaxPriceMapper extends Mapper[LW, Text, Text, DW] {
type CTX =  Mapper[LW, Text, Text, DW]#Context
override def map(k: LW, v: Text, ctx: CTX) = {
val fields = v.toString.split(",").map(_.replace("\"", ""))
if ("" != fields(13).trim)
ctx.write(new Text(fields(13)), new DW(fields(1).toDouble))
}
}

Reducer Class

Hadoop collates all the pairs with same keys and passes a pair of key and list of values to the reducer. Our reducer implementation, finds the maximum value and writes it to the context, which Hadoop will send to the final output.

class MaxPriceReducer extends Reducer[Text, DW, Text, DW] {
type CTX = Reducer[Text, DW, Text, DW]#Context
override def reduce(k: Text, v: lang.Iterable[DW], ctx: CTX) = {
import collection.JavaConversions._
val list = for (value <- v) yield new DW(value.get)
ctx.write(k, list.toList.sortWith(_.get > _.get)(0))
}
}

Main Job Runner

The main job runner is the mundane Hadoop API for starting the job.

class MaxPriceReducer extends Reducer[Text, DW, Text, DW] {
type CTX = Reducer[Text, DW, Text, DW]#Context
override def reduce(k: Text, v: lang.Iterable[DW], ctx: CTX) = {
import collection.JavaConversions._
val list = for (value <- v) yield new DW(value.get)
ctx.write(k, list.toList.sortWith(_.get > _.get)(0))
}
}

Running the Application

To run the application, simply run the following from the console.

sbt "run pp-2014.csv output"

This will write output shown below (truncated) to the file output/part-r-00000

ADUR    599000.0
ALLERDALE       390000.0
AMBER VALLEY    7621200.0
ARUN    1750000.0
ASHFIELD        275000.0
ASHFORD 1305000.0
AYLESBURY VALE  1025000.0

Using the Spark API

We have already seen a number of examples on how to use the Spark API to achieve the same result. Though the snippet below, will do the job

scala> sc.textFile("pp-2014.csv").
     | map(_.split(",").map(_.replace("\"", ""))).
     | map(r => (r(13), r(1).toDouble)).
     | groupByKey.
     | mapValues(_.max).
     | take(10).
     | foreach(println)
(CAMBRIDGE,3120000.0)                                                       
(SOUTH NORTHAMPTONSHIRE,900000.0)
(STAFFORDSHIRE MOORLANDS,268000.0)
(CITY OF DERBY,5.2267467E7)
(SUFFOLK,1.8243199E7)
(BRIDGEND,3760660.0)
(BLAENAU GWENT,2514173.0)
(MOLE VALLEY,4360000.0)
(SEVENOAKS,1425000.0)
(HAVANT,850000.0)

Conclusion

I have just compared here the two programming models and nothing else. I have not touched the inefficiency of Hadoop M/R’s IO bound model, against Spark’s efficient memory based model, especially for iterative M/L algorithms. As one can see the Spark API is concise, expressive and idiomatic. Whereas, the Hadoop M/R API is imperative, verbose and longwinded. Of course one could argue Hadoop ecosystem has high-level abstractions like PIG and Hive. But I have not gone into Spark dataframe and dataset APIs.

Also, more importantly Spark programming model doesn’t look at Map/Reduce as a hammer and turn everything into a nail, there are big data problems that don’t always lend themselves well into key/value pairs and map/reduce model.

Having said this, you will hardly find a live Spark installation that doesn’t live in a Hadoop ecosystem, either for legacy reasons or for using HDFS and/or YARN.

Share this:

Leave a Reply

Your email address will not be published. Required fields are marked *

11 − 11 =

Some of our
Clients