0 votes
in Data Science by (11.4k points)

I am trying to use the Spark Dataset API but I am having some issues doing a simple join.

Let's say I have two datasets with the fields date | value. With DataFrames, my join would look like:

val dfA : DataFrame
val dfB : DataFrame

dfA.join(dfB, dfB("date") === dfA("date") )

However, for Dataset there is the .joinWith method, and the same approach does not work:

val dfA : Dataset
val dfB : Dataset

dfA.joinWith(dfB, ? )

What argument does .joinWith require?

1 Answer

0 votes
by (32.3k points)

To use joinWith you first have to create a Dataset, and most likely two of them.

To create a Dataset from a DataFrame, define a case class that matches your schema and call df.as[T], where T is your case class. So:

case class KeyValue(key: Int, value: String)

// toDF and as[T] rely on the session implicits (already imported in spark-shell)
val df = Seq((1,"asdf"),(2,"34234")).toDF("key", "value")

val ds = df.as[KeyValue]

// org.apache.spark.sql.Dataset[KeyValue] = [key: int, value: string]

You could also skip the case class and use a tuple:

val tupDs = df.as[(Int,String)]

// org.apache.spark.sql.Dataset[(Int, String)] = [_1: int, _2: string]

Then, if you have another case class / DF:

case class Nums(key: Int, num1: Double, num2: Long)

val df2 = Seq((1,7.7,101L),(2,1.2,10L)).toDF("key","num1","num2")

val ds2 = df2.as[Nums]

// org.apache.spark.sql.Dataset[Nums] = [key: int, num1: double, num2: bigint]

Now, although join and joinWith take similar arguments, their results differ:

df.join(df2, df.col("key") === df2.col("key")).show

// +---+-----+---+----+----+
// |key|value|key|num1|num2|
// +---+-----+---+----+----+
// |  1| asdf|  1| 7.7| 101|
// |  2|34234|  2| 1.2|  10|
// +---+-----+---+----+----+

ds.joinWith(ds2, df.col("key") === df2.col("key")).show

// +---------+-----------+
// |       _1|         _2|
// +---------+-----------+
// | [1,asdf]|[1,7.7,101]|
// |[2,34234]| [2,1.2,10]|
// +---------+-----------+

As you can see, joinWith leaves the objects intact as the two elements of a tuple, while join flattens the columns into a single namespace. (That flattening can cause problems in the case above, because the column name "key" appears twice.)
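Because joinWith returns a Dataset of pairs, the result can be processed with ordinary typed operations. The following is a minimal sketch, not part of the original answer: it assumes Spark 2.0 or later (SparkSession, and ds("key") for column access), and the Joined case class and the JoinWithSketch object are hypothetical names introduced only for illustration.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

case class KeyValue(key: Int, value: String)
case class Nums(key: Int, num1: Double, num2: Long)
// Hypothetical flattened shape, used only in this sketch.
case class Joined(key: Int, value: String, num1: Double, num2: Long)

object JoinWithSketch {
  // Joins the two sample Datasets and flattens each pair into one record.
  def flatten(spark: SparkSession): Dataset[Joined] = {
    import spark.implicits._

    val ds  = Seq(KeyValue(1, "asdf"), KeyValue(2, "34234")).toDS()
    val ds2 = Seq(Nums(1, 7.7, 101L), Nums(2, 1.2, 10L)).toDS()

    // joinWith keeps both sides as whole objects: Dataset[(KeyValue, Nums)]
    val pairs: Dataset[(KeyValue, Nums)] =
      ds.joinWith(ds2, ds("key") === ds2("key"))

    // Pattern-match on the tuple to pick fields from either side.
    pairs.map { case (kv, n) => Joined(kv.key, kv.value, n.num1, n.num2) }
  }
}
```

Calling JoinWithSketch.flatten(spark).show() with a local SparkSession prints one row per matching key, with a single key column instead of the duplicated one that join produces.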

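Note that I had to use df.col("key") and df2.col("key") to build the join condition for ds and ds2. A bare col("key") on either side does not work, since it does not say which input it refers to. On the older Dataset API used here, ds.col(...) did not exist, so borrowing the columns from the original DataFrames does the trick. Since Spark 2.0, where DataFrame is simply Dataset[Row], ds.col("key") and ds2.col("key") are available directly.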
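Another way to build the condition without keeping the original DataFrames around is to alias each side and refer to the columns by qualified name. This is a sketch under the same assumption of Spark 2.0 or later; the aliases "a" and "b" and the AliasJoinSketch object are arbitrary names chosen for the example.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

case class KeyValue(key: Int, value: String)
case class Nums(key: Int, num1: Double, num2: Long)

object AliasJoinSketch {
  // Joins on "key" using aliases instead of column references
  // taken from the source DataFrames.
  def joinByAlias(spark: SparkSession): Dataset[(KeyValue, Nums)] = {
    import spark.implicits._ // provides toDS() and the $"..." column syntax

    // The third record has no partner on the right side.
    val left  = Seq(KeyValue(1, "asdf"), KeyValue(2, "34234"), KeyValue(3, "no match"))
      .toDS().as("a")
    val right = Seq(Nums(1, 7.7, 101L), Nums(2, 1.2, 10L))
      .toDS().as("b")

    // The optional third argument is the join type; "inner" is the default.
    left.joinWith(right, $"a.key" === $"b.key", "inner")
  }
}
```

With an inner join, the unmatched KeyValue(3, "no match") is dropped; passing a different join type as the third argument changes how unmatched rows are handled.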
