in Big Data Hadoop & Spark

I have a DataFrame (call it DFA) with a huge parseable metadata string in a single column, ColmnA.

I would like to break this column, ColmnA, into multiple columns through a function, ClassXYZ = Func1(ColmnA). This function returns an instance of a class ClassXYZ with multiple variables, and each of these variables now has to be mapped to a new column, such as ColmnA1, ColmnA2, etc.

How would I do such a transformation from one DataFrame to another with these additional columns by calling Func1 just once, without having to repeat it to create all the columns?

1 Answer


As far as I know, there is no direct way to derive multiple columns from a single column of a DataFrame: a UDF can return only a single column at a time. However, this limitation can be worked around in two ways.

1st approach:

Return a column of a complex type. The most general solution is a StructType, but you can also consider ArrayType or MapType.

import org.apache.spark.sql.functions.udf

val df = Seq(
  (1L, 3.0, "x"), (2L, -1.0, "y"), (3L, 0.0, "z")
).toDF("a", "b", "c")

case class Foobar(foo: Double, bar: Double)

val foobarUdf = udf((a: Long, b: Double, c: String) =>
  Foobar(a * b, c.head.toInt * b))

val df1 = df.withColumn("foobar", foobarUdf($"a", $"b", $"c"))

df1.show
// +---+----+---+-------------+
// |  a|   b|  c|       foobar|
// +---+----+---+-------------+
// |  1| 3.0|  x|  [3.0,360.0]|
// |  2|-1.0|  y|[-2.0,-121.0]|
// |  3| 0.0|  z|    [0.0,0.0]|
// +---+----+---+-------------+

df1.printSchema
// root
//  |-- a: long (nullable = false)
//  |-- b: double (nullable = false)
//  |-- c: string (nullable = true)
//  |-- foobar: struct (nullable = true)
//  |    |-- foo: double (nullable = false)
//  |    |-- bar: double (nullable = false)
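To get the "multiple new columns" the question asks for, the nested fields of the struct can then be pulled out into top-level columns with a single select, so the UDF still runs only once per row. A minimal sketch continuing from the df1 built above (df3 is a name I introduce here):

```scala
// Expand the struct returned by the UDF into separate top-level columns;
// referencing $"foobar.foo" yields a column named "foo"
val df3 = df1.select($"a", $"b", $"c", $"foobar.foo", $"foobar.bar")

df3.show
```

The resulting DataFrame has columns a, b, c, foo, bar, with the UDF evaluated once per row when foobar is computed.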

2nd approach:

Switch to RDDs, reshape, and rebuild the DataFrame:

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

def foobarFunc(a: Long, b: Double, c: String): Seq[Any] =
  Seq(a * b, c.head.toInt * b)

val schema = StructType(df.schema.fields ++
  Array(StructField("foo", DoubleType), StructField("bar", DoubleType)))

val rows = df.rdd.map(r => Row.fromSeq(
  r.toSeq ++
  foobarFunc(r.getAs[Long]("a"), r.getAs[Double]("b"), r.getAs[String]("c"))))

val df2 = sqlContext.createDataFrame(rows, schema)

df2.show
// +---+----+---+----+------+
// |  a|   b|  c| foo|   bar|
// +---+----+---+----+------+
// |  1| 3.0|  x| 3.0| 360.0|
// |  2|-1.0|  y|-2.0|-121.0|
// |  3| 0.0|  z| 0.0|   0.0|
// +---+----+---+----+------+
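Mapped back to the original question, the first approach would look roughly like this. This is a hedged sketch: ClassXYZ, Func1, DFA, and ColmnA are the asker's names, and the comma-split parsing is a hypothetical stand-in for the real metadata parser:

```scala
import org.apache.spark.sql.functions.udf

// Hypothetical return type: two parsed fields from the metadata string
case class ClassXYZ(colmnA1: String, colmnA2: String)

// Hypothetical parser standing in for the asker's Func1
def func1(s: String): ClassXYZ = {
  val parts = s.split(",", 2)
  ClassXYZ(parts(0), if (parts.length > 1) parts(1) else "")
}

val func1Udf = udf(func1 _)

// The UDF is referenced once, so Func1 runs once per row;
// the struct it returns is then split into the new columns
val dfB = dfA
  .withColumn("parsed", func1Udf($"ColmnA"))
  .select($"*", $"parsed.colmnA1".as("ColmnA1"), $"parsed.colmnA2".as("ColmnA2"))
  .drop("parsed")
```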
