Service Symphony

Additional Key/Value Operations

We will continue with our theme on key/value pair RDDs from the previous post and look at some additional operations and considerations. Specifically, we will look at,

  1. Tuning parallelism
  2. Grouping non-pair RDDs, without transformation
  3. Comparing reduceByKey against groupByKey
  4. Pair RDD actions

Grouping Non-Pair RDDs

In the previous post we were transforming the sold property data into a key/value pair, keyed by the county name, to create a pair RDD. An alternative is to use the groupBy method as shown below. The example finds the total number of properties sold in each county.

scala> sc.textFile("pp-2014.csv").
     | map(_.split(",")).
     | groupBy(_(13)).
     | mapValues(_.foldLeft(0.0)(_ + _(1).replace("\"", "").toDouble)).
     | take(5).
     | foreach(println)
("MAIDSTONE",1.17806E7)                                                         
("SOUTHWARK",6.4932281E8)
("CEREDIGION",1.99009249E8)
("ROTHERHAM",5436495.0)
("CANTERBURY",2.0384842E7)

The example above groups by the county name and maps the values of the resultant RDD to accumulate the second field in the CSV, which is the amount using a foldLeft.

Tuning Parallelism

By default Spark decides the number of partitions for an RDD to perform distributed transformations and aggregations, based on the number of cores and nodes available etc. However, most of the transformation and aggregation operations also allow us to specify the number of partitions by specifying an overloaded parameter. Please refer to the API documentation for detail.

Additionally, one can also repartition an RDD by calling the repartition or coalesce operations, latter being more efficient if we want to decrease the number of partitions, by reducing data movement.

Comparing reduceByKey against groupByKey

Say we have an RDD of words, which we want to count. One way to do this is to create a pair RDD of each word with the number 1 and then do a reduceByKey as follows,

scala> sc.parallelize(List("a", "b", "c", "a")).
     | map((_, 1)).
     | reduceByKey(_ + _).
     | collect
res80: Array[(String, Int)] = Array((a,2), (b,1), (c,1))

Alternatively, we can do a groupByKey and then reduce the values of each key as below,

scala> sc.parallelize(List("a", "b", "c", "a")).
     | map((_, 1)).
     | groupByKey.
     | map(t => (t._1, t._2.sum)).
     | collect
res81: Array[(String, Int)] = Array((a,2), (b,1), (c,1))

Both will produce the same result. However, with large datasets, the former approach is more efficient as there is less data shuffle. This is because reduceByKey will reduce the values for each key locally, before bring the summarised results from each node together and then doing an overall reduction. Whereas groupByKey first collocates all the keys together by shuffling the data before the reduction logic in the map is executed.

Pair RDD Actions

Pair RDDs have a number of useful actions, for counting elements by keys, looking up elements against a key etc. The snippet below shows the total house price sold in Greater London.

scala> sc.textFile("pp-2014.csv").
     | map(_.split(",")).
     | groupBy(_(13).replace("\"", "")).
     | mapValues(_.foldLeft(0.0)(_ + _(1).replace("\"", "").toDouble)).
     | lookup("GREATER LONDON")
res87: Seq[Double] = ArrayBuffer(6.6407243817E10)

 

Share this:

One response to “Additional Key/Value Operations”

  1. Bobbie says:

    What a plausere to find someone who thinks through the issues

Leave a Reply

Your email address will not be published. Required fields are marked *

2 × 2 =

Some of our
Clients