Intellipaat Back

Explore Courses Blog Tutorials Interview Questions
0 votes
2 views
in Big Data Hadoop & Spark by (11.4k points)

I am using Spark 2.1.0.

When I execute the following code I'm getting an error from Spark. Why? How to fix it?

val i1 = Seq(("a", "string"), ("another", "string"), ("last", "one")).toDF("a", "b")
val i2 = Seq(("one", "string"), ("two", "strings")).toDF("a", "b")
val i1Idx = i1.withColumn("sourceId", lit(1))
val i2Idx = i2.withColumn("sourceId", lit(2))
val input = i1Idx.union(i2Idx)
val weights = Seq((1, 0.6), (2, 0.4)).toDF("sourceId", "weight")
weights.join(input, "sourceId").show


Error:

scala> weights.join(input, "sourceId").show
org.apache.spark.sql.AnalysisException: Detected cartesian product for INNER join between logical plans
Project [_1#34 AS sourceId#39, _2#35 AS weight#40]
+- Filter (((1 <=> _1#34) || (2 <=> _1#34)) && (_1#34 = 1))
   +- LocalRelation [_1#34, _2#35]
and
Union
:- Project [_1#0 AS a#5, _2#1 AS b#6]
:  +- LocalRelation [_1#0, _2#1]
+- Project [_1#10 AS a#15, _2#11 AS b#16]
   +- LocalRelation [_1#10, _2#11]
Join condition is missing or trivial.
Use the CROSS JOIN syntax to allow cartesian products between these relations.;
  at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$$anonfun$apply$19.applyOrElse(Optimizer.scala:1011)
  at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$$anonfun$apply$19.applyOrElse(Optimizer.scala:1008)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:287)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:331)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:329)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:293)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:331)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:329)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:293)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:277)
  at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts.apply(Optimizer.scala:1008)
  at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts.apply(Optimizer.scala:993)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
  at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
  at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
  at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:35)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:73)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:73)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:79)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:84)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84)
  at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2791)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2112)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2327)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:248)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:636)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:595)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:604)
  ... 48 elided

1 Answer

0 votes
by (32.3k points)

Do one thing just trigger inner join after turning on the flag

spark.conf.set("spark.sql.crossJoin.enabled", "true")

You  may also use the cross join.

weights.crossJoin(input)

or set the Alias as

weights.join(input, input("sourceId")===weights("sourceId"), "cross")

This is an issue for your version of Spark but i guess it is fixed now.

But I would suggest you to 2.1.1, you won’t encounter any such issue here.

31k questions

32.8k answers

501 comments

693 users

Browse Categories

...