
I am trying to use the Spark Dataset API but I am having some issues doing a simple join.

Let's say I have two datasets with the fields date | value. With DataFrames, my join would look like this:

val dfA : DataFrame
val dfB : DataFrame

dfA.join(dfB, dfB("date") === dfA("date") )

However for Dataset there is the .joinWith method, but the same approach does not work:

val dfA : Dataset
val dfB : Dataset

dfA.joinWith(dfB, ? )

What is the argument required by .joinWith ?

1 Answer


To use joinWith you first have to create a Dataset, and most likely two of them.

To create a Dataset, define a case class that matches your schema and call as[T] on the DataFrame, where T is your case class. So:

case class KeyValue(key: Int, value: String)

val df = Seq((1,"asdf"),(2,"34234")).toDF("key", "value")

val ds = df.as[KeyValue]

// org.apache.spark.sql.Dataset[KeyValue] = [key: int, value: string]

You could also skip the case class and use a tuple:

val tupDs = df.as[(Int,String)]

// org.apache.spark.sql.Dataset[(Int, String)] = [_1: int, _2: string]

Then, if you have another case class / DF:

case class Nums(key: Int, num1: Double, num2: Long)

val df2 = Seq((1,7.7,101L),(2,1.2,10L)).toDF("key","num1","num2")

val ds2 = df2.as[Nums]

// org.apache.spark.sql.Dataset[Nums] = [key: int, num1: double, num2: bigint]

Now, although join and joinWith take similar arguments, their results are different:

df.join(df2, df.col("key") === df2.col("key")).show

// +---+-----+---+----+----+
// |key|value|key|num1|num2|
// +---+-----+---+----+----+
// |  1| asdf|  1| 7.7| 101|
// |  2|34234|  2| 1.2|  10|
// +---+-----+---+----+----+

ds.joinWith(ds2, df.col("key") === df2.col("key")).show

// +---------+-----------+
// |       _1|         _2|
// +---------+-----------+
// | [1,asdf]|[1,7.7,101]|
// |[2,34234]| [2,1.2,10]|
// +---------+-----------+

As you can see, joinWith leaves the objects intact as the two parts of a tuple, while join flattens the columns into a single namespace. (That causes problems in the case above, because the column name "key" appears twice.)
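To show what "intact as parts of a tuple" means in practice, here is a minimal sketch that reuses the KeyValue and Nums case classes from above. The object name JoinWithPairs and the helper keyValueNum2 are made up for illustration, and the local SparkSession setup assumes Spark 2.0 or later; in a spark-shell you would use the existing session instead.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Declared at top level so Spark can derive encoders for them.
case class KeyValue(key: Int, value: String)
case class Nums(key: Int, num1: Double, num2: Long)

object JoinWithPairs { // hypothetical name, for illustration only
  // Joins the two typed Datasets and reads fields from both sides of each pair.
  def keyValueNum2(spark: SparkSession): Seq[(Int, String, Long)] = {
    import spark.implicits._

    val df  = Seq((1, "asdf"), (2, "34234")).toDF("key", "value")
    val df2 = Seq((1, 7.7, 101L), (2, 1.2, 10L)).toDF("key", "num1", "num2")
    val ds  = df.as[KeyValue]
    val ds2 = df2.as[Nums]

    // Each element of the result is a (KeyValue, Nums) pair, not a flat row.
    val pairs: Dataset[(KeyValue, Nums)] =
      ds.joinWith(ds2, df.col("key") === df2.col("key"))

    // Each "key" is reached through its own object, so the names never clash.
    pairs.map { case (kv, n) => (kv.key, kv.value, n.num2) }
      .collect()
      .sortBy(_._1) // join output order is not guaranteed, so sort by key
      .toSeq
  }

  def main(args: Array[String]): Unit = {
    // Local session, assumed here only so the sketch is self-contained.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("joinWith-pairs")
      .getOrCreate()
    keyValueNum2(spark).foreach(println)
    spark.stop()
  }
}
```

Because the result is a Dataset[(KeyValue, Nums)], the compiler checks the field accesses in the map step, which is the main reason to prefer joinWith over join when you want to stay in the typed API.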

Note that I had to use df.col("key") and df2.col("key") to build the condition for joining ds and ds2. Using a bare col("key") on either side does not work, and ds.col(...) did not exist in the Spark version I used, so the columns come from the original DataFrames. In Spark 2.0 and later, Dataset has its own col method, so ds.col("key") === ds2.col("key") should work as well.
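To answer the original question directly: joinWith takes the other Dataset, a Column expression as the join condition, and optionally a join type as a string. The sketch below assumes Spark 2.0 or later, where the columns can be taken from the Datasets themselves with ds("key"). The object name JoinWithCondition and the helper innerPairs are hypothetical.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Declared at top level so Spark can derive encoders for them.
case class KeyValue(key: Int, value: String)
case class Nums(key: Int, num1: Double, num2: Long)

object JoinWithCondition { // hypothetical name, for illustration only
  // Builds the join condition from the Datasets' own columns (assumes Spark 2.0+).
  def innerPairs(spark: SparkSession): Seq[(KeyValue, Nums)] = {
    import spark.implicits._

    val ds  = Seq(KeyValue(1, "asdf"), KeyValue(2, "34234")).toDS()
    val ds2 = Seq(Nums(1, 7.7, 101L), Nums(2, 1.2, 10L)).toDS()

    // Second argument: a Column condition. Third argument: the join type.
    // "inner" is the default, so it is spelled out here only for clarity.
    val joined: Dataset[(KeyValue, Nums)] =
      ds.joinWith(ds2, ds("key") === ds2("key"), "inner")

    // Join output order is not guaranteed, so sort by key after collecting.
    joined.collect().toSeq.sortBy(_._1.key)
  }

  def main(args: Array[String]): Unit = {
    // Local session, assumed here only so the sketch is self-contained.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("joinWith-condition")
      .getOrCreate()
    innerPairs(spark).foreach(println)
    spark.stop()
  }
}
```

Here ds("key") is shorthand for ds.col("key"). With this form there is no need to keep the original DataFrames around just to build the condition.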
