
How can I find the median of an RDD of integers using a distributed method, IPython, and Spark? The RDD has approximately 700,000 elements and is therefore too large to collect in order to find the median.

1 Answer


Spark 2.0+:

Use the approxQuantile method, which implements the Greenwald-Khanna algorithm:

Python:

df.approxQuantile("x", [0.5], 0.25)

Scala:

df.stat.approxQuantile("x", Array(0.5), 0.25)

where the last parameter is the relative error: the lower the value, the more accurate the result, but the more expensive the computation.
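Since the question starts from an RDD of integers, one way to apply this is to convert the RDD to a single-column DataFrame first. A minimal sketch, assuming an active SparkSession; the RDD contents, the column name x, and the 0.01 relative error are illustrative placeholders:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Stand-in for the ~700,000-element RDD of integers from the question
rdd = spark.sparkContext.parallelize(range(700000))

# Wrap each value in a one-element tuple so it becomes a single-column DataFrame
df = rdd.map(lambda v: (v,)).toDF(["x"])

# Approximate median: the 0.5 quantile with a 1% relative error
median = df.approxQuantile("x", [0.5], 0.01)[0]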


 

Spark 2.2+: Since SPARK-14352, approxQuantile supports estimation on multiple columns:

Python:

df.approxQuantile(["x", "y", "z"], [0.5], 0.25)

Scala:

df.stat.approxQuantile(Array("x", "y", "z"), Array(0.5), 0.25)
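In PySpark, the multi-column call returns one list of quantiles per column, in the order the columns were passed. A small sketch, assuming a DataFrame df with numeric columns x, y, and z:

# One inner list per column; each inner list holds the requested quantiles
medians = df.approxQuantile(["x", "y", "z"], [0.5], 0.25)
median_x, median_y, median_z = (m[0] for m in medians)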


 

Here is another method I used, based on window functions (with PySpark 2.2.0).

from pyspark.sql import DataFrame

class median():
    """Create a median class with an over method to pass a partition."""

    def __init__(self, df, col, name):
        assert col
        self.column = col
        self.df = df
        self.name = name

    def over(self, window):
        from pyspark.sql.functions import percent_rank, pow, first

        first_window = window.orderBy(self.column)                                  # first, order by the column we want to compute the median for
        df = self.df.withColumn("percent_rank", percent_rank().over(first_window))  # add percent_rank column; percent_rank = 0.5 corresponds to the median
        second_window = window.orderBy(pow(df.percent_rank - 0.5, 2))               # order by (percent_rank - 0.5)^2 ascending
        return df.withColumn(self.name, first(self.column).over(second_window))     # the first row of the window corresponds to the median

def addMedian(self, col, median_name):
    """Method to be added to the Spark native DataFrame class."""
    return median(self, col, median_name)

# Add the method to the DataFrame class
DataFrame.addMedian = addMedian

Then, to calculate the median of col2, call the addMedian method:

from pyspark.sql import Window

median_window = Window.partitionBy("col1")
df = df.addMedian("col2", "median").over(median_window)

Finally, you can group by if needed:

df.groupby("col1", "median")
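For reference, here is a minimal end-to-end sketch of the window-function approach, assuming the median class and the DataFrame.addMedian patch above are already defined; the toy DataFrame and the col1/col2 names are hypothetical stand-ins, and distinct() is used only to display one row per group:

from pyspark.sql import SparkSession, Window

spark = SparkSession.builder.getOrCreate()

# Hypothetical toy data: two groups of three rows each, so each median is an actual row value
df = spark.createDataFrame(
    [("a", 1), ("a", 2), ("a", 3), ("b", 10), ("b", 20), ("b", 30)],
    ["col1", "col2"],
)

median_window = Window.partitionBy("col1")
df = df.addMedian("col2", "median").over(median_window)

# One row per group with its median
df.select("col1", "median").distinct().show()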

