Back

Explore Courses Blog Tutorials Interview Questions
0 votes
2 views
in Data Science by (11.4k points)

I am using Spark SQL (I mention that it is in Spark in case that affects the SQL syntax - I'm not familiar enough to be sure yet) and I have a table that I am trying to re-structure, but I'm getting stuck trying to transpose multiple columns at the same time.

Basically I have data that looks like:

userId    someString      varA     varB
   1      "example1"    [0,2,5]   [1,2,9]
   2      "example2"    [1,20,5]  [9,null,6]


and I'd like to explode both varA and varB simultaneously (the length will always be consistent) - so that the final output looks like this:

userId    someString      varA     varB
   1      "example1"       0         1
   1      "example1"       2         2
   1      "example1"       5         9
   2      "example2"       1         9
   2      "example2"       20       null
   2      "example2"       5         6


but I can only seem to get a single explode(var) statement to work in one command, and if I try to chain them (ie create a temp table after the first explode command) then I obviously get a huge number of duplicate, unnecessary rows.

1 Answer

0 votes
by (32.3k points)
edited by

For Spark >= 2.4

In Spark 2.4+, you are provided with arrays_zip function which would be very efficient to use in this case(skip zip udf):

df.withColumn("vars", explode(arrays_zip($"varA", $"varB"))).select(

  $"userId", $"someString",

  $"vars.varA", $"vars.varB").show

Spark < 2.4

What you want is not possible without a custom UDF. In Scala you could do something like this:

val data = sc.parallelize(Seq(

    """{"userId": 1, "someString": "example1",

        "varA": [0, 2, 5], "varB": [1, 2, 9]}""",

    """{"userId": 2, "someString": "example2",

        "varA": [1, 20, 5], "varB": [9, null, 6]}"""

))

val df = spark.read.json(data)

df.printSchema

// root

//  |-- someString: string (nullable = true)

//  |-- userId: long (nullable = true)

//  |-- varA: array (nullable = true)

//  | |-- element: long (containsNull = true)

//  |-- varB: array (nullable = true)

//  | |-- element: long (containsNull = true)

Now we can define zip udf:

import org.apache.spark.sql.functions.{udf, explode}

val zip = udf((xs: Seq[Long], ys: Seq[Long]) => xs.zip(ys))

df.withColumn("vars", explode(zip($"varA", $"varB"))).select(

   $"userId", $"someString",

   $"vars._1".alias("varA"), $"vars._2".alias("varB")).show

// +------+----------+----+----+

// |userId|someString|varA|varB|

// +------+----------+----+----+

// |     1| example1|   0| 1|

// |     1| example1|   2| 2|

// |     1| example1|   5| 9|

// |     2| example2|   1| 9|

// |     2| example2|  20|null|

// |     2| example2|   5| 6|

// +------+----------+----+----+

With raw SQL:

sqlContext.udf.register("zip", (xs: Seq[Long], ys: Seq[Long]) => xs.zip(ys))

df.registerTempTable("df")

sqlContext.sql(

  """SELECT userId, someString, explode(zip(varA, varB)) AS vars FROM df""")

Browse Categories

...