Let’s assume you started with some kind of flat schema like this:
root
|-- lat: double (nullable = false)
|-- long: double (nullable = false)
|-- key: string (nullable = false)
First, create some example data:
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types._
val rdd = sc.parallelize(
Row(52.23, 21.01, "Warsaw") :: Row(42.30, 9.15, "Corte") :: Nil)
val schema = StructType(
StructField("lat", DoubleType, false) ::
StructField("long", DoubleType, false) ::
StructField("key", StringType, false) ::Nil)
val df = sqlContext.createDataFrame(rdd, schema)
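The snippet above uses the Spark 1.x sqlContext. On Spark 2.x and later, where SparkSession is the entry point, the same example data could be built roughly as follows. This is a sketch assuming a local master; the app name "nested-example" is a hypothetical placeholder.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

// Assumption: running locally; "nested-example" is a hypothetical app name.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("nested-example")
  .getOrCreate()

// The same flat, non-nullable schema as above, built with StructType.add.
val schema = new StructType()
  .add("lat", DoubleType, nullable = false)
  .add("long", DoubleType, nullable = false)
  .add("key", StringType, nullable = false)

val rows = Seq(Row(52.23, 21.01, "Warsaw"), Row(42.30, 9.15, "Corte"))
val df = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)

df.printSchema()
```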
An easy way is to use a UDF and a case class:
case class Location(lat: Double, long: Double)
val makeLocation = udf((lat: Double, long: Double) => Location(lat, long))
val dfRes = df.
withColumn("location", makeLocation(col("lat"), col("long"))).
drop("lat").
drop("long")
dfRes.printSchema
and we get:
root
|-- key: string (nullable = false)
|-- location: struct (nullable = true)
| |-- lat: double (nullable = false)
| |-- long: double (nullable = false)
Alternatively, you can transform your data and apply a schema afterwards, but this approach is considerably more verbose:
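// df.rdd exposes the underlying RDD[Row]; on Spark 2.x+ DataFrame.map would require an Encoder
val rddRes = df.rdd.
map{case Row(lat, long, key) => Row(key, Row(lat, long))}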
val schemaRes = StructType(
StructField("key", StringType, false) ::
StructField("location", StructType(
StructField("lat", DoubleType, false) ::
StructField("long", DoubleType, false) :: Nil
), true) :: Nil
)
sqlContext.createDataFrame(rddRes, schemaRes).show
and we get the expected output:
+------+-------------+
| key| location|
+------+-------------+
|Warsaw|[52.23,21.01]|
| Corte| [42.3,9.15]|
+------+-------------+
Creating a nested schema from scratch can be tedious, so if it suits your needs I would recommend the first approach. It is also easy to extend to more sophisticated structures:
case class Pin(location: Location)
val makePin = udf((lat: Double, long: Double) => Pin(Location(lat, long)))
df.
withColumn("pin", makePin(col("lat"), col("long"))).
drop("lat").
drop("long").
printSchema
and we get the expected output:
root
|-- key: string (nullable = false)
|-- pin: struct (nullable = true)
| |-- location: struct (nullable = true)
| | |-- lat: double (nullable = false)
| | |-- long: double (nullable = false)
Unfortunately, with this approach you have no control over the nullable flag of the generated struct, so if that is important for your project you'll have to specify the schema explicitly.
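One way to specify the schema yourself is to take the nested result and re-apply an explicit StructType over its underlying RDD[Row] with createDataFrame. The sketch below assumes Spark 2.x+ running locally. To keep it self-contained as a script, the UDF returns a tuple instead of the Location case class, so its struct fields come out as _1 and _2; the explicit schema then renames them, because rows are matched to the schema by position. Only declare fields non-nullable when you know the data contains no nulls.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types._

// Assumption: Spark 2.x+ running locally; "nullable-example" is a hypothetical app name.
val spark = SparkSession.builder().master("local[*]").appName("nullable-example").getOrCreate()
import spark.implicits._

val df = Seq((52.23, 21.01, "Warsaw"), (42.30, 9.15, "Corte")).toDF("lat", "long", "key")

// A tuple-returning UDF stands in for the Location case class; it yields a struct<_1, _2>.
val makeLocation = udf((lat: Double, long: Double) => (lat, long))
val dfRes = df
  .withColumn("location", makeLocation(col("lat"), col("long")))
  .drop("lat")
  .drop("long")

// Explicit target schema: field names and nullability are exactly what we declare here.
val strictSchema = new StructType()
  .add("key", StringType, nullable = false)
  .add("location", new StructType()
    .add("lat", DoubleType, nullable = false)
    .add("long", DoubleType, nullable = false), nullable = false)

// Values in each Row are matched to the schema by position, not by field name.
val strict = spark.createDataFrame(dfRes.rdd, strictSchema)
strict.printSchema()
```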
Finally, you can use the struct function introduced in Spark 1.4:
import org.apache.spark.sql.functions.struct
import sqlContext.implicits._ // enables the $"colName" syntax
df.select($"key", struct($"lat", $"long").alias("location"))
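For completeness, here is a self-contained sketch of the struct approach on Spark 2.x+ (the local SparkSession and the app name "struct-example" are assumptions). It also shows that nesting one struct call inside another reproduces the pin/location layout from the UDF example, and that nested fields can be selected back with a dotted path.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.struct

// Assumption: Spark 2.x+ running locally; "struct-example" is a hypothetical app name.
val spark = SparkSession.builder().master("local[*]").appName("struct-example").getOrCreate()
import spark.implicits._

val df = Seq((52.23, 21.01, "Warsaw"), (42.30, 9.15, "Corte")).toDF("lat", "long", "key")

// struct uses the input column names as field names: location.lat, location.long.
val nested = df.select($"key", struct($"lat", $"long").alias("location"))
nested.printSchema()

// Nesting struct calls gives pin -> location -> (lat, long) without a UDF.
val pinned = df.select($"key", struct(struct($"lat", $"long").alias("location")).alias("pin"))
pinned.printSchema()

// Nested fields can be addressed with a dotted path.
pinned.select($"key", $"pin.location.lat").show()
```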