
I need to join two ordinary RDDs on one or more columns. Logically, this operation is equivalent to a database join of two tables. I wonder whether this is possible only through Spark SQL or whether there are other ways of doing it.

As a concrete example, consider RDD r1 with primary key ITEM_ID:

(ITEM_ID, ITEM_NAME, ITEM_UNIT, COMPANY_ID)

and RDD r2 with primary key COMPANY_ID:

(COMPANY_ID, COMPANY_NAME, COMPANY_CITY)


I want to join r1 and r2.

How can this be done?

1 Answer


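Using Spark SQL, you can perform joins on RDDs: convert each RDD to a DataFrame, register it as a temporary view, and join the views with an SQL query.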

Check out the example given below:

import org.apache.spark.sql.SparkSession

object SparkSQLJoin {

  // Case classes are declared outside main so Spark can derive encoders for them
  case class Item(id: String, name: String, unit: Int, companyId: String)

  case class Company(companyId: String, name: String, city: String)

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().appName("SparkSQLJoin").getOrCreate()
    val sc = spark.sparkContext

    // Brings toDF() into scope for RDDs of case classes
    import spark.implicits._

    val i1 = Item("1", "first", 1, "c1")
    val i2 = Item("2", "second", 2, "c2")
    val i3 = Item("3", "third", 3, "c3")

    val c1 = Company("c1", "company-1", "city-1")
    val c2 = Company("c2", "company-2", "city-2")

    // Convert each RDD to a DataFrame and register it as a temporary SQL view
    val companies = sc.parallelize(List(c1, c2))
    companies.toDF().createOrReplaceTempView("companies")

    val items = sc.parallelize(List(i1, i2, i3))
    items.toDF().createOrReplaceTempView("items")

    val result = spark.sql("SELECT * FROM companies C JOIN items I ON C.companyId = I.companyId").collect()

    result.foreach(println)

    spark.stop()
  }
}

The output is (row order may vary):

   [c1,company-1,city-1,1,first,1,c1]
   [c2,company-2,city-2,2,second,2,c2]
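Spark SQL is not the only option. Below is a minimal sketch of the same inner join using only the core RDD API: each RDD is keyed by companyId with keyBy, and the resulting pair RDDs are combined with join from PairRDDFunctions. It redefines the same Item and Company case classes as the example above; the object name PairRDDJoin, the helper joinOnCompanyId and the local[*] master are illustrative assumptions, not part of the original answer. To join on more than one column, key both sides by a tuple instead, e.g. keyBy(x => (x.colA, x.colB)) with hypothetical column names.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object PairRDDJoin {

  case class Item(id: String, name: String, unit: Int, companyId: String)
  case class Company(companyId: String, name: String, city: String)

  // Hypothetical helper: inner join of items and companies on companyId,
  // using only the core RDD API (no Spark SQL involved)
  def joinOnCompanyId(items: RDD[Item], companies: RDD[Company]): RDD[(Item, Company)] = {
    val itemsByCompany = items.keyBy(_.companyId)     // RDD[(String, Item)]
    val companiesById = companies.keyBy(_.companyId)  // RDD[(String, Company)]
    // join keeps only keys present on both sides, like an SQL INNER JOIN;
    // .values drops the key and keeps the (Item, Company) pairs
    itemsByCompany.join(companiesById).values
  }

  def main(args: Array[String]): Unit = {
    // local[*] master is assumed so the sketch runs standalone;
    // remove setMaster when submitting with spark-submit
    val conf = new SparkConf().setAppName("PairRDDJoin").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val items = sc.parallelize(List(
        Item("1", "first", 1, "c1"),
        Item("2", "second", 2, "c2"),
        Item("3", "third", 3, "c3")))
      val companies = sc.parallelize(List(
        Company("c1", "company-1", "city-1"),
        Company("c2", "company-2", "city-2")))

      // Prints one (Item, Company) pair per matching companyId
      joinOnCompanyId(items, companies).collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

As with the SQL query, item 3 is dropped because no company has companyId c3; leftOuterJoin would keep it, with None on the company side. If the data is already a DataFrame, the Dataset API can express the same join without an SQL string, e.g. items.toDF().join(companies.toDF(), "companyId").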
