A key-value pair is a set of two linked data items: a key, which is a unique identifier for some item of data, and a value, which is either the data itself or a pointer to where that data is stored. Key-value pairs are commonly used in lookup tables, hash tables, and configuration files.
Using an RDD in Spark
Motivation
Spark provides special operations on RDDs that contain key/value pairs. Such RDDs are called paired RDDs, and the operations on them are called paired RDD operations. Paired RDDs are a useful building block in many programs, as they expose operations that allow us to act on each key in parallel or regroup data across the network.
Creating Paired RDDs
Paired RDDs can be created by running a map() function that returns key/value pairs. The way to build key/value RDDs differs by language. In Python, for the functions on keyed data to work, we need to return an RDD composed of tuples.
Creating a paired RDD using the first word as the key in Python:
pairs = lines.map(lambda x: (x.split(" ")[0], x))
In Scala, too, we need to return tuples for the functions on keyed data to be available, as in the previous example. An implicit conversion on RDDs of tuples provides the additional key/value functions.
Creating a paired RDD using the first word as the key in Scala:
val pairs = lines.map(x => (x.split(" ")(0), x))
Java doesn’t have a built-in tuple type, so Spark’s Java API has users create tuples using the scala.Tuple2 class. Java users can construct a new tuple by writing new Tuple2(elem1, elem2) and can then access its elements with the _1() and _2() methods.
Java users also need to call special versions of Spark’s functions when creating paired RDDs. For instance, the mapToPair() function should be used in place of the basic map() function.
Creating a paired RDD using the first word as the key in Java:
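import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
PairFunction<String, String, String> keyData =
    new PairFunction<String, String, String>() {
        @Override
        public Tuple2<String, String> call(String x) {
            // Use the first word of the line as the key and the whole line as the value
            return new Tuple2<>(x.split(" ")[0], x);
        }
    };
JavaPairRDD<String, String> pairs = lines.mapToPair(keyData);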
Transformations on Paired RDDs
In this section, let’s check out various transformations that can be performed on paired RDDs.
Aggregations
When datasets are described in terms of key/value pairs, it is common to want to aggregate statistics across all elements with the same key. Spark has a set of operations that combine values that have the same key. These operations return RDDs and thus are transformations rather than actions. The transformations are:
- reduceByKey()
- foldByKey()
- combineByKey()
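To make the difference between these three operations concrete, here is a minimal plain-Python sketch of what each one computes per key. It is only a single-machine model and does not use Spark: the helper names (reduce_by_key, fold_by_key, combine_by_key) and the sales data are hypothetical, and the model leaves out the step in which Spark merges partial results from different partitions.

```python
def reduce_by_key(pairs, func):
    """Merge all values of each key with func (models reduceByKey)."""
    out = {}
    for key, value in pairs:
        out[key] = func(out[key], value) if key in out else value
    return out


def fold_by_key(pairs, zero, func):
    """Like reduce_by_key, but every key starts from the zero value."""
    out = {}
    for key, value in pairs:
        out[key] = func(out.get(key, zero), value)
    return out


def combine_by_key(pairs, create_combiner, merge_value):
    """Build a per-key accumulator whose type may differ from the values."""
    out = {}
    for key, value in pairs:
        if key in out:
            out[key] = merge_value(out[key], value)
        else:
            out[key] = create_combiner(value)
    return out


# Hypothetical input: (product, quantity) pairs
sales = [("apples", 3), ("pears", 5), ("apples", 4), ("pears", 1)]

totals = reduce_by_key(sales, lambda a, b: a + b)
folded = fold_by_key(sales, 0, lambda a, b: a + b)

# Accumulate (sum, count) per key, then derive the per-key average
sum_count = combine_by_key(
    sales,
    lambda v: (v, 1),
    lambda acc, v: (acc[0] + v, acc[1] + 1),
)
averages = {key: s / n for key, (s, n) in sum_count.items()}
```

The combine_by_key case shows why the most general of the three is useful: the accumulator is a (sum, count) tuple, which is a different type from the integer values being aggregated.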
Another transformation we will discuss here is the grouping of data.
Grouping of Data
With keyed data, a common use case is grouping our data by key, for example, viewing all of a customer’s orders together.
If our data is already keyed in the way we want, groupByKey() will group our data using the key in our RDD. On an RDD consisting of keys of type K and values of type V, we get back an RDD of type [K, Iterable[V]].
The groupBy() transformation works on unpaired data, or on data where we want to use a condition other than equality on the current key. It takes a function that it applies to every element in the source RDD and uses the result to determine the key.
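The two grouping operations can be sketched in plain Python as follows. This is a model of the behaviour on a single machine, not Spark code; group_by_key, group_by, and the sample data are hypothetical names chosen for illustration.

```python
from collections import defaultdict


def group_by_key(pairs):
    """Collect all values that share a key (models groupByKey)."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(grouped)


def group_by(items, key_func):
    """Derive a key for each unpaired item, then group (models groupBy)."""
    return group_by_key((key_func(item), item) for item in items)


# Already keyed data: (customer, order) pairs
orders = [("alice", "book"), ("bob", "pen"), ("alice", "lamp")]
by_customer = group_by_key(orders)

# Unpaired data: the key is computed from each element
words = ["spark", "scala", "python", "pair"]
by_initial = group_by(words, lambda w: w[0])
```

Here by_customer plays the role of the [K, Iterable[V]] result described above, with one list of orders per customer.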
Joins
Some of the most useful operations we get with keyed data come from using it together with other keyed data. Joining datasets together is probably one of the most common operations we can perform on a paired RDD.
- join(): Performs an inner join. Only keys that are present in both paired RDDs are returned as output.
- leftOuterJoin(): The resulting paired RDD has entries for each key in the source RDD. The value associated with each key in the result is a tuple of the value from the source RDD and an Option for the value from the other paired RDD.
- rightOuterJoin(): Almost identical to leftOuterJoin(), except that the key must be present in the other RDD and the tuple has an Option for the source RDD rather than the other RDD.
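The three joins above can be modelled on small Python lists of pairs. This is a sketch of the semantics only, not Spark code: the function names and sample data are hypothetical, and None stands in for the empty Option on the side that has no match.

```python
def inner_join(left, right):
    """Keep only keys present on both sides (models join)."""
    return [(k, (lv, rv)) for k, lv in left for rk, rv in right if rk == k]


def left_outer_join(left, right):
    """Keep every key of the left side; a missing right value becomes None."""
    out = []
    for k, lv in left:
        matches = [rv for rk, rv in right if rk == k]
        if matches:
            out.extend((k, (lv, rv)) for rv in matches)
        else:
            out.append((k, (lv, None)))
    return out


def right_outer_join(left, right):
    """Keep every key of the right side; a missing left value becomes None."""
    out = []
    for k, rv in right:
        matches = [lv for lk, lv in left if lk == k]
        if matches:
            out.extend((k, (lv, rv)) for lv in matches)
        else:
            out.append((k, (None, rv)))
    return out


# Hypothetical data: (user id, name) and (user id, item ordered)
names = [(1, "alice"), (2, "bob")]
orders = [(2, "pen"), (3, "lamp")]

inner = inner_join(names, orders)
left = left_outer_join(names, orders)
right = right_outer_join(names, orders)
```

Key 1 appears only in names and key 3 only in orders, so each shows up in exactly one of the two outer joins and in neither case in the inner join.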
Sorting Data
We can sort an RDD of key/value pairs with sortByKey(), provided that there is an ordering defined on the key. Once we have sorted our data, any subsequent call on the sorted data to collect() or save() will result in ordered data.
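As a rough illustration of how the ordering on the key decides the result, the sketch below sorts a list of pairs in plain Python. The sort_by_key helper and its ascending and keyfunc parameters are assumptions made for this model, not a claim about Spark's exact signature.

```python
def sort_by_key(pairs, ascending=True, keyfunc=lambda k: k):
    """Sort pairs by key; keyfunc maps each key to the value compared."""
    return sorted(pairs, key=lambda kv: keyfunc(kv[0]), reverse=not ascending)


# Hypothetical data with integer keys
data = [(10, "a"), (9, "b"), (2, "c")]

numeric = sort_by_key(data)                      # natural integer order
descending = sort_by_key(data, ascending=False)  # reversed integer order
as_text = sort_by_key(data, keyfunc=str)         # compare keys as strings
```

Comparing the keys as strings puts 10 before 2 and 9, which shows that the same data can be ordered differently depending on the ordering supplied for the key.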
Actions Available on Pair RDDs
- countByKey(): Counts the number of elements for each key
- collectAsMap(): Collects the result as a map to provide easy lookup
- lookup(key): Returns all values associated with the provided key
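The following plain-Python sketch models what these three actions return for a small list of pairs. It does not use Spark, and the helper names are hypothetical. Note that in this model a dict holds one value per key, so for a duplicated key the later value overwrites the earlier one.

```python
from collections import Counter


def count_by_key(pairs):
    """Number of elements per key (models countByKey)."""
    return dict(Counter(key for key, _ in pairs))


def collect_as_map(pairs):
    """Pairs as a dict for easy lookup (models collectAsMap)."""
    return dict(pairs)


def lookup(pairs, key):
    """All values stored under the given key (models lookup)."""
    return [value for k, value in pairs if k == key]


pairs = [(1, 2), (3, 4), (3, 6)]

counts = count_by_key(pairs)
as_map = collect_as_map(pairs)
values_for_3 = lookup(pairs, 3)
```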
Data Partitioning (Advanced)
In a distributed program, communication is very expensive, so laying out data to minimize network traffic can greatly improve performance. Much like a single-node program needs to choose the right data structure for a collection of records, Spark programs can choose to control their RDDs’ partitioning to reduce communication. Partitioning is not helpful in all applications. For example, if a given RDD is scanned only once, there is no point in partitioning it in advance. It is useful only when a dataset is reused multiple times in key-oriented operations such as joins.
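The idea behind partitioning for key-oriented operations can be sketched in plain Python. The model below is an assumption-laden illustration, not Spark code: hash_partition, profiles, and events are hypothetical names, and the lists inside partitions stand in for data held on different nodes. Because both datasets are split with the same rule, records with the same key always land at the same partition index, so each pair of partitions can be joined locally.

```python
def hash_partition(pairs, num_partitions):
    """Place each pair in the partition chosen by the hash of its key."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        partitions[hash(key) % num_partitions].append((key, value))
    return partitions


# Hypothetical data: a large reusable table and a small batch of events
profiles = [(1, "alice"), (2, "bob"), (3, "carol"), (4, "dave")]
events = [(2, "click"), (4, "view"), (2, "view")]

profile_parts = hash_partition(profiles, 2)
event_parts = hash_partition(events, 2)

# Join partition i of one dataset only with partition i of the other
joined = []
for local_profiles, local_events in zip(profile_parts, event_parts):
    names = dict(local_profiles)
    joined.extend((k, (names[k], e)) for k, e in local_events if k in names)
```

In this model, if profiles were partitioned once and kept, only the smaller events dataset would need to be rearranged for each new join, which is the saving the paragraph above describes.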
Determining an RDD’s Partitioner
Example:
scala> val pairs = sc.parallelize(List((1, 1), (2, 2), (3, 3)))
pairs: spark.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:12
scala> pairs.partitioner
res0: Option[spark.Partitioner] = None
scala> val partitioned = pairs.partitionBy(new spark.HashPartitioner(2))
partitioned: spark.RDD[(Int, Int)] = ShuffledRDD[1] at partitionBy at <console>:14
scala> partitioned.partitioner
res1: Option[spark.Partitioner] = Some(spark.HashPartitioner@5147788d)
In this short session, we created an RDD of (Int, Int) pairs, which initially has no partitioning information (an Option with the value None). We then created a second RDD by hash-partitioning the first. If we actually wanted to use partitioned in further operations, we should have appended persist() to the third line of input, in which partitioned is defined. This is for the same reason that persist() was needed for userData in the previous example: without persist(), subsequent RDD actions will evaluate the entire lineage of partitioned, which will cause pairs to be hash-partitioned over and over again.
Operations That Benefit from Partitioning
The operations that benefit from partitioning are as follows:
- cogroup()
- groupWith()
- join()
- leftOuterJoin()
- rightOuterJoin()
- groupByKey()
- reduceByKey()
- combineByKey()
- lookup()
Operations That Affect Partitioning
The operations that result in a partitioner being set on the output RDD are:
- cogroup()
- groupWith()
- join()
- leftOuterJoin()
- rightOuterJoin()
- groupByKey()
- reduceByKey()
- combineByKey()
- partitionBy()
- sort()
- mapValues() (if the parent RDD has a partitioner)
- flatMapValues() (if parent has a partitioner)
- filter() (if parent has a partitioner)
Custom Partitioners
To implement a custom partitioner, you need to subclass the org.apache.spark.Partitioner class and implement three methods:
- numPartitions: Int, which returns the number of partitions that you will create.
- getPartition(key: Any): Int, which returns the partition ID (0 to numPartitions-1) for a given key.
- equals(): the standard Java equality method. This is important to implement because Spark will need to test your Partitioner object against other instances of itself when it decides whether two of your RDDs are partitioned the same way.
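The three-method contract above can be mirrored in a small Python class. This is only a sketch of the idea and not a subclass of Spark's Partitioner: DomainPartitioner is a hypothetical example that sends URLs from the same domain to the same partition, and the use of zlib.crc32 as a stable hash is an assumption made so that the result is repeatable between runs.

```python
import zlib
from urllib.parse import urlparse


class DomainPartitioner:
    """Hypothetical partitioner: URLs of one domain share a partition."""

    def __init__(self, num_partitions):
        # Counterpart of numPartitions
        self.num_partitions = num_partitions

    def get_partition(self, key):
        # Counterpart of getPartition: an ID in 0 .. num_partitions - 1
        domain = urlparse(key).netloc
        return zlib.crc32(domain.encode("utf-8")) % self.num_partitions

    def __eq__(self, other):
        # Counterpart of equals(): same type and same partition count
        return (isinstance(other, DomainPartitioner)
                and other.num_partitions == self.num_partitions)

    def __hash__(self):
        # Keep hashing consistent with __eq__
        return hash(self.num_partitions)


partitioner = DomainPartitioner(4)
first = partitioner.get_partition("http://www.example.com/index.html")
second = partitioner.get_partition("http://www.example.com/about.html")
```

Both URLs share the domain www.example.com, so first and second are the same partition ID. The equality check matters for the reason given above: two datasets can only be treated as partitioned the same way if their partitioners compare equal.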