How can I find median of an RDD of integers using a distributed method, IPython, and Spark? The RDD is approximately 700,000 elements and therefore too large to collect and find the median.

1 Answer


Spark 2.0+:

Use the approxQuantile method, which implements the Greenwald-Khanna algorithm:

Python:

df.approxQuantile("x", [0.5], 0.25)

Scala:

df.stat.approxQuantile("x", Array(0.5), 0.25)

where the last parameter is the relative error: the lower the value, the more accurate the result, at the cost of a more expensive computation.
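To make the relative error concrete, here is a minimal sketch of the documented guarantee (the helper name is mine): for n rows, probability p, and error err, the sample returned by approxQuantile has an exact rank between floor((p - err) * n) and ceil((p + err) * n).

```python
import math

def acceptable_ranks(n, p, err):
    """Spark's approxQuantile guarantee: the returned sample x satisfies
    floor((p - err) * n) <= rank(x) <= ceil((p + err) * n)."""
    return math.floor((p - err) * n), math.ceil((p + err) * n)

# For the ~700,000-element RDD in the question, the median (p=0.5) with
# err=0.25 may come from anywhere in the middle half of the sorted data:
print(acceptable_ranks(700000, 0.5, 0.25))  # (175000, 525000)
```

So with 700,000 elements an error of 0.25 is very loose; something like 0.001 narrows the acceptable rank window to about 1,400 rows around the true median.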


 

For Spark 2.2+: since SPARK-14352, it supports estimation on multiple columns:

df.approxQuantile(["x", "y", "z"], [0.5], 0.25)

and in Scala:

df.stat.approxQuantile(Array("x", "y", "z"), Array(0.5), 0.25)
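As a plain-Python sketch of the return shape (one list of quantiles per requested column; the `quantiles` helper and sample data are mine, and this computes exact rank-based quantiles rather than approximations):

```python
def quantiles(values, probs):
    """Exact quantile by rank: for probability p over n sorted values,
    take the element at index int(p * (n - 1))."""
    s = sorted(values)
    n = len(s)
    return [s[int(p * (n - 1))] for p in probs]

# One result list per column, mirroring approxQuantile's return value
data = {"x": [1, 2, 3, 4, 5], "y": [10, 20, 30], "z": [7, 8, 9, 9]}
print([quantiles(data[c], [0.5]) for c in ["x", "y", "z"]])  # [[3], [20], [8]]
```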


 

Here is another approach I used, based on window functions (with PySpark 2.2.0).

from pyspark.sql import DataFrame
from pyspark.sql.functions import percent_rank, pow, first

class median():
    """Median helper with an over method that accepts a window partition."""

    def __init__(self, df, col, name):
        assert col
        self.column = col
        self.df = df
        self.name = name

    def over(self, window):
        first_window = window.orderBy(self.column)                                  # first, order by the column we want the median of
        df = self.df.withColumn("percent_rank", percent_rank().over(first_window))  # percent_rank = 0.5 corresponds to the median
        second_window = window.orderBy(pow(df.percent_rank - 0.5, 2))               # order by (percent_rank - 0.5)^2 ascending
        return df.withColumn(self.name, first(self.column).over(second_window))     # the first row of the window is the median

def addMedian(self, col, median_name):
    """Method to be added to Spark's native DataFrame class."""
    return median(self, col, median_name)

# Add the method to the DataFrame class
DataFrame.addMedian = addMedian

Then, to calculate the median of col2, call the addMedian method:

from pyspark.sql import Window

median_window = Window.partitionBy("col1")
df = df.addMedian("col2", "median").over(median_window)
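To see why ordering by (percent_rank - 0.5)^2 works, here is a plain-Python sketch of the same trick (the `window_median` helper is mine, not part of the PySpark code): the percent rank of the i-th value in sorted order is i / (n - 1), and the head of the re-ordered data is the value whose percent rank lies closest to 0.5.

```python
def window_median(values):
    """Mimic the two-window trick: compute each value's percent rank in
    sorted order, then take the value whose rank is closest to 0.5."""
    s = sorted(values)
    n = len(s)
    # key = (percent_rank - 0.5)^2; ascending order puts the median first
    ranked = [((i / (n - 1) - 0.5) ** 2, v) for i, v in enumerate(s)]
    return min(ranked)[1]

print(window_median([40, 10, 50, 20, 30]))  # 30
```

For an even number of rows there is no exact percent_rank of 0.5, so this approach (like the window version above) returns one of the two middle values rather than their average.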

Finally, you can group by if needed:

df.groupby("col1", "median")
