I'm currently trying to extract a database from MongoDB and use Spark to ingest into ElasticSearch with geo_points.

The Mongo database has latitude and longitude values, but ElasticSearch requires them to be cast into the geo_point type.

Is there a way in Spark to copy the lat and lon columns to a new column that is an array or struct?

Any help is appreciated!

1 Answer

Let’s assume you started with some kind of flat schema like this:

root

 |-- lat: double (nullable = false)

 |-- long: double (nullable = false)

 |-- key: string (nullable = false)

First, create some example data:

import org.apache.spark.sql.Row

import org.apache.spark.sql.functions.{col, udf}

import org.apache.spark.sql.types._

val rdd = sc.parallelize(

    Row(52.23, 21.01, "Warsaw") :: Row(42.30, 9.15, "Corte") :: Nil)

val schema = StructType(

    StructField("lat", DoubleType, false) ::

    StructField("long", DoubleType, false) ::

    StructField("key", StringType, false) ::Nil)

val df = sqlContext.createDataFrame(rdd, schema)

An easy way is to use a UDF and a case class:

case class Location(lat: Double, long: Double)

val makeLocation = udf((lat: Double, long: Double) => Location(lat, long))

val dfRes = df.

   withColumn("location", makeLocation(col("lat"), col("long"))).

   drop("lat").

   drop("long")

dfRes.printSchema

and we get

root

 |-- key: string (nullable = false)

 |-- location: struct (nullable = true)

 |    |-- lat: double (nullable = false)

 |    |-- long: double (nullable = false)

As an alternative, you can transform your data and apply a schema afterwards, but this approach takes considerably more code:

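// Map over the underlying RDD: on Spark 2.x and later, DataFrame.map

// would require an Encoder, while df.rdd.map works on every version.

val rddRes = df.

    rdd.map { case Row(lat, long, key) => Row(key, Row(lat, long)) }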

val schemaRes = StructType(

    StructField("key", StringType, false) ::

    StructField("location", StructType(

        StructField("lat", DoubleType, false) ::

        StructField("long", DoubleType, false) :: Nil

    ), true) :: Nil 

)

sqlContext.createDataFrame(rddRes, schemaRes).show

and we get the expected output:

+------+-------------+

|   key|     location|

+------+-------------+

|Warsaw|[52.23,21.01]|

| Corte|  [42.3,9.15]|

+------+-------------+

Creating a nested schema from scratch can be tedious, so if it fits your case I would recommend the first approach. You can easily extend it to get a more sophisticated structure:

case class Pin(location: Location)

val makePin = udf((lat: Double, long: Double) => Pin(Location(lat, long)))

df.

    withColumn("pin", makePin(col("lat"), col("long"))).

    drop("lat").

    drop("long").

    printSchema

and we get the expected output:

root

 |-- key: string (nullable = false)

 |-- pin: struct (nullable = true)

 |    |-- location: struct (nullable = true)

 |    |    |-- lat: double (nullable = false)

 |    |    |-- long: double (nullable = false)

Unfortunately, you have no control over the nullable field with this approach, so if it is important for your project you'll have to specify the schema explicitly.
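One way to specify the schema after the fact is to re-apply it to the rows of the nested DataFrame. The following is only a sketch, assuming Spark 2.x or later (so a SparkSession named spark stands in for sqlContext) and a spark-shell style session; the names nested, target and strict are made up for illustration, and the nested column is built with struct only to keep the example short. The same re-application should work on the result of the UDF version.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.struct
import org.apache.spark.sql.types._

// Assumption: Spark 2.x+; in spark-shell getOrCreate() returns the existing session.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("nullable-sketch")
  .getOrCreate()
import spark.implicits._

// Same example data as above, built from tuples for brevity.
val df = Seq((52.23, 21.01, "Warsaw"), (42.30, 9.15, "Corte"))
  .toDF("lat", "long", "key")

// Nested DataFrame whose nullability flags were chosen by Spark.
val nested = df.select($"key", struct($"lat", $"long").alias("location"))

// Hypothetical target schema: every field marked as non-nullable.
val target = StructType(
  StructField("key", StringType, nullable = false) ::
  StructField("location", StructType(
    StructField("lat", DoubleType, nullable = false) ::
    StructField("long", DoubleType, nullable = false) :: Nil
  ), nullable = false) :: Nil)

// Re-apply the schema to the existing rows; the data itself is unchanged.
val strict = spark.createDataFrame(nested.rdd, target)
strict.printSchema()
```

Spark does not check the rows against the nullable flags when the schema is applied, so this is only safe if the data really contains no nulls in those fields.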

Finally, you can use the struct function, introduced in Spark 1.4:

import org.apache.spark.sql.functions.struct

// Needed for the $"..." column syntax.

import sqlContext.implicits._

df.select($"key", struct($"lat", $"long").alias("location"))
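Since the goal in the question is an ElasticSearch geo_point, it may help to shape the column the way that type accepts it. A geo_point can be given as an object with the keys lat and lon, or as an array in [lon, lat] order. The sketch below assumes Spark 2.x or later with a SparkSession; asObject and asArray are illustrative names. It covers only the Spark side: the target index still needs a mapping that declares location as geo_point, and the write through the ElasticSearch connector is not shown.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array, struct}

// Assumption: Spark 2.x+; in spark-shell getOrCreate() returns the existing session.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("geo-point-sketch")
  .getOrCreate()
import spark.implicits._

// Same example data as above, built from tuples for brevity.
val df = Seq((52.23, 21.01, "Warsaw"), (42.30, 9.15, "Corte"))
  .toDF("lat", "long", "key")

// Object form: rename "long" to "lon", the key name geo_point expects.
val asObject = df.select(
  $"key",
  struct($"lat", $"long".alias("lon")).alias("location"))

// Array form: longitude comes first, then latitude.
val asArray = df.select(
  $"key",
  array($"long", $"lat").alias("location"))

asObject.printSchema()
asArray.show(false)
```

The struct form is usually the easier one to read back later, because the field names make the coordinate order explicit.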
