Additional Key/Value Operations
We will continue the theme of key/value pair RDDs from the previous post and look at some additional operations and considerations. Specifically, we will cover:
- Tuning parallelism
- Grouping non-pair RDDs, without transformation
- Comparing reduceByKey against groupByKey
- Pair RDD actions
Grouping Non-Pair RDDs
In the previous post we transformed the sold property data into key/value pairs, keyed by county name, to create a pair RDD. An alternative is to use the groupBy method, as shown below. The example finds the total value of properties sold in each county.
```scala
scala> sc.textFile("pp-2014.csv").
     |   map(_.split(",")).
     |   groupBy(_(13)).
     |   mapValues(_.foldLeft(0.0)(_ + _(1).replace("\"", "").toDouble)).
     |   take(5).
     |   foreach(println)
("MAIDSTONE",1.17806E7)
("SOUTHWARK",6.4932281E8)
("CEREDIGION",1.99009249E8)
("ROTHERHAM",5436495.0)
("CANTERBURY",2.0384842E7)
```
The example above groups by the county name, then maps over the values of the resultant RDD, using a foldLeft to accumulate the second field in the CSV, which is the sale amount.
Tuning Parallelism
By default, Spark decides the number of partitions an RDD uses for distributed transformations and aggregations, based on factors such as the number of cores and nodes available. However, most transformation and aggregation operations also have overloaded variants that accept the number of partitions as a parameter. Please refer to the API documentation for details.
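As a small sketch of this, reduceByKey is one of the operations that takes an optional partition count as a second argument; the toy data here is illustrative, and a running spark-shell with sc in scope is assumed:

```scala
// Illustrative only: a small pair RDD, aggregated into an explicit
// number of partitions via reduceByKey's overloaded variant.
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

// The second argument asks Spark to use 4 partitions for the result.
val reduced = pairs.reduceByKey(_ + _, 4)

println(reduced.partitions.length) // 4
println(reduced.collect.toMap)     // Map(a -> 4, b -> 2)
```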
Additionally, one can repartition an existing RDD by calling the repartition or coalesce operations, the latter being more efficient when decreasing the number of partitions, as it reduces data movement.
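A minimal sketch of the difference between the two, again assuming a spark-shell with sc available:

```scala
// Start with an RDD spread over 10 partitions.
val rdd = sc.parallelize(1 to 100, 10)

// repartition always performs a full shuffle and can grow or shrink
// the partition count.
val grown = rdd.repartition(20)
println(grown.partitions.length) // 20

// coalesce merges existing partitions, avoiding a full shuffle, which
// makes it the cheaper choice when only decreasing the count.
val shrunk = rdd.coalesce(5)
println(shrunk.partitions.length) // 5
```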
Comparing reduceByKey against groupByKey
Say we have an RDD of words that we want to count. One way to do this is to create a pair RDD of each word with the number 1 and then do a reduceByKey, as follows:
```scala
scala> sc.parallelize(List("a", "b", "c", "a")).
     |   map((_, 1)).
     |   reduceByKey(_ + _).
     |   collect
res80: Array[(String, Int)] = Array((a,2), (b,1), (c,1))
```
Alternatively, we can do a groupByKey and then reduce the values of each key as below,
```scala
scala> sc.parallelize(List("a", "b", "c", "a")).
     |   map((_, 1)).
     |   groupByKey.
     |   map(t => (t._1, t._2.sum)).
     |   collect
res81: Array[(String, Int)] = Array((a,2), (b,1), (c,1))
```
Both will produce the same result. However, with large datasets the former approach is more efficient, as it shuffles less data: reduceByKey reduces the values for each key locally on each node before bringing the summarised results together for an overall reduction. groupByKey, on the other hand, first collocates all the values for each key by shuffling the data, and only then executes the reduction logic in the map.
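For the same reason, foldByKey, which also combines values on each node before the shuffle, is a further alternative worth knowing; a quick sketch of the same word count:

```scala
// foldByKey takes a zero value and a combining function; like
// reduceByKey, it merges values map-side before shuffling.
sc.parallelize(List("a", "b", "c", "a")).
  map((_, 1)).
  foldByKey(0)(_ + _).
  collect
// Array((a,2), (b,1), (c,1))
```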
Pair RDD Actions
Pair RDDs have a number of useful actions, for counting elements by key, looking up the values for a given key, and so on. The snippet below shows the total value of house sales in Greater London.
```scala
scala> sc.textFile("pp-2014.csv").
     |   map(_.split(",")).
     |   groupBy(_(13).replace("\"", "")).
     |   mapValues(_.foldLeft(0.0)(_ + _(1).replace("\"", "").toDouble)).
     |   lookup("GREATER LONDON")
res87: Seq[Double] = ArrayBuffer(6.6407243817E10)
```
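A few of the other pair RDD actions can be illustrated on a toy dataset rather than the property file; the data here is made up purely for demonstration:

```scala
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

pairs.countByKey()   // Map(a -> 2, b -> 1): number of elements per key
pairs.lookup("a")    // Seq(1, 3): all values for the key "a"
pairs.collectAsMap() // returns the pairs as a Map; note that with
                     // duplicate keys only one value per key survives
```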