Using Spark 1.5.0 and given the following code, I expect unionAll to union DataFrames based on their column names. In the code, I'm using a FunSuite for passing in the SparkContext sc:

object Entities {

  case class A (a: Int, b: Int)
  case class B (b: Int, a: Int)

  val as = Seq(
    A(1,3),
    A(2,4)
  )

  val bs = Seq(
    B(5,3),
    B(6,4)
  )
}

class UnsortedTestSuite extends SparkFunSuite {

  configuredUnitTest("The truth test.") { sc =>
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    val aDF = sc.parallelize(Entities.as, 4).toDF
    val bDF = sc.parallelize(Entities.bs, 4).toDF
    aDF.show()
    bDF.show()
    aDF.unionAll(bDF).show
  }
}


Output:

+---+---+
|  a|  b|
+---+---+
|  1|  3|
|  2|  4|
+---+---+

+---+---+
|  b|  a|
+---+---+
|  5|  3|
|  6|  4|
+---+---+

+---+---+
|  a|  b|
+---+---+
|  1|  3|
|  2|  4|
|  5|  3|
|  6|  4|
+---+---+


Why does the result contain intermixed "b" and "a" columns, instead of aligning the columns based on column names? Sounds like a serious bug!?

1 Answer


It doesn't look like a bug at all to me. What you are encountering is standard SQL behavior, and every major RDBMS, including PostgreSQL, MySQL, Oracle and MS SQL Server, behaves exactly the same way.

Quoting from PostgreSQL manual:

For calculating the intersection, union or difference of two queries, the two queries must be "union compatible", which means that they return the same number of columns and the corresponding columns have compatible data types.

Column names are simply ignored (except those of the first table in the set operation).
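
You can reproduce the same positional matching in plain SQL from within Spark. This is a minimal sketch, assuming the aDF and bDF from the question are in scope; the temp table names are illustrative:

aDF.registerTempTable("as_table")
bDF.registerTempTable("bs_table")

// UNION ALL pairs columns strictly by position, so bs_table's b values
// land under as_table's a column, exactly as unionAll did above:
sqlContext.sql(
  "SELECT * FROM as_table UNION ALL SELECT * FROM bs_table"
).show()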

This behavior comes from relational algebra, where the basic building block is the tuple. Since tuples are ordered, the union of two sets of tuples is equivalent (ignoring duplicate handling) to the output you get here. If you want to match columns by name instead, you can do something like this:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

def unionByName(a: DataFrame, b: DataFrame): DataFrame = {
  // Columns present in both frames, wrapped as Column expressions.
  // The same sequence is reused for both selects, so the order is consistent.
  val columns = a.columns.toSet.intersect(b.columns.toSet).map(col).toSeq
  // Selecting the shared columns in one fixed order aligns the frames
  // by name before the positional unionAll.
  a.select(columns: _*).unionAll(b.select(columns: _*))
}
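
Applied to the aDF and bDF from the question, a call like the one below would yield the rows (1,3), (2,4), (3,5), (4,6), with bDF's values aligned under the right names (a sketch; the exact column order in the result depends on the set intersection above):

// bDF's B(5,3) row now contributes a=3, b=5 rather than a=5, b=3:
unionByName(aDF, bDF).show()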
