+4 votes
in Big Data Hadoop & Spark by (11.4k points)

Let's say I have a rather large dataset in the following form:

data = sc.parallelize([('Foo',41,'US',3),
                       ('Foo',39,'UK',1),
                       ('Bar',57,'CA',2),
                       ('Bar',72,'CA',2),
                       ('Baz',22,'US',6),
                       ('Baz',36,'US',6)])


What I would like to do is remove duplicate rows based on the values of the first, third and fourth columns only.

Removing entirely duplicate rows is straightforward:

data = data.distinct()
which removes only rows that are identical across every column.

But how do I remove duplicate rows based on columns 1, 3 and 4 only? i.e. remove either one of these:

('Baz',22,'US',6)
('Baz',36,'US',6)


In pandas, this can be done by specifying the columns with .drop_duplicates(). How can I achieve the same in Spark/PySpark?
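For reference, the pandas version would look something like this (a sketch; the integer column labels come from building the frame without headers):

import pandas as pd

pdf = pd.DataFrame([('Foo', 41, 'US', 3),
                    ('Foo', 39, 'UK', 1),
                    ('Bar', 57, 'CA', 2),
                    ('Bar', 72, 'CA', 2),
                    ('Baz', 22, 'US', 6),
                    ('Baz', 36, 'US', 6)])

# Keep one row per combination of the first, third and fourth columns
pdf.drop_duplicates(subset=[0, 2, 3])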

5 Answers

+3 votes
by (32.3k points)

PySpark does include a dropDuplicates() method. Follow the example below and apply the same approach to your problem:

>>> from pyspark.sql import Row
>>> df = sc.parallelize([ \
...     Row(name='Amit', id=5, marks=80), \
...     Row(name='Amit', id=5, marks=80), \
...     Row(name='Amit', id=10, marks=80)]).toDF()
>>> df.dropDuplicates().show()
+---+-----+----+
| id|marks|name|
+---+-----+----+
|  5|   80|Amit|
| 10|   80|Amit|
+---+-----+----+

>>> df.dropDuplicates(['name', 'marks']).show()
+---+-----+----+
| id|marks|name|
+---+-----+----+
|  5|   80|Amit|
+---+-----+----+
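Applied to the data from the question, the same approach would look roughly like this (a sketch; the column names x, y, z and count are just labels assumed here):

data = sc.parallelize([('Foo', 41, 'US', 3),
                       ('Foo', 39, 'UK', 1),
                       ('Bar', 57, 'CA', 2),
                       ('Bar', 72, 'CA', 2),
                       ('Baz', 22, 'US', 6),
                       ('Baz', 36, 'US', 6)])

# Convert the RDD of tuples to a DataFrame, then de-duplicate on columns 1, 3 and 4 only
df = data.toDF(['x', 'y', 'z', 'count'])
df.dropDuplicates(['x', 'z', 'count']).show()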

If you wish to learn Spark visit this Spark Tutorial.

by (29.3k points)
Nice explanation thanks.
by (19.7k points)
It helped! Thanks!
by
Is there any way to capture the records that it dropped?
by (41.4k points)
Also, drop_duplicates(self, subset=None, keep='first', inplace=False) returns a DataFrame with duplicate rows removed, optionally considering only certain columns. Indexes, including time indexes, are ignored.
by (33.1k points)
Your answer was helpful.
Thanks!
by (44.4k points)
Following this Python syntax solved my issue
+2 votes
by (108k points)

From your question, it is unclear which columns you want to use to discover the duplicates. The idea behind the solution is to create a key based on the values of the columns that identify duplicates. Then, you can use reduceByKey or reduce operations to eliminate the duplicates.

Here is some code to get you started:

def get_key(x):
    # Build the key from columns 1, 3 and 4; a separator avoids accidental collisions
    return "{0}|{1}|{2}".format(x[0], x[2], x[3])

# Key each record by the columns that identify duplicates
m = data.map(lambda x: (get_key(x), x))

Now you have a key-value RDD that is keyed by columns 1, 3 and 4. The next step would be either a reduceByKey or a groupByKey plus a filter; either approach eliminates the duplicates.

r = m.reduceByKey(lambda x, y: x)
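To get plain rows back out of the result (dropping the helper keys), you can take the values of the pair RDD, for example:

# Each key now maps to a single surviving record; keep only the records
deduped = r.values()
deduped.collect()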

by (47.2k points)
I agree with Vinita. To add on: it is not always the case that we want to group by all columns other than the column(s) in the aggregate function, i.e. we may want to remove duplicates purely based on a subset of columns while retaining all columns of the original data frame. So the better way to do this is the dropDuplicates DataFrame API, available since Spark 1.4.0.
by (32.1k points)
Nice explanation! Works for me!
0 votes
by (29.5k points)

If you want to do this as a DataFrame, just use groupBy and agg. Assuming you already have a DataFrame created (with columns named "col1", "col2", etc.), you could do the following in Scala:

myDF.groupBy($"col1", $"col3", $"col4").agg($"col1", max($"col2"), $"col3", $"col4")
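The PySpark equivalent would be along these lines (a sketch, assuming the same column names and that keeping the maximum col2 per group is acceptable):

from pyspark.sql import functions as F

myDF.groupBy("col1", "col3", "col4") \
    .agg(F.max("col2").alias("col2")) \
    .show()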

0 votes
by (106k points)

For removing duplicate rows based on specific columns in an RDD/Spark DataFrame, you can use the built-in dropDuplicates() function.

See the code below:

val data = sc.parallelize(List(("Foo",41,"US",3),
                               ("Foo",39,"UK",1),
                               ("Bar",57,"CA",2),
                               ("Bar",72,"CA",2),
                               ("Baz",22,"US",6),
                               ("Baz",36,"US",6))).toDF("x","y","z","count")

data.dropDuplicates(Array("x","z","count")).show()

Output:

+---+---+---+-----+
|  x|  y|  z|count|
+---+---+---+-----+
|Baz| 22| US|    6|
|Foo| 39| UK|    1|
|Foo| 41| US|    3|
|Bar| 57| CA|    2|
+---+---+---+-----+

0 votes
by (19.9k points)

x = usersDf.drop_duplicates(subset=['DETUserId']) gives you a DataFrame x with the duplicates removed (one row kept per DETUserId); note that x is the de-duplicated data, not the records that were dropped.
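If what you actually want are the records that were dropped, one way to get them (a sketch, assuming Spark 2.4+ for exceptAll) is to subtract the de-duplicated result from the original DataFrame:

# Keep one row per DETUserId
x = usersDf.drop_duplicates(subset=['DETUserId'])

# Rows present in the original but not in the de-duplicated result,
# i.e. the duplicate records that were removed
dropped = usersDf.exceptAll(x)
dropped.show()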
