Intellipaat Back

Explore Courses Blog Tutorials Interview Questions
0 votes
2 views
in Big Data Hadoop & Spark by (11.4k points)

I have some data in the following format (either RDD or Spark DataFrame):

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

 rdd = sc.parallelize([('X01',41,'US',3),
                       ('X01',41,'UK',1),
                       ('X01',41,'CA',2),
                       ('X02',72,'US',4),
                       ('X02',72,'UK',6),
                       ('X02',72,'CA',7),
                       ('X02',72,'XX',8)])

# convert to a Spark DataFrame                   
schema = StructType([StructField('ID', StringType(), True),
                     StructField('Age', IntegerType(), True),
                     StructField('Country', StringType(), True),
                     StructField('Score', IntegerType(), True)])

df = sqlContext.createDataFrame(rdd, schema)


What I would like to do is to 'reshape' the data, convert certain rows in Country(specifically US, UK and CA) into columns:

ID    Age  US  UK  CA 
'X01'  41  3   1   2 
'X02'  72  4   6   7 

1 Answer

0 votes
by (32.3k points)

Since Spark 1.6 you can use pivot function on GroupedData and provide aggregate expression.

pivoted = (df

    .groupBy("ID", "Age")

    .pivot(

        "Country",

        ['US', 'UK', 'CA'])  # Optional list of levels

    .sum("Score"))  # alternatively you can use .agg(expr))

pivoted.show()

## +---+---+---+---+---+

## | ID|Age| US| UK| CA|

## +---+---+---+---+---+

## |X01| 41|  3| 1| 2|

## |X02| 72|  4| 6| 7|

## +---+---+---+---+---+

Levels can be omitted but if provided can boost performance and serve as an internal filter.

This method is still relatively slow but certainly beats passing data manually between JVM and Python.

...