0 votes
in Big Data Hadoop & Spark by (11.4k points)

I'm trying to figure out the new DataFrame API in Spark. It seems like a good step forward, but I'm having trouble doing something that should be pretty simple. I have a DataFrame with 2 columns, "ID" and "Amt". As a generic example, say I want to add a new column called "Code" that holds a code based on the value of "Amt". I can write a function something like this:

def coder(myAmt: Integer): String = {
  if (myAmt < 100) "Little"
  else "Big"
}


When I try to use it like this:

val myDF = sqlContext.parquetFile("hdfs:/to/my/file.parquet")

myDF.withColumn("Code", coder(myDF("Amt")))


I get a type mismatch error:

found   : org.apache.spark.sql.Column
required: Integer


I've tried changing the input type of my function to org.apache.spark.sql.Column, but then the function no longer compiles, because the if statement wants a Boolean and a comparison on a Column does not produce one.

Am I doing this wrong? Is there a better/another way to do this than using withColumn?

2 Answers

0 votes
by (32.3k points)

Assuming your schema has an "Amt" column, wrap the function in a UDF and apply that to the column:

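import org.apache.spark.sql.functions._

val myDF = sqlContext.parquetFile("hdfs:/to/my/file.parquet")

// Plain Scala function: maps an amount to a label.
val coder: (Int => String) = (arg: Int) => {if (arg < 100) "little" else "big"}

// Wrap it as a UDF so it can be applied to a Column.
val sqlfunc = udf(coder)

// withColumn returns a new DataFrame with the extra "Code" column.
myDF.withColumn("Code", sqlfunc(col("Amt")))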

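In my opinion, withColumn is the right way to add a column. The only problem in your attempt is that a plain Scala function cannot be applied to a Column directly, so it has to be wrapped with udf first.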


0 votes
by (33.1k points)

For PySpark:

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

df = sqlContext.createDataFrame([(1, "a", 25.0), (2, "B", -25.0)], ("c1", "c2", "c3"))

def valueToCategory(value):
    # Map known values to a category id; anything else becomes 0.
    if value == 1:
        return 1
    elif value == 2:
        return 2
    else:
        return 0

# NOTE: it seems that udf() must be called after the SparkContext has been created.
# The declared return type should match what the function returns (integers here).
udfValueToCategory = udf(valueToCategory, IntegerType())

df_with_cat = df.withColumn("category", udfValueToCategory("c1"))
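
Applied to the "Amt" example from the question, the same pattern might look like the sketch below. The function names (amount_to_code, add_code_with_udf, add_code_with_when) are made up for illustration, and the threshold of 100 follows the first answer. The second helper is an assumption about what may suit a simple threshold better: it builds the column from the built-in when expression instead of a UDF.

```python
def amount_to_code(amount):
    """Plain Python rule: amounts below 100 are "little", the rest "big"."""
    if amount is None:  # a SQL NULL reaches a Python UDF as None
        return None
    return "little" if amount < 100 else "big"


def add_code_with_udf(df, amount_col="Amt"):
    """Return a new DataFrame with a "Code" column computed by a UDF."""
    # Imported here so the plain rule above can be tried without Spark installed.
    from pyspark.sql.functions import col, udf
    from pyspark.sql.types import StringType

    code_udf = udf(amount_to_code, StringType())
    return df.withColumn("Code", code_udf(col(amount_col)))


def add_code_with_when(df, amount_col="Amt"):
    """Return the same column using built-in column expressions, no UDF."""
    from pyspark.sql.functions import col, when

    amount = col(amount_col)
    # Rows matching neither branch (a NULL amount) get NULL in "Code".
    return df.withColumn(
        "Code", when(amount < 100, "little").when(amount >= 100, "big")
    )
```

Given a DataFrame myDF with an "Amt" column, add_code_with_udf(myDF) or add_code_with_when(myDF) would each return a new DataFrame with the extra "Code" column; the original DataFrame is left unchanged.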

I hope this answer helps!

...