I am new to Spark and Spark SQL, and I am trying to query some data using Spark SQL.

I need to fetch the month from a date that is given as a string.

I don't think it is possible to query the month directly from Spark SQL, so I was thinking of writing a user-defined function (UDF) in Scala.

Is it possible to write a UDF in Spark SQL, and if so, can anybody suggest the best way to write one?

1 Answer


You can do this, at least for filtering, if you're willing to use a language-integrated query.

For a data file dates.txt containing:

one,2014-06-01
two,2014-07-01
three,2014-08-01
four,2014-08-15
five,2014-09-15

You can pack as much Scala date logic into your UDF as you want, but I'll keep it simple:

def myDateFilter(date: String) = date contains "-08-"
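Since the question asks for the month itself rather than a substring match, here is a minimal sketch of a function that parses the string and returns the month number. It assumes the dates are in ISO yyyy-MM-dd form, as in dates.txt, and that java.time (Java 8 or later) is available. The names monthOf and isAugust are hypothetical and not part of any Spark API.

```scala
import java.time.LocalDate
import scala.util.Try

// Hypothetical helper: returns the month (1-12) of an ISO yyyy-MM-dd string,
// or None when the string cannot be parsed as a date.
def monthOf(date: String): Option[Int] =
  Try(LocalDate.parse(date).getMonthValue).toOption

// A filter in the spirit of myDateFilter, but based on the parsed month
// instead of a substring match.
def isAugust(date: String): Boolean = monthOf(date).contains(8)
```

Parsing the date also rejects malformed strings that a plain substring check would let through.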

Set everything up as follows; much of this comes from the Spark SQL programming guide.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

// case class for your records
case class Entry(name: String, when: String)

// read and parse the data
val entries = sc.textFile("dates.txt").map(_.split(",")).map(e => Entry(e(0), e(1)))

You can use the UDF as part of your WHERE clause:

val augustEntries = entries.where('when)(myDateFilter).select('name, 'when)

and see the results:

augustEntries.map(r => r(0)).collect().foreach(println)
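To check the filter logic without a Spark context, the same parse-and-filter steps can be sketched on a plain Scala collection. This stands in for the SchemaRDD pipeline and does not replace it. The name sampleLines is hypothetical and simply holds the contents of dates.txt in memory.

```scala
// Same record type as in the Spark setup.
case class Entry(name: String, when: String)

// Same predicate as the UDF used in the where clause.
def myDateFilter(date: String): Boolean = date.contains("-08-")

// Hypothetical in-memory copy of dates.txt.
val sampleLines = Seq(
  "one,2014-06-01",
  "two,2014-07-01",
  "three,2014-08-01",
  "four,2014-08-15",
  "five,2014-09-15"
)

// Parse each line into an Entry, keep the August rows, and project the name.
val parsed = sampleLines.map(_.split(",")).map(e => Entry(e(0), e(1)))
val augustNames = parsed.filter(e => myDateFilter(e.when)).map(_.name)

augustNames.foreach(println)
```

For the sample data only the entries named three and four pass the filter.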

Notice the where method that I've used, declared as follows in the doc:

def where[T1](arg1: Symbol)(udf: (T1) ⇒ Boolean): SchemaRDD

So the UDF can take only one argument, but you can chain several .where() calls to filter on multiple columns.



 

For Spark 1.1.0 and 1.2.0: I don't know whether it is documented, but Spark now supports registering a UDF so that it can be called from SQL.

The above UDF could be registered using:

sqlContext.registerFunction("myDateFilter", myDateFilter)

and if the table was registered

sqlContext.registerRDDAsTable(entries, "entries")

it could be queried using

sqlContext.sql("SELECT * FROM entries WHERE myDateFilter(when)")


