in Big Data Hadoop & Spark by (11.4k points)

I’m using Scala and want to build my own DataFrame function. For example, I want to treat a column like an array, iterate through each element, and make a calculation.

To start off, I’m trying to implement my own getMax method. So column x would have the values [3,8,2,5,9], and the expected output of the method would be 9.

Here is what it looks like in Scala:

def getMax(inputArray: Array[Int]): Int = {
   var maxValue = inputArray(0)
   for (i <- 1 until inputArray.length if inputArray(i) > maxValue) {
     maxValue = inputArray(i)
   }
   maxValue
}


This is what I have so far, and I get this error:

"value length is not a member of org.apache.spark.sql.Column",

and I don't know how else to iterate through the column.

def getMax(col: Column): Column = {
  var maxValue = col(0)
  for (i <- 1 until col.length if col(i) > maxValue) {
    maxValue = col(i)
  }
  maxValue
}

Once I am able to implement my own method, I will create a column function:

val value_max: org.apache.spark.sql.Column = getMax(df.col("value")).as("value_max")


And then I hope to be able to use this in a SQL statement, for example:

val sample = sqlContext.sql("SELECT value_max(x) FROM table")


and the expected output would be 9, given the input column [3,8,2,5,9].

1 Answer

by (32.3k points)

In a Spark DataFrame, you can't iterate through the elements of a Column using the approach you tried, because a Column is not an iterable object: it is an expression that Spark evaluates against each row, not a container holding the data.

However, to process the values of a column, you have several options, and the right one depends on your task:

1) Using the existing built-in functions

Spark SQL already has plenty of useful functions for processing columns, including aggregation and transformation functions. Most of them can be found in the org.apache.spark.sql.functions package; some others (binary functions, mostly) are defined directly on the Column class. If a built-in function covers your case, it's usually the best option. Note: don't forget the window functions.
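For the getMax example above, a built-in aggregate already does the job. Here is a minimal sketch, assuming a DataFrame df with an integer column x that is registered as the temporary table table (as in your SQL example):

import org.apache.spark.sql.functions.max

// Aggregate the whole column with the built-in max function;
// for the values [3, 8, 2, 5, 9] this yields 9.
val valueMax = df.agg(max("x").as("value_max"))
valueMax.show()

// The same aggregation expressed in SQL, no custom function needed.
val sample = sqlContext.sql("SELECT max(x) AS value_max FROM table")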

2) Creating a UDF

If you can't complete your task with the built-in functions, you may consider defining a UDF (User Defined Function). UDFs are useful when you can process each item of a column independently and you expect to produce a new column with the same number of rows as the original (not an aggregated column). The approach is quite simple: define a plain function, register it as a UDF, and then use it. Example:

import org.apache.spark.sql.functions.udf

// A plain Scala function...
def myFunc: (String => String) = { s => s.toLowerCase }

// ...wrapped as a UDF...
val myUDF = udf(myFunc)

// ...and applied to a column, producing one output value per input row.
val newDF = df.withColumn("newCol", myUDF(df("oldCol")))
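If you also want to call the UDF from a SQL statement, as in your value_max example, you can register it by name. A small sketch reusing myFunc from above (the name myUDF here is just illustrative):

// Register the function under a name visible to Spark SQL.
sqlContext.udf.register("myUDF", myFunc)

val fromSql = sqlContext.sql("SELECT myUDF(oldCol) FROM table")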


 

3) Using a UDAF

If your task is to create aggregated data, you can define a UDAF (User Defined Aggregate Function). I don't have a lot of experience with these, but there are good tutorials on the topic that are worth checking out.
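To tie this back to your question: an aggregation like getMax is exactly the UDAF case. Below is a rough sketch against the Spark 1.x/2.x UserDefinedAggregateFunction API; the class name MaxUDAF is illustrative, and null handling is kept minimal:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class MaxUDAF extends UserDefinedAggregateFunction {
  // One integer input column.
  def inputSchema: StructType = StructType(StructField("value", IntegerType) :: Nil)
  // The buffer keeps the running maximum.
  def bufferSchema: StructType = StructType(StructField("max", IntegerType) :: Nil)
  def dataType: DataType = IntegerType
  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = Int.MinValue
  }

  // Fold one input row into the buffer.
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) buffer(0) = math.max(buffer.getInt(0), input.getInt(0))
  }

  // Combine partial buffers coming from different partitions.
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = math.max(buffer1.getInt(0), buffer2.getInt(0))
  }

  def evaluate(buffer: Row): Any = buffer.getInt(0)
}

// Register it so it can be used from SQL, just like your value_max example.
sqlContext.udf.register("value_max", new MaxUDAF)
val sample = sqlContext.sql("SELECT value_max(x) FROM table")

Note that newer Spark versions deprecate UserDefinedAggregateFunction in favour of the typed Aggregator API, but the idea (initialize, update, merge, evaluate) is the same.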

4) Fall back to RDD processing

If you really can't use the options above, or if your processing task depends on other rows to process a given one and it's not an aggregation, then I think you will have to select the column you want and process it using the corresponding RDD. Example:

// df("column") only gives you a Column expression; select() returns a one-column DataFrame.
val singleColumnDF = df.select("column")

// Drop down to the underlying RDD[Row].
val myRDD = singleColumnDF.rdd

// process myRDD
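For completeness, the getMax behaviour over that RDD could then look something like this, assuming the column holds non-null integers:

// Each element of the RDD is a Row; pull out the Int and take the maximum.
val maxValue = myRDD.map(_.getInt(0)).max()
// maxValue is 9 for the column [3, 8, 2, 5, 9]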
