I have a Parquet table in which one of the columns has the type

array<struct<col1,col2,..colN>>

I can run queries against this table in Hive using the LATERAL VIEW syntax.

How do I read this table into an RDD and, more importantly, how do I filter, map, etc. this nested collection in Spark?

I could not find any references to this in the Spark documentation.

1 Answer

Nested collections need no special handling. Spark treats an RDD[(String, Seq[String])] the same way it treats an RDD[(String, String)].

Reading such nested collections from Parquet files, however, can be tricky.

Let's take an example from the spark-shell (1.3.1):

scala> import sqlContext.implicits._

import sqlContext.implicits._

scala> case class Inner(a: String, b: String)

defined class Inner

scala> case class Outer(key: String, inners: Seq[Inner])

defined class Outer

Write the parquet file:

scala> val outers = sc.parallelize(List(Outer("k1", List(Inner("a", "b")))))

outers: org.apache.spark.rdd.RDD[Outer] = ParallelCollectionRDD[0] at parallelize at <console>:25

scala> outers.toDF.saveAsParquetFile("outers.parquet")

Read the parquet file:

scala> import org.apache.spark.sql.catalyst.expressions.Row

import org.apache.spark.sql.catalyst.expressions.Row

scala> val dataFrame = sqlContext.parquetFile("outers.parquet")

dataFrame: org.apache.spark.sql.DataFrame = [key: string, inners: array<struct<a:string,b:string>>]   

scala> val outers = dataFrame.map { row =>

     |   val key = row.getString(0)

     |   val inners = row.getAs[Seq[Row]](1).map(r => Inner(r.getString(0), r.getString(1)))

     |   Outer(key, inners)

     | }

outers: org.apache.spark.rdd.RDD[Outer] = MapPartitionsRDD[8] at map at DataFrame.scala:848

The important part is row.getAs[Seq[Row]](1). The internal representation of a nested sequence of structs is ArrayBuffer[Row], so you can use any supertype of it in place of Seq[Row]. The 1 is the column index in the outer row. I used the getAs method here, but more recent versions of Spark offer alternatives; see the source code of the Row trait.
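As one possible alternative to getAs, and assuming Spark 2.x or later (the session above is 1.3.1), the typed Dataset API can decode the nested structs into the case classes directly. This is only a minimal sketch, not the approach used in the answer; the local session setup and the temporary path are assumptions added so the snippet is self-contained.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

case class Inner(a: String, b: String)
case class Outer(key: String, inners: Seq[Inner])

// Assumption: a local session for illustration; spark-shell already provides `spark`.
val spark = SparkSession.builder().master("local[*]").appName("nested-parquet").getOrCreate()
import spark.implicits._

// Hypothetical temporary location, so the sketch does not depend on an existing file.
val path = Files.createTempDirectory("nested").resolve("outers.parquet").toString

// Write one record with a nested array<struct<a,b>> column.
Seq(Outer("k1", Seq(Inner("a", "b")))).toDS().write.parquet(path)

// Read it back and let the encoder map each row to Outer, nested Inner values included.
val typed = spark.read.parquet(path).as[Outer]
val outersRdd = typed.rdd // RDD[Outer], ready for filter, map, etc.
```

With this approach there is no column index to keep in sync with the schema, because fields are matched by name against the case classes.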

Now that you have an RDD[Outer], you can apply any transformation or action you need.

// Filter the outers

outers.filter(_.inners.nonEmpty)

// Filter the inners

outers.map(outer => outer.copy(inners = outer.inners.filter(_.a == "a")))
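The same pattern extends to other combinators. As a sketch, the functions below are applied to a local Seq[Outer] (hypothetical sample data standing in for the RDD) so they can be tried without a cluster; RDD offers filter, map and flatMap methods that accept the same functions.

```scala
// Same case classes as in the session above.
case class Inner(a: String, b: String)
case class Outer(key: String, inners: Seq[Inner])

// Hypothetical sample data; a local Seq stands in for the RDD[Outer].
val sample: Seq[Outer] = Seq(
  Outer("k1", Seq(Inner("a", "b"), Inner("c", "d"))),
  Outer("k2", Seq.empty)
)

// One (key, inner) pair per nested element, similar in spirit to Hive's LATERAL VIEW explode.
val flattened: Seq[(String, Inner)] =
  sample.flatMap(outer => outer.inners.map(inner => (outer.key, inner)))

// Keep only the outers that contain at least one inner whose a field equals "a".
val matching: Seq[Outer] = sample.filter(_.inners.exists(_.a == "a"))
```

Note that flatMap drops outers with an empty inners list, which is also what a plain LATERAL VIEW does in Hive.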

Note that we used the Spark SQL library only to read the Parquet file. You could, for example, select only the columns you need directly on the DataFrame before mapping it to an RDD.

dataFrame.select('col1, 'col2).map { row => ... }
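If more of the work is to stay on the DataFrame, the nested array can also be flattened there before any mapping. The sketch below assumes Spark 2.x or later and uses the explode function from org.apache.spark.sql.functions; the session setup, the temporary path and the sample record are assumptions made for illustration.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.explode

case class Inner(a: String, b: String)
case class Outer(key: String, inners: Seq[Inner])

// Assumption: a local session for illustration; spark-shell already provides `spark`.
val spark = SparkSession.builder().master("local[*]").appName("nested-explode").getOrCreate()
import spark.implicits._

// Hypothetical temporary location and sample record.
val path = Files.createTempDirectory("nested").resolve("outers.parquet").toString
Seq(Outer("k1", Seq(Inner("a", "b"), Inner("c", "d")))).toDS().write.parquet(path)

val dataFrame = spark.read.parquet(path)

// One output row per element of the inners array.
val exploded = dataFrame.select($"key", explode($"inners").as("inner"))

// Pull the struct fields up to top-level columns, then filter on one of them.
val flat = exploded.select($"key", $"inner.a", $"inner.b").filter($"a" === "a")
```

Filtering on the DataFrame first means fewer rows reach the later map step.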
