RDD Actions

In the last post we looked at transformations in Spark. Transformations operate on an RDD to create a new RDD by transforming the contents of the original; typical use cases include mapping and filtering data. Actions, on the other hand, perform some computation against the data within the RDD and return a result to the driver. In this post we will look at some of the frequently used actions available on the RDD API.

Note: For the examples, we will continue to use the house price data from the previous posts.

Previewing Data

If you want to preview the contents of an RDD, you can use either the first or the take method, as shown below:

scala> sc.textFile("pp-2014.csv").map(_.split(",")(1)).first
res9: String = "370000"
scala> sc.textFile("pp-2014.csv").map(_.split(",")(1)).take(3)
res10: Array[String] = Array("370000", "498500", "213000")

The snippets above extract the second field from each CSV record and return the contents of the first record and the first three records respectively.

Collecting Data

Once you have filtered the data down to a manageable size and want to bring it to the driver for some local processing, or just to view it, you can use the collect action. The snippet below gets the sold prices for a given postcode.

scala> sc.textFile("pp-2014.csv").
     | filter(_.contains("HA9 8QF")).
     | map(_.split(",")(1)).
     | collect
res35: Array[String] = Array("370000")

You can also use the foreach action to apply a function to each element of the RDD.
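For example, here is a quick sketch that prints each record for the postcode we filtered on earlier. Bear in mind that foreach runs on the executors, so on a real cluster the output appears in the executor logs rather than on the driver console:

scala> sc.textFile("pp-2014.csv").
     | filter(_.contains("HA9 8QF")).
     | foreach(println)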

Counting Data

If you want to count the number of records in the RDD, you can use the count action, as shown below:

scala> sc.textFile("pp-2014.csv").count
res6: Long = 980026

Reducing Data

One of the key actions is reduce, which reduces the data within the RDD, generally to produce some kind of scalar result. Suppose we want to find the total price of all houses sold in GREATER LONDON; we can do the following:

scala> sc.textFile("pp-2014.csv").
     | filter(_.contains("GREATER LONDON")).
     | map(_.split(",")).
     | map(_(1).replace("\"", "").toDouble).
     | reduce((a, b) => a + b)
res6: Double = 7.6443103685E10

Reduce takes a function of two parameters, which is called repeatedly over the contents of the RDD until no elements are left: first on the first two elements, then on the result of that operation and the third element, and so on. Reducing an empty collection results in an error.
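For instance, calling reduce on an empty RDD throws an exception:

scala> sc.parallelize(List[Int]()).reduce(_ + _)
java.lang.UnsupportedOperationException: empty collection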

Folding Vs Reducing

There is an alternative action called fold, which takes an explicit zero value, whereas reduce uses the first element of the collection as the starting value. Both reduce and fold are run on each partition and then on the driver. This means that with fold, the zero element should be an identity element for the operation, and the operation should be commutative and associative. The identity element for addition, for example, is 0, and for multiplication it is 1. Otherwise, the operation will produce unexpected results.

For example, look at the snippet below,

scala> sc.parallelize(List[Int](1, 2, 3), 2).fold(0)(_ + _)
res28: Int = 6

Now, using a non-identity element produces a result you may not expect, as the zero value is applied on each partition as well as on the driver. With two partitions, the zero value gets added three times: once for each partition and once for the driver.

scala> sc.parallelize(List[Int](1, 2, 3), 2).fold(1)(_ + _)
res29: Int = 9
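The size of the distortion also depends on the number of partitions. With three partitions, the same fold adds the zero value four times, once for each partition and once on the driver:

scala> sc.parallelize(List[Int](1, 2, 3), 3).fold(1)(_ + _)
res30: Int = 10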

Aggregate Action

Let us look at how we can find the average price of houses sold in 2014. One way to do that is shown below:

scala> val raw = sc.textFile("pp-2014.csv")
scala> val prices = raw.map(_.split(",")(1).replace("\"", "").toDouble)
scala> val mappedPrices = prices.map((_, 1))
scala> val total = mappedPrices.reduce((a, b) => (a._1 + b._1, a._2 + b._2))
scala> total._1 / total._2
res3: Double = 280405.28372002376

  1. First we read the raw data
  2. Extract the price field as a double
  3. Transform each price element to a tuple of price and the number 1
  4. Reduce the list of tuples into a tuple of total price and total count
  5. Find the average by dividing the total price by total count

The reason for this rather long-winded approach, mapping each price to a tuple of the price and the number 1 before the reduce action, is that reduce and fold can only ever return the element type of the RDD. However, to compute the average we need both the total price and the total count.

This is where the aggregate action comes in handy. Like fold, it takes a zero value; however, it also takes two aggregation functions. The first, (U, T) => U, aggregates values of the RDD's element type T into a result of type U, and is run locally within each partition. The second, (U, U) => U, merges the per-partition results into a single value on the driver. An example is shown below:

scala> val total = prices.aggregate((0.0, 0))(
     | (a, b) => (a._1 + b, a._2 + 1),
     | (a, b) => (a._1 + b._1, a._2 + b._2))
total: (Double, Int) = (2.74804468583E11,980026) 
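Dividing the total price by the count then gives the average, matching the result we computed earlier:

scala> total._1 / total._2
res4: Double = 280405.28372002376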

RDD Conversions

When you use a map transformation to convert an RDD from one element type to another, the API may offer additional operations. For example, RDDs of numeric types offer statistical operations. In the Scala world this magic is made possible using implicit conversions; for an RDD of doubles, methods such as mean and variance come from DoubleRDDFunctions.

scala> prices.mean
res10: Double = 280405.28372002486                                           
scala> prices.max
res11: Double = 9.6264933E7                                                     
scala> prices.min
res12: Double = 100.0                                                           
scala> prices.variance
res13: Double = 7.288526119880985E11 
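If you need several of these statistics at once, the stats action computes them in a single pass over the data and returns a StatCounter containing the count, mean, standard deviation, maximum and minimum:

scala> prices.stats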

What Next

In the next post we will have a look at working with pair RDDs, which is one of the key features of Spark.

 
