
I read up on the documentation of HashPartitioner. Unfortunately nothing much was explained except for the API calls. I am under the assumption that HashPartitioner partitions the distributed set based on the hash of the keys. For example if my data is like

(1,1), (1,2), (1,3), (2,1), (2,2), (2,3)


So the partitioner would split this across different partitions, with identical keys falling in the same partition. However, I do not understand the significance of the constructor argument

new HashPartitioner(numPartitions) // What does numPartitions do?


For the above dataset how would the results differ if I did

new HashPartitioner(1)
new HashPartitioner(2)
new HashPartitioner(10)


So how does HashPartitioner actually work?

1 Answer


Let's make your dataset marginally more interesting:

val rdd = sc.parallelize(for {
    x <- 1 to 3
    y <- 1 to 2
} yield (x, None), 8)

We have six elements:

rdd.count

Long = 6

no partitioner:

rdd.partitioner

Option[org.apache.spark.Partitioner] = None

and eight partitions:

rdd.partitions.length

Int = 8

Now let's define a small helper to count the number of elements per partition:

import org.apache.spark.rdd.RDD

def countByPartition(rdd: RDD[(Int, None.type)]) = {
    rdd.mapPartitions(iter => Iterator(iter.length))
}

Since we don't have a partitioner, our dataset is distributed roughly uniformly between partitions (the default partitioning scheme in Spark):

countByPartition(rdd).collect()

Array[Int] = Array(0, 1, 1, 1, 0, 1, 1, 1)
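
The pattern of empty and non-empty partitions above can be reproduced locally. The sketch below is plain Scala with no Spark dependency and can be pasted into the Scala REPL or spark-shell. `sliceSizes` is a hypothetical helper name, and the index formula is my understanding of how `sc.parallelize` slices a local collection by position, so treat it as an illustration rather than Spark's actual code.

```scala
// Hypothetical helper: sizes of the slices obtained when a collection of
// `length` elements is split by position into `numSlices` contiguous ranges.
// Assumption: slice i covers indices
//   [i * length / numSlices, (i + 1) * length / numSlices).
def sliceSizes(length: Int, numSlices: Int): Seq[Int] =
  (0 until numSlices).map { i =>
    val start = (i.toLong * length / numSlices).toInt
    val end = ((i + 1).toLong * length / numSlices).toInt
    end - start
  }

// Six elements spread over eight slices, as in the example above.
println(sliceSizes(6, 8).mkString(", "))  // 0, 1, 1, 1, 0, 1, 1, 1
```

With fewer elements than slices, some ranges are empty, which is why two of the eight partitions hold nothing.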

[Figure: initial distribution]

Now lets repartition our dataset:

import org.apache.spark.HashPartitioner

val rddOneP = rdd.partitionBy(new HashPartitioner(1))

Since the parameter passed to HashPartitioner defines the number of partitions, we expect one partition:

rddOneP.partitions.length

Int = 1

Since we have only one partition it contains all elements:

countByPartition(rddOneP).collect

Array[Int] = Array(6)

[Figure: HashPartitioner(1)]

Note that the order of values after the shuffle is non-deterministic.

In the same way, if we use HashPartitioner(2)

val rddTwoP = rdd.partitionBy(new HashPartitioner(2))

we'll get 2 partitions:

rddTwoP.partitions.length

Int = 2

Since the RDD is now partitioned by key, the data is no longer distributed uniformly:

countByPartition(rddTwoP).collect()

Array[Int] = Array(2, 4)

Because we have three keys but only two possible values of hashCode mod numPartitions, there is nothing unexpected here:

(1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 2))

scala.collection.immutable.IndexedSeq[(Int, Int, Int)] = Vector((1,1,1), (2,2,0), (3,3,1))

Just to confirm the above:

rddTwoP.mapPartitions(iter => Iterator(iter.map(_._1).toSet)).collect()

Array[scala.collection.immutable.Set[Int]] = Array(Set(2), Set(1, 3))
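
The key-to-partition mapping can be sketched without Spark. As far as I know, HashPartitioner takes the key's hashCode modulo numPartitions, adjusts the result so it is never negative, and sends null keys to partition 0. The names `nonNegativeMod` and `partitionFor` below are illustrative stand-ins, not a copy of the Spark source.

```scala
// Non-negative remainder: Scala's % can return a negative value for a
// negative hashCode, which would not be a valid partition index.
def nonNegativeMod(x: Int, mod: Int): Int = {
  val raw = x % mod
  if (raw < 0) raw + mod else raw
}

// Hypothetical stand-in for the key-to-partition mapping of a hash partitioner.
def partitionFor(key: Any, numPartitions: Int): Int = key match {
  case null => 0 // assumption: null keys go to partition 0
  case k    => nonNegativeMod(k.hashCode, numPartitions)
}

// Keys 1 and 3 share partition 1, key 2 goes to partition 0.
println((1 to 3).map(k => k -> partitionFor(k, 2)).mkString(", "))  // (1,1), (2,0), (3,1)

// A negative hashCode still maps to a valid index.
println(partitionFor(-3, 2))  // 1, whereas -3 % 2 is -1
```

Each key occurs twice in the dataset, so partition 1 (keys 1 and 3) holds four elements and partition 0 (key 2) holds two, matching Array(2, 4).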

[Figure: HashPartitioner(2)]

Finally, with HashPartitioner(7) we get seven partitions, three of them non-empty with two elements each:

val rddSevenP = rdd.partitionBy(new HashPartitioner(7))

rddSevenP.partitions.length

Int = 7

countByPartition(rddSevenP).collect()

Array[Int] = Array(0, 2, 2, 2, 0, 0, 0)
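
To relate this back to the dataset in the question, here is a small local simulation of HashPartitioner(1), HashPartitioner(2) and HashPartitioner(10). It is plain Scala for the REPL or spark-shell, assuming the partition index is the non-negative remainder of the key's hashCode. `countsPerPartition` is a hypothetical helper, and it only predicts per-partition counts, not the order of elements after a real shuffle.

```scala
// The question's dataset: keys 1 and 2, three values each.
val data = Seq((1, 1), (1, 2), (1, 3), (2, 1), (2, 2), (2, 3))

// Hypothetical local simulation: how many pairs land in each partition when
// the partition index is the non-negative remainder of key.hashCode.
def countsPerPartition(pairs: Seq[(Int, Int)], numPartitions: Int): Seq[Int] = {
  val counts = Array.fill(numPartitions)(0)
  for ((key, _) <- pairs) {
    val raw = key.hashCode % numPartitions
    val idx = if (raw < 0) raw + numPartitions else raw
    counts(idx) += 1
  }
  counts.toSeq
}

for (n <- Seq(1, 2, 10))
  println(s"$n -> ${countsPerPartition(data, n).mkString(", ")}")
// 1 -> 6
// 2 -> 3, 3
// 10 -> 0, 3, 3, 0, 0, 0, 0, 0, 0, 0
```

With only two distinct keys, at most two partitions can ever be non-empty, so raising numPartitions beyond that just adds empty partitions.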
