
Working with Key/Value Pairs



Spark provides special operations on RDDs containing key/value pairs. These RDDs are called pair RDDs. Pair RDDs are a useful building block in many programs, as they expose operations that allow you to act on each key in parallel or regroup data across the network.


Creating Pair RDDs

Pair RDDs can be created by running a map() function that returns key/value pairs.

The way to build key/value RDDs differs by language. In Python, for the functions on keyed data to work, we need to return an RDD composed of tuples (see Example 4-1).


Example 4-1. Creating a pair RDD using the first word as the key in Python

pairs = lines.map(lambda x: (x.split(" ")[0], x))


In Scala, for the functions on keyed data to be available, we also need to return tuples (see Example 4-2). An implicit conversion on RDDs of tuples exists to provide the additional key/value functions.


Example 4-2. Creating a pair RDD using the first word as the key in Scala

val pairs = lines.map(x => (x.split(" ")(0), x))


Java doesn’t have a built-in tuple type, so Spark’s Java API has users create tuples using the scala.Tuple2 class. This class is very simple: Java users can construct a new tuple by writing new Tuple2(elem1, elem2) and can then access its elements with the ._1() and ._2() methods.

Java users also need to call special versions of Spark’s functions when creating pair RDDs. For instance, the mapToPair() function should be used in place of the basic map() function.


Example 4-3. Creating a pair RDD using the first word as the key in Java

PairFunction<String, String, String> keyData =
  new PairFunction<String, String, String>() {
    // Use the first word of each line as the key and the whole line as the value.
    public Tuple2<String, String> call(String x) {
      return new Tuple2<String, String>(x.split(" ")[0], x);
    }
  };

JavaPairRDD<String, String> pairs = lines.mapToPair(keyData);


Transformations on Pair RDDs


When datasets are described in terms of key/value pairs, it is common to want to aggregate statistics across all elements with the same key. Spark has a set of operations that combine values that have the same key, such as reduceByKey(), foldByKey(), and combineByKey(). These operations return RDDs and thus are transformations rather than actions.
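To make the per-key aggregation concrete, here is a minimal sketch in plain Python that models what reduceByKey() and foldByKey() compute. It is not Spark code: the helper names reduce_by_key and fold_by_key and the sample data are hypothetical, and the model assumes a single partition, so the zero value is applied once per key.

```python
def reduce_by_key(pairs, func):
    """Model of reduceByKey(): merge all values that share a key using func."""
    merged = {}
    for key, value in pairs:
        if key in merged:
            merged[key] = func(merged[key], value)
        else:
            # The first value seen for a key is kept as-is.
            merged[key] = value
    return merged


def fold_by_key(pairs, zero_value, func):
    """Model of foldByKey(): like reduce_by_key, but starts from zero_value."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged.get(key, zero_value), value)
    return merged


# Hypothetical sample data: (key, value) tuples.
visits = [("panda", 0), ("pink", 3), ("pirate", 3), ("panda", 1), ("pink", 4)]

totals = reduce_by_key(visits, lambda a, b: a + b)
maxima = fold_by_key(visits, 0, max)

print(totals)  # {'panda': 1, 'pink': 7, 'pirate': 3}
print(maxima)  # {'panda': 1, 'pink': 4, 'pirate': 3}
```

In Spark itself the result would be another pair RDD rather than a dictionary, and the merge function would run independently on each partition before the partial results are combined.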


Grouping Data

With keyed data a common use case is grouping our data by key—for example, viewing all of a customer’s orders together.

If our data is already keyed in the way we want, groupByKey() will group our data using the key in our RDD. On an RDD consisting of keys of type K and values of type V, we get back an RDD of type [K, Iterable[V]].

groupBy() works on unpaired data or data where we want to use a different condition besides equality on the current key. It takes a function that it applies to every element in the source RDD and uses the result to determine the key.
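The difference between the two grouping operations can be sketched in plain Python. This is only a model of the semantics, not Spark code; the helper names group_by_key and group_by and the sample data are hypothetical.

```python
from collections import defaultdict


def group_by_key(pairs):
    """Model of groupByKey(): collect every value under its existing key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)


def group_by(elements, key_func):
    """Model of groupBy(): derive the key from each element, then group."""
    return group_by_key((key_func(element), element) for element in elements)


# Already keyed data: (customer, order) tuples.
orders = [("alice", "book"), ("bob", "lamp"), ("alice", "pen")]
by_customer = group_by_key(orders)

# Unpaired data: the function decides the key (here, whether n is even).
by_parity = group_by([1, 2, 3, 4, 5], lambda n: n % 2 == 0)

print(by_customer)  # {'alice': ['book', 'pen'], 'bob': ['lamp']}
print(by_parity)    # {False: [1, 3, 5], True: [2, 4]}
```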



Joins

The most useful operations we get with keyed data come from using it together with other keyed data. Joining data together is probably one of the most common operations on a pair RDD.


  • join(): An inner join. Only keys that are present in both pair RDDs are output.
  • leftOuterJoin(): The resulting pair RDD has entries for each key in the source RDD. The value associated with each key in the result is a tuple of the value from the source RDD and an Option (or Optional in Java) for the value from the other pair RDD.
  • rightOuterJoin(): Almost identical to leftOuterJoin(), except that the key must be present in the other RDD and the tuple has an Option for the source RDD rather than for the other RDD.
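The three join flavors listed above can be modeled in plain Python as follows. This is a sketch of the semantics only, not Spark code: the helper names and the sample data are hypothetical, and None stands in for a missing value on the optional side, which is how the Python API represents it.

```python
from collections import defaultdict


def _index(pairs):
    """Group values by key so that matches can be looked up quickly."""
    index = defaultdict(list)
    for key, value in pairs:
        index[key].append(value)
    return index


def join(source, other):
    """Inner join: emit a pair only for keys present on both sides."""
    other_index = _index(other)
    return [(k, (v, w)) for k, v in source for w in other_index.get(k, [])]


def left_outer_join(source, other):
    """Keep every key of source; the other side's value may be None."""
    other_index = _index(other)
    result = []
    for k, v in source:
        matches = other_index.get(k, [None])
        result.extend((k, (v, w)) for w in matches)
    return result


def right_outer_join(source, other):
    """Keep every key of other; the source side's value may be None."""
    return [(k, (v, w)) for k, (w, v) in left_outer_join(other, source)]


addresses = [("ritual", "Valencia St"), ("philz", "Van Ness Ave"), ("starbucks", "Seattle")]
ratings = [("ritual", 4.9), ("philz", 4.8), ("peets", 4.1)]

inner = join(addresses, ratings)
left = left_outer_join(addresses, ratings)
right = right_outer_join(addresses, ratings)

print(inner)      # only 'ritual' and 'philz'
print(left[-1])   # ('starbucks', ('Seattle', None))
print(right[-1])  # ('peets', (None, 4.1))
```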


Sorting Data

We can sort an RDD with key/value pairs provided that there is an ordering defined on the key. Once we have sorted our data, any subsequent call on the sorted data to collect() or save() will result in ordered data.


Actions Available on Pair RDDs

  • countByKey(): Count the number of elements for each key.
  • collectAsMap(): Collect the result as a map to provide easy lookup.
  • lookup(key): Return all values associated with the provided key.
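As a rough illustration of what these three actions return, here is a plain-Python model. It is not Spark code: the helper names and sample data are hypothetical, and the choice of which value survives for a duplicate key in collect_as_map (the last one seen) is an assumption of this model.

```python
from collections import Counter


def count_by_key(pairs):
    """Model of countByKey(): number of elements per key."""
    return dict(Counter(key for key, _ in pairs))


def collect_as_map(pairs):
    """Model of collectAsMap(): one value per key (last one wins here)."""
    return dict(pairs)


def lookup(pairs, wanted):
    """Model of lookup(key): every value stored under the given key."""
    return [value for key, value in pairs if key == wanted]


pairs = [(1, 2), (3, 4), (3, 6)]

counts = count_by_key(pairs)
as_map = collect_as_map(pairs)
threes = lookup(pairs, 3)

print(counts)  # {1: 1, 3: 2}
print(as_map)  # {1: 2, 3: 6}
print(threes)  # [4, 6]
```

Because these are actions, the results come back to the driver program as ordinary local collections rather than as RDDs.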


Data Partitioning (Advanced)

In a distributed program, communication is very expensive, so laying out data to minimize network traffic can greatly improve performance. Much like how a single-node program needs to choose the right data structure for a collection of records, Spark programs can choose to control their RDDs' partitioning to reduce communication. Partitioning will not be helpful in all applications. For example, if a given RDD is scanned only once, there is no point in partitioning it in advance. It is useful only when a dataset is reused multiple times in key-oriented operations such as joins.


Determining an RDD’s Partitioner:


scala> val pairs = sc.parallelize(List((1, 1), (2, 2), (3, 3)))
pairs: spark.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:12

scala> pairs.partitioner
res0: Option[spark.Partitioner] = None

scala> val partitioned = pairs.partitionBy(new spark.HashPartitioner(2))
partitioned: spark.RDD[(Int, Int)] = ShuffledRDD[1] at partitionBy at <console>:14

scala> partitioned.partitioner
res1: Option[spark.Partitioner] = Some(spark.HashPartitioner@5147788d)


In this short session, we created an RDD of (Int, Int) pairs, which initially has no partitioning information (an Option with value None). We then created a second RDD by hash-partitioning the first. If we actually wanted to use partitioned in further operations, we should have appended persist() to the third line of input, in which partitioned is defined. Without persist(), subsequent RDD actions will evaluate the entire lineage of partitioned, which will cause pairs to be hash-partitioned over and over.


Operations That Benefit from Partitioning

The operations that benefit from partitioning are cogroup(), groupWith(), join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), and lookup().


Operations That Affect Partitioning

The following operations result in a partitioner being set on the output RDD: cogroup(), groupWith(), join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), partitionBy(), sort(), mapValues() (if the parent RDD has a partitioner), flatMapValues() (if the parent has a partitioner), and filter() (if the parent has a partitioner).


Custom Partitioners

To implement a custom partitioner, you need to subclass the org.apache.spark.Partitioner class and implement three methods:

  • numPartitions: Int, which returns the number of partitions you will create.
  • getPartition(key: Any): Int, which returns the partition ID (0 to numPartitions-1) for a given key.
  • equals(), the standard Java equality method. This is important to implement because Spark will need to test your Partitioner object against other instances of itself when it decides whether two of your RDDs are partitioned the same way!
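The three methods above can be sketched as a small plain-Python class that mirrors the shape of the Partitioner interface. This is an illustration under stated assumptions, not Spark's API: the class name DomainPartitioner, its method names, and the idea of partitioning URLs by domain are hypothetical choices made for this example, and zlib.crc32 is used only because it gives the same hash on every run.

```python
import zlib
from urllib.parse import urlparse


class DomainPartitioner:
    """Hypothetical partitioner that sends all URLs of one domain to one partition."""

    def __init__(self, num_partitions):
        if num_partitions <= 0:
            raise ValueError("num_partitions must be positive")
        # Counterpart of numPartitions: how many partitions we will create.
        self.num_partitions = num_partitions

    def get_partition(self, key):
        """Counterpart of getPartition(): an ID from 0 to num_partitions - 1."""
        domain = urlparse(key).netloc
        # crc32 is never negative, so the modulo result is a valid partition ID.
        return zlib.crc32(domain.encode("utf-8")) % self.num_partitions

    def __eq__(self, other):
        """Counterpart of equals(): same class and same number of partitions."""
        return (
            isinstance(other, DomainPartitioner)
            and other.num_partitions == self.num_partitions
        )

    def __hash__(self):
        # Keep hashing consistent with __eq__.
        return hash(self.num_partitions)


partitioner = DomainPartitioner(4)
world = partitioner.get_partition("http://www.cnn.com/WORLD")
us = partitioner.get_partition("http://www.cnn.com/US")
print(world == us)  # True: both URLs share the domain www.cnn.com
```

In the Python API the closest hook is the partitionFunc argument of partitionBy(), which takes a plain function from key to integer instead of a Partitioner subclass.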
