
I have this python code that runs locally in a pandas dataframe:

df_result = pd.DataFrame(
    df.groupby('A')
      .apply(lambda x: myFunction(zip(x.B, x.C), x.name))
)

 

I would like to run this in PySpark, but I am having trouble dealing with the pyspark.sql.group.GroupedData object.

I've tried the following:

sparkDF
 .groupby('A')
 .agg(myFunction(zip('B', 'C'), 'A'))
 


which returns

KeyError: 'A'


I presume this is because 'A' is no longer a column, and I can't find the equivalent of x.name.

Then I tried:

sparkDF
 .groupby('A')
 .map(lambda row: Row(myFunction(zip('B', 'C'), 'A')))
 .toDF()


but I get the following error:

AttributeError: 'GroupedData' object has no attribute 'map'


Any suggestions would be really appreciated!

1 Answer


A UDAF (user-defined aggregate function) works on data that is grouped by a key. It has to define how to merge multiple values of a group within a single partition, and then how to merge the partial results across partitions for each key. Unfortunately, a classic UDAF of this kind cannot be implemented in Python; it can only be written in Scala (or Java).
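To make the two merge steps concrete, here is a minimal pure-Python sketch of an average computed the way a UDAF would compute it. The function names loosely mirror the methods of Scala's UserDefinedAggregateFunction, but the code itself, the buffer layout and the two hypothetical partitions are assumptions for illustration, not Spark API calls.

```python
from functools import reduce


def initialize():
    """Empty aggregation buffer for an average: (running sum, count)."""
    return (0.0, 0)


def update(buffer, value):
    """Fold one input value into a partition-local buffer."""
    total, count = buffer
    return (total + value, count + 1)


def merge(left, right):
    """Combine two partial buffers that were built on different partitions."""
    return (left[0] + right[0], left[1] + right[1])


def evaluate(buffer):
    """Turn the final buffer into the aggregate result."""
    total, count = buffer
    return total / count if count else None


# Values of one key, split across two hypothetical partitions
partitions = [[1.0, 2.0], [3.0, 6.0]]

# Step 1: aggregate inside each partition
partials = [reduce(update, part, initialize()) for part in partitions]

# Step 2: merge the partial results across partitions
result = evaluate(reduce(merge, partials, initialize()))
print(result)  # 3.0
```

Only the small (sum, count) buffers need to cross partition boundaries here, which is what makes this style of aggregation cheap.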

There is, however, a workaround in Python. Use collect_list to gather the grouped values into a list (collect_set would drop duplicate values), then use a regular UDF to do what you want with them. The one limitation, at least on older Spark releases, is that collect_list only works on primitive values, so you have to encode each (B, C) pair as a string.

from pyspark.sql.types import StringType
from pyspark.sql.functions import col, collect_list, concat_ws, udf

def myFunc(data_list):
    # data_list is the list of 'B,C' strings collected for one group
    for val in data_list:
        b, c = val.split(',')  # both fields come back as strings
        # do something
    return <whatever>  # placeholder: return your result as a string

myUdf = udf(myFunc, StringType())

(df.withColumn('data', concat_ws(',', col('B'), col('C')))
   .groupBy('A').agg(collect_list('data').alias('data'))
   .withColumn('data', myUdf('data')))
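As a minimal sketch of what such a function could look like, the example below averages the smaller of the two values in each pair. The name avg_pairwise_min and the choice of statistic are assumptions for illustration only. It is plain Python, so it can be checked locally before being wrapped with udf(..., StringType()).

```python
def avg_pairwise_min(data_list):
    """Hypothetical stand-in for myFunc: average of min(b, c) over one group.

    data_list mirrors what the UDF receives from collect_list: a list of
    'b,c' strings, one per row of the group.
    """
    if not data_list:
        return None  # an empty group has no average
    total = 0.0
    for val in data_list:
        b, c = val.split(',')  # decode the string back into its two fields
        total += min(float(b), float(c))
    # The UDF is declared with StringType, so hand back a string
    return str(total / len(data_list))


# Local check with two encoded rows: (1, 0) and (-1, 42)
print(avg_pairwise_min(['1,0', '-1,42']))  # prints -0.5
```

One thing to watch: concat_ws skips null inputs, so a row where B or C is null would not split into two fields and would need separate handling.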


 

PySpark has since added support for user-defined aggregation through pandas UDFs. pandas UDFs and UDAFs also tend to perform noticeably better than plain Python functions applied over RDDs. Under the hood, Spark vectorizes the columns: it batches the values from multiple rows together to optimize processing and compression.

Since Spark 2.3 you can use pandas_udf. The GROUPED_MAP variant takes a Callable[[pandas.DataFrame], pandas.DataFrame]; in other words, a function that receives each group as a pandas DataFrame with the same columns as the input and returns an output DataFrame matching the declared schema.

For example if your data looks like this:

df = spark.createDataFrame(
    [("a", 1, 0), ("a", -1, 42), ("b", 3, -1), ("b", 10, -2)],
    ("key", "value1", "value2")
)

and you want to compute the average of the pairwise minimum of value1 and value2, you first have to define the output schema:

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

schema = StructType([
    StructField("key", StringType()),
    StructField("avg_min", DoubleType())
])

Then define the pandas_udf:

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def g(df):
    # df is a pandas DataFrame holding all rows of a single key
    result = pd.DataFrame(df.groupby(df.key).apply(
        lambda x: x.loc[:, ["value1", "value2"]].min(axis=1).mean()
    ))
    result.reset_index(inplace=True, drop=False)
    # Name the columns so that they line up with the declared schema
    result.columns = ["key", "avg_min"]
    return result

and apply it:

df.groupby("key").apply(g).show()
+---+-------+
|key|avg_min|
+---+-------+
|  b|   -1.5|
|  a|   -0.5|
+---+-------+
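On Spark 3.0 and later, grouped-map logic like this can also be written as a plain Python function passed to groupBy(...).applyInPandas(func, schema). When the function takes two arguments, Spark passes the grouping key as a tuple in the first one, which is the closest counterpart to x.name in the original pandas code. The sketch below assumes that API. The function name avg_min is hypothetical, and the Spark call is left as a comment so that the pandas part can be checked locally.

```python
import pandas as pd


def avg_min(key, pdf):
    """Grouped-map function that returns one output row per group.

    key is a tuple of grouping values, e.g. ("a",); pdf holds that group's rows.
    """
    value = pdf[["value1", "value2"]].min(axis=1).mean()
    return pd.DataFrame({"key": [key[0]], "avg_min": [float(value)]})


# Local check on the rows of group "a" from the example data
group_a = pd.DataFrame({"key": ["a", "a"], "value1": [1, -1], "value2": [0, 42]})
print(avg_min(("a",), group_a))

# With a SparkSession and the Spark DataFrame df from above (assumed, not run here):
# df.groupby("key").applyInPandas(avg_min, schema="key string, avg_min double").show()
```

Keeping the function free of decorators makes it easy to unit-test with pandas alone before running it on a cluster.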


 

Since Spark 2.4.0 there is also a GROUPED_AGG variant, which takes a Callable[[pandas.Series, ...], T], where T is a primitive scalar:

import numpy as np

@pandas_udf(DoubleType(), functionType=PandasUDFType.GROUPED_AGG)
def f(x, y):
    return np.minimum(x, y).mean()

which can be used with the standard groupBy / agg construct:

df.groupBy("key").agg(f("value1", "value2").alias("avg_min")).show()
+---+-------+
|key|avg_min|
+---+-------+
|  b|   -1.5|
|  a|   -0.5|
+---+-------+
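Because the body of f is ordinary pandas/NumPy code, it can be sanity-checked without a cluster. The sketch below applies the same expression to each key's rows using a plain pandas groupby, which is an assumption made purely for local testing. The helper name f_local is hypothetical.

```python
import numpy as np
import pandas as pd


def f_local(x, y):
    # Same expression as the GROUPED_AGG UDF body, minus the decorator
    return float(np.minimum(x, y).mean())


pdf = pd.DataFrame(
    [("a", 1, 0), ("a", -1, 42), ("b", 3, -1), ("b", 10, -2)],
    columns=["key", "value1", "value2"],
)

# Call the function once per key, with that key's columns as pandas Series
expected = {
    key: f_local(group["value1"], group["value2"])
    for key, group in pdf.groupby("key")
}
print(expected)  # {'a': -0.5, 'b': -1.5}
```

The values agree with the avg_min column in the Spark output above.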

Please note that neither the GROUPED_MAP nor the GROUPED_AGG pandas_udf behaves the same way as UserDefinedAggregateFunction or Aggregator; both are closer to groupByKey or to window functions with an unbounded frame. Data is shuffled first, and only then is the UDF applied.

For optimized execution, I would suggest implementing a Scala UserDefinedAggregateFunction and adding a Python wrapper.

Note:

SPARK-24561 (user-defined window functions with pandas UDFs over bounded windows) has been fixed.
