Working with Key/Value Pairs
As mentioned in an earlier post, the API offers additional operations depending on the type of the RDD's contents. RDD instances that contain key-value pairs offer distributed operations that work specifically against the keys, for the purpose of grouping and aggregating data. In the Scala API, an RDD of Tuple2 elements gains these operations through an implicit conversion to the PairRDDFunctions class.
Note: We will continue to use the sold property prices data from the previous posts for the purpose of examples in this post.
Note: The type you use as the key in a pair RDD should implement the equals and hashCode methods appropriately.
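A case class is usually the simplest way to satisfy this requirement, since case classes implement equals and hashCode structurally for free. A minimal sketch (the CountyKey name is made up for illustration):

```scala
// Case classes automatically get structural equals and hashCode,
// which makes them safe to use as pair-RDD keys.
case class CountyKey(name: String)

val a = CountyKey("SUFFOLK")
val b = CountyKey("SUFFOLK")

// Equal keys hash identically, so records with the same key
// are routed to the same partition during a shuffle.
println(a == b)                     // true
println(a.hashCode == b.hashCode)   // true
```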
Creating Key-Value Data
Pair RDDs can be automatically created by loading data in certain formats. Alternatively, we can transform a normal RDD to a pair RDD as shown below.
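Pair RDDs can also be built directly from in-memory collections, which is handy for experimenting. A sketch, assuming an existing SparkContext named `sc` (as in the shell) and made-up values:

```scala
// A pair RDD from a collection of tuples:
val sales = sc.parallelize(Seq(("SUFFOLK", 250000.0), ("ADUR", 180000.0)))

// keyBy derives the key from each record, leaving the whole
// record as the value:
val keyed = sc.parallelize(Seq("SUFFOLK,250000", "ADUR,180000")).
  keyBy(_.split(",")(0))
```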
scala> sc.textFile("pp-2014.csv").
     | map(_.split(",")).
     | map(r => (r(13).replace("\"", ""), r(1).replace("\"", "").toDouble)).
     | first
res17: (String, Double) = (GREATER LONDON,370000.0)
The snippet above reads the raw file and creates a pair RDD, with the fourteenth field (index 13, the county) as the key and the second field (index 1, the sold price) as the value.
Grouping Values by Key
The pair RDD allows grouping all the values belonging to the same key as shown below,
scala> sc.textFile("pp-2014.csv").
     | map(_.split(",")).
     | map(r => (r(13).replace("\"", ""), r(1).replace("\"", "").toDouble)).
     | groupByKey().
     | first
res18: (String, Iterable[Double]) = (CAMBRIDGE,CompactBuffer(142500.0, ..
Reducing by Key
Now let us say we want to find the total price of houses sold in each county. This can be done by reducing the RDD by key as shown below,
scala> sc.textFile("pp-2014.csv").
     | map(_.split(",")).
     | map(r => (r(13).replace("\"", ""), r(1).replace("\"", "").toDouble)).
     | reduceByKey((a, b) => a + b).
     | take(5).
     | foreach(println)
(CAMBRIDGE,8.409059E7)
(STAFFORDSHIRE MOORLANDS,1251250.0)
(CITY OF DERBY,7.08568705E8)
(SUFFOLK,3.232794909E9)
(TEST VALLEY,8894995.0)
Combining Pair RDDs
The API also provides a number of operations to work with two pair RDDs. The snippet below performs an inner join between two RDDs, using the keys,
scala> sc.parallelize(List((1, 2), (2, 1))).
     | join(sc.parallelize(List((1, 3), (2, 4)))).
     | collect
res34: Array[(Int, (Int, Int))] = Array((1,(2,3)), (2,(1,4)))
There are also similar key-based operations for performing a left outer join (leftOuterJoin), a right outer join (rightOuterJoin) and subtraction (subtractByKey).
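A left outer join and a key-based subtraction can be sketched on small in-memory RDDs as follows (assuming an existing SparkContext `sc`, as in the shell; note the ordering of collect output is not guaranteed):

```scala
val left  = sc.parallelize(List((1, 2), (2, 1), (3, 5)))
val right = sc.parallelize(List((1, 3), (2, 4)))

// leftOuterJoin keeps every key from the left RDD; missing
// right-hand values appear as None:
left.leftOuterJoin(right).collect()
// e.g. Array((1,(2,Some(3))), (2,(1,Some(4))), (3,(5,None)))

// subtractByKey keeps only the pairs whose key does not appear
// in the other RDD:
left.subtractByKey(right).collect()
// e.g. Array((3,5))
```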
Average House Price per County
If we want to find the average price of houses in each county, we could do the following,
scala> sc.textFile("pp-2014.csv").
     | map(_.split(",")).
     | map(r => (r(13).replace("\"", ""), r(1).replace("\"", "").toDouble)).
     | mapValues((_, 1)).
     | reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)).
     | sortByKey().
     | mapValues(x => x._1 / x._2).
     | take(3)
res43: Array[(String, Double)] = Array(("",44950.0), (ADUR,195978.8596491228), (ALLERDALE,239300.0))
The snippet above performs the following,
- Read the raw data
- Split each line into its comma-separated fields
- Convert into a pair RDD of county and sold price
- Use the mapValues function to transform each value (the sold price) into a tuple of the value itself and the number 1
- Run reduceByKey to aggregate the total amount and total count
- Sort the data by key
- Map the values by dividing total price by total count to compute the average
- Take the first three elements
For each key, the values are reduced locally on every node before being combined globally across the cluster. The reason we had to pair each price with a count is the same constraint as the reduce function on a normal RDD: the input and output types must be the same. For computing the average, however, we also need to keep track of the count.
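The (sum, count) trick itself is plain arithmetic. The sketch below reproduces it with ordinary Scala collections and made-up prices, so the steps can be followed without a cluster:

```scala
// Hypothetical sold prices for two counties.
val prices = Seq(("SUFFOLK", 200000.0), ("SUFFOLK", 300000.0), ("ADUR", 150000.0))

val averages = prices
  .map { case (k, v) => (k, (v, 1)) }   // the mapValues step
  .groupBy(_._1)                        // the shuffle by key
  .map { case (k, vs) =>
    // the reduceByKey step: accumulate total price and total count
    val (sum, count) = vs.map(_._2).reduce((a, b) => (a._1 + b._1, a._2 + b._2))
    (k, sum / count)                    // divide to get the mean
  }
// averages == Map("SUFFOLK" -> 250000.0, "ADUR" -> 150000.0)
```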
Similar to the aggregate operation on plain RDDs, pair RDDs have a combineByKey method that will allow you to have different input and output types for the aggregation. An example is shown below,
scala> sc.textFile("pp-2014.csv").
     | map(_.split(",")).
     | map(r => (r(13).replace("\"", ""), r(1).replace("\"", "").toDouble)).
     | combineByKey(v => (v, 1),
     |   (acc: (Double, Int), v) => (acc._1 + v, acc._2 + 1),
     |   (acc1: (Double, Int), acc2: (Double, Int)) =>
     |     (acc1._1 + acc2._1, acc1._2 + acc2._2)).
     | sortByKey().
     | mapValues(x => x._1 / x._2).
     | take(3)
res44: Array[(String, Double)] = Array(("",44950.0), (ADUR,195978.8596491228), (ALLERDALE,239300.0))
Here combineByKey replaces mapValues and reduceByKey by accepting three functions as follows,
- The combiner creator function that converts from the RDD type V into combiner type C. In our case V is Double and C is (Double, Int)
- The merge value function, which merges a value from the source RDD into the combiner type. Here the type V (Double) is merged into C ((Double, Int)) by accumulating the sold price and incrementing the count
- The combiner aggregator function, which merges two combiners (for example, produced by different partitions) into one to generate the final result
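To make the data flow concrete, the three functions can be defined standalone and applied by hand, using made-up prices and two hypothetical partitions:

```scala
// The combiner creator: lift a single price into a (sum, count) pair.
val createCombiner = (v: Double) => (v, 1)

// The merge value function: fold another price into a combiner.
val mergeValue = (acc: (Double, Int), v: Double) => (acc._1 + v, acc._2 + 1)

// The combiner aggregator: merge combiners from different partitions.
val mergeCombiners = (a: (Double, Int), b: (Double, Int)) =>
  (a._1 + b._1, a._2 + b._2)

// Partition 1 sees prices 100.0 and 200.0 for some key:
val p1 = mergeValue(createCombiner(100.0), 200.0)   // (300.0, 2)
// Partition 2 sees only 300.0:
val p2 = createCombiner(300.0)                      // (300.0, 1)
// The combiners are merged across partitions, then averaged:
val (sum, count) = mergeCombiners(p1, p2)           // (600.0, 3)
val avg = sum / count                               // 200.0
```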
What Next
There are still plenty of topics to cover here including persistence/caching, tuning parallelism, accumulators, broadcast variables etc. In the next post we will look at further operations on pair RDDs.