in Big Data Hadoop & Spark by (11.4k points)

I am new to Spark and Spark SQL, and I am trying to query some data using Spark SQL.

I need to fetch the month from a date that is stored as a string.

I don't think it is possible to query the month directly in Spark SQL, so I was thinking of writing a user-defined function (UDF) in Scala.

Is it possible to write a UDF for Spark SQL, and if so, can anybody suggest the best way to write one?

1 Answer

by (32.3k points)

You can do this, at least for filtering, if you're willing to use a language-integrated query.

For a data file dates.txt containing:

one,2014-06-01
two,2014-07-01
three,2014-08-01
four,2014-08-15
five,2014-09-15

You can pack as much Scala date magic into your UDF as you want, but I'll keep it simple:

def myDateFilter(date: String): Boolean = date contains "-08-"
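Since the question is really about getting the month, here is a minimal sketch of a UDF body that parses the date instead of matching a substring. It assumes the dates are ISO-8601 strings like the ones in dates.txt and that java.time (Java 8+) is available on your classpath, which is an assumption about your environment. The names monthOf and isAugust are hypothetical, chosen only for illustration.

```scala
import java.time.LocalDate

// Hypothetical helper: parse an ISO-8601 date such as "2014-08-15"
// and return its month as 1-12. LocalDate.parse throws a
// DateTimeParseException if the string is not a valid ISO date.
def monthOf(date: String): Int = LocalDate.parse(date).getMonthValue

// Hypothetical filter with the same String => Boolean shape as myDateFilter,
// but comparing the parsed month rather than searching for "-08-".
def isAugust(date: String): Boolean = monthOf(date) == 8
```

Because isAugust has the same one-argument String => Boolean shape, it could be passed wherever myDateFilter is used below; the trade-off is that malformed dates raise an exception instead of simply failing the filter.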

Set it all up as follows (much of this comes from the Programming Guide):

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// brings the implicit conversion from an RDD of case classes to a SchemaRDD into scope
import sqlContext._
// case class for your records
case class Entry(name: String, when: String)
// read and parse the data
val entries = sc.textFile("dates.txt").map(_.split(",")).map(e => Entry(e(0), e(1)))

You can use the UDF as part of your WHERE clause:

val augustEntries = entries.where('when)(myDateFilter).select('name, 'when)

and see the results:

augustEntries.map(r => r(0)).collect().foreach(println)
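To check what this should print for the five sample rows without a Spark cluster, here is a plain-Scala sketch that mimics the same steps on an in-memory List. It does not use Spark at all: filter stands in for .where('when)(myDateFilter), and mapping to name stands in for r(0) on the selected columns.

```scala
// Same record shape and filter as in the answer.
case class Entry(name: String, when: String)
def myDateFilter(date: String): Boolean = date contains "-08-"

// The contents of dates.txt, held in memory instead of read with sc.textFile.
val lines = List("one,2014-06-01", "two,2014-07-01", "three,2014-08-01",
  "four,2014-08-15", "five,2014-09-15")
val entries = lines.map(_.split(",")).map(e => Entry(e(0), e(1)))

// Stand-in for .where('when)(myDateFilter): keep rows whose date passes the UDF.
val augustEntries = entries.filter(e => myDateFilter(e.when))

// Stand-in for .map(r => r(0)): take the first selected column, the name.
val augustNames = augustEntries.map(_.name)
augustNames.foreach(println) // prints "three", then "four"
```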

Note the where method I used; the API docs declare it as:

def where[T1](arg1: Symbol)(udf: (T1) ⇒ Boolean): SchemaRDD

So a UDF passed this way takes only one argument, but you can chain several .where() calls to filter on multiple columns.
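To illustrate the chaining idea, here is a plain-Scala sketch (no Spark) with a hypothetical where helper that mirrors the curried shape of the signature above: choose one column, then apply a single-argument predicate to it. The Row alias and the where helper are assumptions made up for this example; they are not part of the Spark API.

```scala
// Hypothetical stand-in for a table row: column name -> value.
type Row = Map[String, String]

// Hypothetical helper with the same curried shape as SchemaRDD.where:
// one column, then a one-argument predicate applied to that column's value.
def where(rows: Seq[Row], column: String)(udf: String => Boolean): Seq[Row] =
  rows.filter(row => udf(row(column)))

def myDateFilter(date: String): Boolean = date contains "-08-"

val rows: Seq[Row] = Seq(
  Map("name" -> "one", "when" -> "2014-06-01"),
  Map("name" -> "three", "when" -> "2014-08-01"),
  Map("name" -> "four", "when" -> "2014-08-15")
)

// Each call filters on one column; applying a second call to the result
// filters on another column, which is how multi-column filtering composes.
val august = where(rows, "when")(myDateFilter)
val augustF = where(august, "name")(_.startsWith("f"))
```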

For Spark 1.1.0 and 1.2.0: I'm not sure whether this is documented, but Spark now supports registering a UDF so that it can be called from SQL queries.

The UDF above can be registered with:

sqlContext.registerFunction("myDateFilter", myDateFilter)

and, once the table is registered with

sqlContext.registerRDDAsTable(entries, "entries")

it can be queried with:

sqlContext.sql("SELECT * FROM entries WHERE myDateFilter(when)")


For more details check this.
