
in Big Data Hadoop & Spark by (11.4k points)

I am trying to solve the age-old problem of adding a sequence number to a data set. I am working with DataFrames, and there appears to be no DataFrame equivalent to RDD.zipWithIndex. On the other hand, the following works more or less the way I want it to:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StructField, StructType}

val origDF = sqlContext.load(...)

val seqDF = sqlContext.createDataFrame(
    origDF.rdd.zipWithIndex.map(ln => Row.fromSeq(Seq(ln._2) ++ ln._1.toSeq)),
    StructType(Array(StructField("seq", LongType, false)) ++ origDF.schema.fields)
)


In my actual application, origDF won't be loaded directly out of a file -- it is going to be created by joining 2-3 other DataFrames together and will contain upwards of 100 million rows.

Is there a better way to do this? What can I do to optimize it?

1 Answer

by (32.3k points)

Since Spark 1.6 there is a function called monotonically_increasing_id().

It creates a new column with a unique, monotonically increasing 64-bit index for each row.

However, the values are not consecutive: each partition starts its own range (the partition ID is encoded in the upper bits of the value), so to turn the ids into a gapless sequence we first have to compute an offset for every partition.
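If you don't actually need consecutive numbers, the built-in function alone is enough. A minimal, self-contained sketch, assuming Spark 2.x with a SparkSession (the session setup and the toy range DataFrame are only for illustration):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.monotonically_increasing_id

val spark = SparkSession.builder().appName("mono-id-demo").master("local[4]").getOrCreate()

// A toy DataFrame spread across 4 partitions, just for illustration.
val df = spark.range(0, 10, 1, 4).toDF("value")

// The ids are unique and increasing, but NOT consecutive:
// rows in partition 1 start at 1L << 33 = 8589934592, partition 2 at 2L << 33, and so on.
df.withColumn("id", monotonically_increasing_id()).show()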

To keep the solution RDD-free I ended up using collect(), but it only collects the offsets, one value per partition, so it will not cause an OOM.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.LongType

def zipWithIndex(df: DataFrame, offset: Long = 1, indexName: String = "index"): DataFrame = {
  val dfWithPartitionId = df
    .withColumn("partition_id", spark_partition_id())
    .withColumn("inc_id", monotonically_increasing_id())

  // One value per partition: the gapless index at which that partition should start.
  val partitionOffsets = dfWithPartitionId
    .groupBy("partition_id")
    .agg(count(lit(1)) as "cnt", first("inc_id") as "inc_id")
    .orderBy("partition_id")
    .select(sum("cnt").over(Window.orderBy("partition_id")) - col("cnt") - col("inc_id") + lit(offset) as "cnt")
    .collect()
    .map(_.getLong(0))
    .toArray

  // Map each row's partition_id to its offset and add it to the in-partition id.
  dfWithPartitionId
    .withColumn("partition_offset",
      udf((partitionId: Int) => partitionOffsets(partitionId), LongType)(col("partition_id")))
    .withColumn(indexName, col("partition_offset") + col("inc_id"))
    .drop("partition_id", "partition_offset", "inc_id")
}
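A quick usage sketch (origDF is just the DataFrame from the question, and "seq" the column name it used; both are placeholders for your own names):

// Append a gapless index column named "seq", starting at 1.
val seqDF = zipWithIndex(origDF, offset = 1, indexName = "seq")
seqDF.select("seq").show(5)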

This solution doesn't repack the rows of the original DataFrame, and it doesn't repartition the original huge DataFrame either, so it is quite fast in the real world: 200 GB of CSV data (43 million rows with 150 columns) was read, indexed, and written to Parquet in 2 minutes on 240 cores.
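For reference, a hedged sketch of that read-index-write flow, assuming Spark 2.x with a SparkSession named spark; the paths and the header option are placeholders, not part of the original answer:

// Hypothetical paths, only to illustrate read -> zipWithIndex -> Parquet.
val csvDF = spark.read.option("header", "true").csv("/path/to/input/*.csv")
zipWithIndex(csvDF, offset = 1, indexName = "index").write.parquet("/path/to/output")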

