What's the difference between join and cogroup in Apache Spark? 

1 Answer

A query can access multiple tables at once, or access the same table in such a way that multiple rows of the same or different tables are processed at the same time. A query that accesses data in this way is called a join query.

It is quite common to join multiple data sets. The join function joins any two SparkR DataFrames based on the given join expression. If no join expression is given, it performs a Cartesian join.
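The Cartesian case can be illustrated in the Scala DataFrame API, where the product is requested explicitly with crossJoin. This is a minimal sketch: the object name CrossJoinSketch, the helper cartesian, and the column names a and b are made up for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object CrossJoinSketch {
  // Hypothetical helper: build two tiny DataFrames and return their Cartesian product.
  def cartesian(spark: SparkSession): DataFrame = {
    import spark.implicits._
    val left  = Seq(1, 2, 3).toDF("a")
    val right = Seq("x", "y").toDF("b")
    // Every row of `left` is paired with every row of `right`.
    left.crossJoin(right)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("cross-join-sketch")
      .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    val product = cartesian(spark)
    product.show()
    println(s"rows: ${product.count()}") // 3 * 2 = 6 rows

    spark.stop()
  }
}
```

The row count of a Cartesian join is the product of the two input sizes, which is why it is usually only safe on small inputs.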

Given below are the basic ways to join the data:

  • join (inner join)

  • rightOuterJoin

  • leftOuterJoin

  • fullOuterJoin

  • semiJoin
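The first four names in this list match methods available on pair RDDs. As far as I know there is no semiJoin method on pair RDDs; the DataFrame API exposes that behaviour as the "left_semi" join type. A small sketch of the four RDD variants, with made-up data and a hypothetical object name PairRddJoinsSketch:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object PairRddJoinsSketch {
  // Hypothetical sample data: key 3 is the only key present on both sides.
  def inputs(sc: SparkContext): (RDD[(Int, String)], RDD[(Int, String)]) = {
    val left  = sc.parallelize(Seq((1, "A1"), (2, "A2"), (3, "A3")))
    val right = sc.parallelize(Seq((3, "B3"), (4, "B4")))
    (left, right)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("pair-rdd-joins-sketch")
      .getOrCreate()
    val sc = spark.sparkContext
    sc.setLogLevel("ERROR")
    val (left, right) = inputs(sc)

    // Inner join keeps only key 3: RDD[(Int, (String, String))]
    left.join(right).sortByKey().collect().foreach(println)

    // Left outer keeps keys 1, 2, 3; the right value becomes Option[String]
    left.leftOuterJoin(right).sortByKey().collect().foreach(println)

    // Right outer keeps keys 3, 4; the left value becomes Option[String]
    left.rightOuterJoin(right).sortByKey().collect().foreach(println)

    // Full outer keeps keys 1 to 4; both values become Option[String]
    left.fullOuterJoin(right).sortByKey().collect().foreach(println)

    spark.stop()
  }
}
```

The outer variants wrap the side that may be missing in an Option, so a missing match shows up as None rather than as a null.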

Here is a simple example that covers all the join types of the DataFrame API:

import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._

object SparkSandbox extends App {

  // Named Record rather than Row so it does not shadow org.apache.spark.sql.Row
  case class Record(id: Int, value: String)

  private[this] implicit val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
  import spark.implicits._

  // Keep the console output limited to the join results
  spark.sparkContext.setLogLevel("ERROR")

  // ids 3 and 4 appear in both datasets; id 4 appears twice in r2
  val r1 = Seq(Record(1, "A1"), Record(2, "A2"), Record(3, "A3"), Record(4, "A4")).toDS()
  val r2 = Seq(Record(3, "A3"), Record(4, "A4"), Record(4, "A4_1"), Record(5, "A5"), Record(6, "A6")).toDS()

  // Some names are aliases: outer/full/full_outer, left/left_outer, right/right_outer
  val joinTypes = Seq("inner", "outer", "full", "full_outer", "left", "left_outer", "right", "right_outer", "left_semi", "left_anti")

  // Run every join type on the shared "id" column and print the result
  joinTypes foreach { joinType =>
    println(s"${joinType.toUpperCase()} JOIN")
    r1.join(right = r2, usingColumns = Seq("id"), joinType = joinType).orderBy("id").show()
  }
}

Just run the code and check out the result.

CoGroup:

In Spark, the cogroup function operates on two datasets of key-value pairs, say (K, V) and (K, W), and returns a dataset of (K, (Iterable[V], Iterable[W])) tuples. This operation is also known as groupWith.

A key that appears in at least one of the two RDDs will appear in the final result.

Let's look at this basic example:

val myrdd1 = sc.parallelize(List((1,"physics"),(2,"sanskrit"),(3,"hindi"),(4,"physical"),(6,"computer")))

val myrdd2 = sc.parallelize(List((4,"english"),(5,"arts"),(6,"social"),(1,"bio"),(1,"chemistry")))

val result = myrdd1.cogroup(myrdd2)

result.collect

This is very similar to the relational database operation FULL OUTER JOIN, but instead of flattening the result into one row per matching pair of records, cogroup gives you, for each key, one Iterable of values from each side. What you do with those values afterwards is up to you.
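One way to see the relationship between the two operations is to rebuild an inner join from cogroup by pairing every left value with every right value for each key. To my understanding this mirrors how the pair-RDD join is built on top of cogroup inside Spark, but treat the code below as an illustrative sketch and not as the library's implementation. The names JoinViaCogroupSketch and innerJoinViaCogroup are hypothetical; the data is the same as in the cogroup example.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object JoinViaCogroupSketch {
  // Flatten each cogroup entry into one record per (left, right) pair.
  // A key missing on either side has an empty Iterable there, so it yields
  // no pairs, which gives inner-join semantics.
  def innerJoinViaCogroup(a: RDD[(Int, String)],
                          b: RDD[(Int, String)]): RDD[(Int, (String, String))] =
    a.cogroup(b).flatMapValues { case (vs, ws) =>
      for (v <- vs; w <- ws) yield (v, w)
    }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("join-via-cogroup-sketch")
      .getOrCreate()
    val sc = spark.sparkContext
    sc.setLogLevel("ERROR")

    val myrdd1 = sc.parallelize(List((1, "physics"), (2, "sanskrit"), (3, "hindi"), (4, "physical"), (6, "computer")))
    val myrdd2 = sc.parallelize(List((4, "english"), (5, "arts"), (6, "social"), (1, "bio"), (1, "chemistry")))

    // cogroup: one entry per distinct key (1 to 6), with possibly empty sides
    myrdd1.cogroup(myrdd2)
      .mapValues { case (vs, ws) => (vs.toList, ws.toList) }
      .sortByKey()
      .collect()
      .foreach(println)

    // join: one record per matching pair, only for keys 1, 4 and 6
    val viaJoin    = myrdd1.join(myrdd2).collect().sorted
    val viaCogroup = innerJoinViaCogroup(myrdd1, myrdd2).collect().sorted
    viaJoin.foreach(println)
    println(s"same result: ${viaJoin.sameElements(viaCogroup)}")

    spark.stop()
  }
}
```

In short, join returns flattened (K, (V, W)) pairs only for keys present on both sides, while cogroup returns one grouped entry for every key present on either side and leaves the flattening, filtering, or aggregation to the caller.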

...