Since Spark 1.6 there is a function called monotonically_increasing_id().
It creates a new column with a unique, monotonically increasing 64-bit value for each row.
But the values are not consecutive: each partition starts its own range, so before the IDs can be used as a global index, the offset of each partition has to be calculated.
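For example (a minimal illustration, assuming a SparkSession named spark), the raw IDs typically jump by a large amount at every partition boundary, because the upper bits of each ID encode the partition number:

import org.apache.spark.sql.functions.monotonically_increasing_id

// 6 rows spread over 3 partitions; the IDs restart from a new base in each partition
spark.range(0, 6).repartition(3)
  .withColumn("inc_id", monotonically_increasing_id())
  .orderBy("inc_id")
  .show(false)
// typical inc_id values: 0, 1, 8589934592, 8589934593, 17179869184, 17179869185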
To keep the solution RDD-free I ended up with a collect(), but it only collects the offsets, one value per partition, so it will not cause an OOM:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.LongType

def zipWithIndex(df: DataFrame, offset: Long = 1, indexName: String = "index") = {
  val dfWithPartitionId = df
    .withColumn("partition_id", spark_partition_id())
    .withColumn("inc_id", monotonically_increasing_id())

  // One offset per partition: the number of rows in all preceding partitions,
  // minus the partition's first inc_id, plus the desired starting offset.
  val partitionOffsets = dfWithPartitionId
    .groupBy("partition_id")
    .agg(count(lit(1)) as "cnt", first("inc_id") as "inc_id")
    .orderBy("partition_id")
    .select(sum("cnt").over(Window.orderBy("partition_id")) - col("cnt") - col("inc_id") + lit(offset) as "partition_offset")
    .collect()
    .map(_.getLong(0))

  // Look up the offset for each row's partition and add it to the raw inc_id
  dfWithPartitionId
    .withColumn("partition_offset", udf((partitionId: Int) => partitionOffsets(partitionId), LongType)(col("partition_id")))
    .withColumn(indexName, col("partition_offset") + col("inc_id"))
    .drop("partition_id", "partition_offset", "inc_id")
}
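For completeness, a usage sketch (assuming a SparkSession named spark; the file paths are hypothetical):

// read the raw data, add a 1-based "index" column, and write the result out
val raw = spark.read.option("header", "true").csv("/path/to/huge.csv")   // hypothetical path
val indexed = zipWithIndex(raw)                 // defaults: offset = 1, indexName = "index"
indexed.write.parquet("/path/to/huge_indexed")  // hypothetical path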
This solution doesn't repack the original rows and doesn't repartition the original huge DataFrame, so it is quite fast in the real world: 200 GB of CSV data (43 million rows with 150 columns) was read, indexed, and written to Parquet in 2 minutes on 240 cores.