In this post we will look at Spark transformations. We will continue to use the UK sold house price data from previous posts for the examples. This may be downloaded from here. As we saw in the previous post, an RDD is a resilient distributed dataset that can be processed in parallel across multiple Spark nodes. Operations performed against RDDs fall into two categories,
- Transformations: Transformations act against the contents of an RDD and generate a new RDD.
- Actions: Actions operate against the contents of an RDD and bring summarized results back to the driver.
In this post we will specifically look at RDD transformations. Following are some of the key characteristics of transformations.
- Transformations are lazy. They are only executed when an action is performed. Spark maintains a DAG (Directed Acyclic Graph) of transformations, starting from the source RDD, which is executed when an action is invoked.
- Transformations can be narrow, where the transformation can be applied locally within each partition, with no need to move data between partitions.
- Other transformations are wide, where data may need to be moved (shuffled) between partitions. This typically applies to transformation logic that requires grouping by keys.
- Every time you execute an action, the whole DAG is recomputed. However, you have the option to persist the transformed results at any arbitrary point, which allows Spark to cache that data and only execute the DAG from the point of persistence. This is useful for improving performance after expensive data cleanup and similar steps.
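The lazy evaluation described above can be mimicked outside Spark with a plain Scala Iterator, whose map and filter are also deferred until a terminal call. This is only an analogy, not Spark code:

```scala
// Lazy pipeline over a plain Scala Iterator: like Spark transformations,
// map and filter here do no work until a terminal operation runs.
var evaluated = 0

val pipeline = Iterator(1, 2, 3, 4)
  .map { n => evaluated += 1; n * 2 } // "transformation": deferred
  .filter(_ > 4)                      // still deferred

val beforeAction = evaluated          // 0 -- nothing has run yet

val result = pipeline.toList          // "action": forces the whole chain
println(result)                       // List(6, 8)
println(evaluated)                    // 4 -- the map ran once per element
```

The counter stays at zero until `toList` is called, just as a Spark DAG stays unexecuted until an action such as `count` or `collect` is invoked.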
Now we will look at some of the transformation operations in detail.
Mapping the Contents of an RDD
In our house price data, each line is a CSV record that contains the details of a sale, including the address of the property, the date the house was sold and so on. Let us say we want to parse the data and get a tuple of the postcode of the property and the sold price. You can do that using the map transformation as listed below,
scala> sc.textFile("pp-2014.csv").
     |   map(_.split(",")).
     |   map(r => (r(3), r(1))).
     |   first
res11: (String, String) = ("HA9 8QF","370000")
The snippet above loads the house price data into an RDD, transforms it into a new RDD by splitting each line into an array of strings, and then transforms that RDD into another by extracting the fourth and second fields of each record into a (postcode, price) tuple.
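The per-record logic can be checked without Spark on a single sample line. The record below is made up for illustration, but follows the same field order as the real data:

```scala
// A made-up record in the same field order as the price paid data:
// transaction id, price, date, postcode, ... (values are illustrative only).
val line = "TX123,370000,2014-01-01 00:00,HA9 8QF"

val fields = line.split(",")        // what the first map produces per line
val record = (fields(3), fields(1)) // what the second map produces
println(record)                     // (HA9 8QF,370000)
```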
Similar to Scala collections, the RDD API also has a flatMap operation. Let us say we want to get a list of second address lines, which is the ninth field in the CSV (index 8) and is optional. Because the fields keep their surrounding quotes after the naive split, an absent value shows up as the two-character string "", hence the length check below. We can do a map as above and return an Option. That will give us an RDD of Options as shown below,
scala> sc.textFile("pp-2014.csv").
     |   map(_.split(",")).
     |   map(r => if (r(8).length == 2) None else Some(r(8))).
     |   take(3)
res17: Array[Option[String]] = Array(None, None, Some("17"))
However, with flatMap, Spark will flatten the result and return only those records that have a second address line.
scala> sc.textFile("pp-2014.csv").
     |   map(_.split(",")).
     |   flatMap(r => if (r(8).length == 2) None else Some(r(8))).
     |   take(3)
res18: Array[String] = Array("17", "APARTMENT 37", "52")
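The flattening behaviour is the same as flatMap over Options on plain Scala collections; a quick illustration with made-up values, where an empty string stands in for an absent field:

```scala
// Simulated optional second-address fields; an empty string stands in
// for an absent value (the real data uses quoted fields instead).
val saon = List("", "17", "", "APARTMENT 37")

// map keeps every element, wrapped in an Option...
val mapped = saon.map(s => if (s.isEmpty) None else Some(s))
println(mapped)    // List(None, Some(17), None, Some(APARTMENT 37))

// ...whereas flatMap flattens the Options and drops the Nones.
val flattened = saon.flatMap(s => if (s.isEmpty) None else Some(s))
println(flattened) // List(17, APARTMENT 37)
```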
Filtering the Contents of an RDD
Let us say we want to see only the house price records in the postcode HA9 8QF. You can do this with the filter transformation on the RDD as shown below,
scala> sc.textFile("pp-2014.csv").
     |   map(_.split(",")).
     |   filter(_(3).contains("HA9 8QF")).
     |   count
res20: Long = 1
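The filter predicate works just as it does on Scala collections; a minimal sketch with made-up postcode values standing in for field index 3 of each record:

```scala
// Made-up postcode values (illustrative only).
val postcodes = List("HA9 8QF", "SW1A 1AA", "HA9 8QF")

// Keep the matching records, then count them -- mirroring filter(...).count.
val matches = postcodes.filter(_.contains("HA9 8QF"))
println(matches.size) // 2
```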
Set Based Operations
The RDD API provides a number of set-based operations. Some examples are shown below.
The snippet below gives you the distinct elements,
scala> sc.parallelize(List(1, 1, 2, 3, 3, 4)).distinct.collect
res22: Array[Int] = Array(4, 1, 2, 3)
The snippet below shows how to create a union of the contents of two RDDs. Note that union does not remove duplicates; chain distinct after it if you need a true set union,
scala> sc.parallelize(List(1, 2)).union(sc.parallelize(List(3, 4))).collect
res24: Array[Int] = Array(1, 2, 3, 4)
The snippet below shows how to get the intersection of two RDDs
scala> sc.parallelize(List(1, 2)).intersection(sc.parallelize(List(2, 3))).collect
res26: Array[Int] = Array(2)
The snippet below shows how to subtract the contents of one RDD from another,
scala> sc.parallelize(List(1, 2)).subtract(sc.parallelize(List(2))).collect
res27: Array[Int] = Array(1)
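The same set semantics can be tried on plain Scala collections, which use slightly different method names (++ for union, intersect, diff); a quick sketch:

```scala
val xs = List(1, 1, 2, 3, 3, 4)
println(xs.distinct)           // List(1, 2, 3, 4)

val left  = List(1, 2)
val right = List(2, 3)
println(left ++ right)         // List(1, 2, 2, 3) -- duplicates kept, like RDD.union
println(left.intersect(right)) // List(2)
println(left.diff(right))      // List(1)
```

Unlike these local operations, distinct, intersection and subtract on RDDs compare elements across partitions and therefore involve a shuffle, making them wide transformations.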
In the next post we will look at performing actions against RDDs.