Let's make your dataset marginally more interesting:
val rdd = sc.parallelize(for {
x <- 1 to 3
y <- 1 to 2
} yield (x, None), 8)
We have six elements:
rdd.count
Long = 6
It has no partitioner:
rdd.partitioner
Option[org.apache.spark.Partitioner] = None
and eight partitions:
rdd.partitions.length
Int = 8
Now let's define a small helper to count the number of elements per partition:
import org.apache.spark.rdd.RDD
def countByPartition(rdd: RDD[(Int, None.type)]) = {
rdd.mapPartitions(iter => Iterator(iter.length))
}
Since we don't have a partitioner, our dataset is distributed uniformly between partitions (the default partitioning scheme in Spark):
countByPartition(rdd).collect()
Array[Int] = Array(0, 1, 1, 1, 0, 1, 1, 1)
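The pattern Array(0, 1, 1, 1, 0, 1, 1, 1) is "uniform" in the sense that no slice holds more than one element more than any other. The sketch below reproduces it without a cluster. It assumes, based on my reading of Spark's source, that `parallelize` cuts a local collection of `length` elements into `numSlices` contiguous ranges whose boundaries are `i * length / numSlices` (integer division). `slicePositions` and `sliceSizes` are hypothetical helper names, not Spark APIs.

```scala
// Hypothetical helper mirroring (as I understand it) the boundary arithmetic
// Spark uses when slicing a local collection in sc.parallelize.
def slicePositions(length: Long, numSlices: Int): Seq[(Int, Int)] =
  (0 until numSlices).map { i =>
    val start = ((i * length) / numSlices).toInt
    val end = (((i + 1) * length) / numSlices).toInt
    (start, end)
  }

// Number of elements that land in each slice.
def sliceSizes(length: Long, numSlices: Int): Seq[Int] =
  slicePositions(length, numSlices).map { case (start, end) => end - start }

// Six (key, None) pairs spread over eight slices, as in the example above.
val data = for {
  x <- 1 to 3
  _ <- 1 to 2
} yield (x, None)

println(sliceSizes(data.length, 8).mkString(", "))
```

With 6 elements and 8 slices, the boundaries are 0, 0, 1, 2, 3, 3, 4, 5, 6. This is why slices 0 and 4 come out empty.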
Now let's repartition our dataset:
import org.apache.spark.HashPartitioner
val rddOneP = rdd.partitionBy(new HashPartitioner(1))
Since the parameter passed to HashPartitioner defines the number of partitions, we expect one partition:
rddOneP.partitions.length
Int = 1
Since there is only one partition, it contains all elements:
countByPartition(rddOneP).collect
Array[Int] = Array(6)
Note that the order of values after the shuffle is non-deterministic.
In the same way, if we use HashPartitioner(2):
val rddTwoP = rdd.partitionBy(new HashPartitioner(2))
we'll get 2 partitions:
rddTwoP.partitions.length
Int = 2
Since rddTwoP is partitioned by key, data won't be distributed uniformly anymore:
countByPartition(rddTwoP).collect()
Array[Int] = Array(2, 4)
Because we have three keys and only two different values of hashCode mod numPartitions, there is nothing unexpected here:
(1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 2))
scala.collection.immutable.IndexedSeq[(Int, Int, Int)] = Vector((1,1,1), (2,2,0), (3,3,1))
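The plain `k.hashCode % 2` above is a fair approximation here because every key is positive. To my understanding, HashPartitioner actually uses a non-negative modulo and sends null keys to partition 0, so that negative hash codes still map into `0 until numPartitions`. The following is a minimal, Spark-free sketch under that assumption. `nonNegativeMod` and `partitionFor` are illustrative names, not public Spark APIs.

```scala
// Modulo that always lands in [0, mod), unlike Scala's %, which keeps the sign of x.
def nonNegativeMod(x: Int, mod: Int): Int = {
  val rawMod = x % mod
  rawMod + (if (rawMod < 0) mod else 0)
}

// Hypothetical stand-in for hash partitioning: null keys go to partition 0,
// every other key goes to hashCode mod numPartitions (made non-negative).
def partitionFor(key: Any, numPartitions: Int): Int = key match {
  case null => 0
  case _    => nonNegativeMod(key.hashCode, numPartitions)
}

// Which of the keys 1, 2, 3 end up in which of two partitions?
val keysByPartition: Map[Int, Set[Int]] =
  (1 to 3).toSet.groupBy((k: Int) => partitionFor(k, 2))

println(keysByPartition)
// A negative key shows why the plain % is not enough in general:
println(-3 % 2)              // -1: not a valid partition index
println(partitionFor(-3, 2)) // 1
```

The grouping matches the key sets reported by `mapPartitions` on rddTwoP: key 2 alone in partition 0, keys 1 and 3 in partition 1.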
Just to confirm the above:
rddTwoP.mapPartitions(iter => Iterator(iter.map(_._1).toSet)).collect()
Array[scala.collection.immutable.Set[Int]] = Array(Set(2), Set(1, 3))
Finally, with HashPartitioner(7) we get seven partitions, three of them non-empty with 2 elements each:
val rddSevenP = rdd.partitionBy(new HashPartitioner(7))
rddSevenP.partitions.length
Int = 7
countByPartition(rddSevenP).collect()
Array[Int] = Array(0, 2, 2, 2, 0, 0, 0)
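All three `partitionBy` results can be reproduced without a cluster by counting how many records each key-to-partition assignment produces. This sketch assumes that a record goes to partition `hashCode mod numPartitions`, made non-negative. `countsPerPartition` is a hypothetical helper. It models only the counts, not the order of records inside a partition, which is non-deterministic after a shuffle.

```scala
// Keys of the six (key, None) records used throughout the example.
val keys: Seq[Int] = for {
  x <- 1 to 3
  _ <- 1 to 2
} yield x

// Hypothetical model of hash partitioning: non-negative hashCode mod n.
def partitionOf(key: Int, n: Int): Int = {
  val raw = key.hashCode % n
  if (raw < 0) raw + n else raw
}

// Number of records per partition, including empty partitions.
def countsPerPartition(ks: Seq[Int], n: Int): Seq[Int] = {
  val counts = Array.fill(n)(0)
  ks.foreach(k => counts(partitionOf(k, n)) += 1)
  counts.toSeq
}

for (n <- Seq(1, 2, 7))
  println(s"HashPartitioner($n): " + countsPerPartition(keys, n).mkString(", "))
```

Because an `Int`'s hashCode is the value itself, keys 1, 2 and 3 fall into partitions 1, 2 and 3 when n = 7. This leaves partitions 0, 4, 5 and 6 empty.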