in Big Data Hadoop & Spark by (11.5k points)

I'm writing a filter function for a complex JSON dataset with lots of inner structures. Passing individual columns is too cumbersome.

So I declared the following UDF:

import org.apache.spark.sql.{DataFrame, Row}
val records: DataFrame = sqlContext.jsonFile("...")
def myFilterFunction(r: Row): Boolean = ???
sqlContext.udf.register("myFilter", (r: Row) => myFilterFunction(r))


Intuitively, I'd expect it to work like this:

records.filter("myFilter(*)=true")


What is the actual syntax?

1 Answer

by (32.5k points)

Use the struct() function to build a single row-valued column out of the columns you want, and pass that column to the UDF. Follow these steps.

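Import Row and the SQL functions such as struct(). In spark-shell, sqlContext.implicits._ (needed for the $"column" syntax) is already imported:

import org.apache.spark.sql._
import org.apache.spark.sql.functions._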

Define the UDF

def myFilterFunction(r: Row): Boolean = r.get(0) == r.get(1)  // true when the first two fields are equal
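Positional access with r.get(0) depends on the order in which columns are passed to struct(). Below is a minimal sketch of a name-based variant, assuming Spark 2.x/3.x with a SparkSession, run in spark-shell or as a Scala script (the names myFilterByName and matchedTexts are only for illustration). It relies on the Row that Spark hands to a UDF for a struct() argument carrying its schema, which is what lets getAs[T](fieldName) work.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.{callUDF, struct}

val spark = SparkSession.builder().master("local[*]").appName("row-udf-by-name").getOrCreate()
import spark.implicits._

// Hypothetical helper: compare the two fields by name instead of by position.
// Works because the Row built from struct() carries the struct's schema.
def myFilterByName(r: Row): Boolean =
  r.getAs[String]("text") == r.getAs[String]("text2")

spark.udf.register("myFilterByName", myFilterByName _)

val records = Seq(("sachin", "sachin"), ("aggarwal", "aggarwal1")).toDF("text", "text2")

// Column order inside the struct no longer matters: text2 is passed first here.
val matched = records.filter(callUDF("myFilterByName", struct($"text2", $"text")))
val matchedTexts = matched.select("text").as[String].collect().toSeq
matched.show()
```

Passing struct($"text2", $"text") still keeps only the sachin row, because the fields are looked up by name rather than by position. Note that callUDF is deprecated in recent Spark 3.x releases in favor of call_udf.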

Register the UDF

sqlContext.udf.register("myFilterFunction", myFilterFunction _)

Create the DataFrame

val records = sqlContext.createDataFrame(Seq(("sachin", "sachin"), ("aggarwal", "aggarwal1"))).toDF("text", "text2")

Call the UDF, passing the columns wrapped in struct()

records.filter(callUDF("myFilterFunction", struct($"text", $"text2"))).show()
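The string syntax guessed at in the question can also work, as long as the columns are wrapped in struct(...) inside the expression: Dataset.filter(String) parses its argument as a SQL expression, so a registered UDF and the built-in SQL struct() function can both be used there. A sketch assuming Spark 2.x/3.x; the column list is either spelled out or generated from records.columns, rather than relying on * expanding inside the call (the names viaExpr, allCols and viaExprAll are illustrative).

```scala
import org.apache.spark.sql.{Row, SparkSession}

val spark = SparkSession.builder().master("local[*]").appName("udf-sql-expr").getOrCreate()
import spark.implicits._

// Same positional comparison as myFilterFunction in the answer.
spark.udf.register("myFilterFunction", (r: Row) => r.get(0) == r.get(1))

val records = Seq(("sachin", "sachin"), ("aggarwal", "aggarwal1")).toDF("text", "text2")

// Explicit column list inside the SQL expression string.
val viaExpr = records.filter("myFilterFunction(struct(text, text2))")
val viaExprTexts = viaExpr.select("text").as[String].collect().toSeq

// All columns, back-quoted in case a name contains special characters.
val allCols = records.columns.map(c => s"`$c`").mkString(", ")
val viaExprAll = records.filter(s"myFilterFunction(struct($allCols))")
viaExprAll.show()
```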

To pass all of the DataFrame's columns to the UDF, build the struct from records.columns:

records.filter(callUDF("myFilterFunction", struct(records.columns.map(records(_)): _*))).show()

Result:

+------+------+
|  text| text2|
+------+------+
|sachin|sachin|
+------+------+
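For the nested JSON case in the question, the filter function receives each inner JSON object as a nested Row. The sketch below assumes Spark 2.2+ (for spark.read.json on a Dataset[String]); the records and the fields name, age and address.city are made up for illustration. It also uses functions.udf instead of a registered name, so the UDF is referenced as a value rather than through a string.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.{col, struct, udf}

val spark = SparkSession.builder().master("local[*]").appName("nested-row-udf").getOrCreate()
import spark.implicits._

// Hypothetical nested records; field names are illustrative only.
val json = Seq(
  """{"name":"ana","age":34,"address":{"city":"Berlin","zip":"10115"}}""",
  """{"name":"bo","age":17,"address":{"city":"Berlin","zip":"10117"}}""",
  """{"name":"cy","age":40,"address":{"city":"Paris","zip":"75001"}}"""
).toDS()
val records = spark.read.json(json) // JSON integers are inferred as Long

// The whole record arrives as one Row; the inner "address" object is a nested Row.
def myFilterFunction(r: Row): Boolean = {
  val address = r.getAs[Row]("address")
  r.getAs[Long]("age") >= 18 && address.getAs[String]("city") == "Berlin"
}

// udf() returns a UserDefinedFunction value, so no string name is needed.
val myFilter = udf((r: Row) => myFilterFunction(r))

// Pack every top-level column into a single struct argument.
val adults = records.filter(myFilter(struct(records.columns.map(col): _*)))
val adultNames = adults.select("name").as[String].collect().toSeq
adults.show()
```

Reading fields by name matters here: JSON schema inference orders top-level fields alphabetically, so positional access would silently depend on that ordering.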
