According to Introducing Spark Datasets:

As we look forward to Spark 2.0, we plan some exciting improvements to Datasets, specifically: ... Custom encoders – while we currently autogenerate encoders for a wide variety of types, we’d like to open up an API for custom objects.

However, attempts to store a custom type in a Dataset lead to an error like:

Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing sqlContext.implicits._ Support for serializing other types will be added in future releases

Or: 

java.lang.UnsupportedOperationException: No Encoder found for ....

Are there any existing workarounds?

1 Answer


Just use kryo

The usual suggestion for this problem is to define a kryo encoder for the custom type explicitly:

import spark.implicits._

class MyObj(val i: Int)

implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]

// ...

val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))

This gets tedious quickly, especially if your code manipulates all sorts of datasets (joining, grouping, and so on), because you end up collecting a bunch of extra implicits. A better approach is to define a single implicit that does this automatically for any type:

import scala.reflect.ClassTag

implicit def kryoEncoder[A](implicit ct: ClassTag[A]): org.apache.spark.sql.Encoder[A] =
  org.apache.spark.sql.Encoders.kryo[A](ct)

With that in place, it seems like I can do almost anything I want (the example below won't work in the spark-shell, where spark.implicits._ is imported automatically):

class MyObj(val i: Int)

val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))

val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and ..

val d3 = d1.map(d => (d.i,  d)).alias("d3") // .. deals with the new type

val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")

Or almost: the join on the last line fails. The problem is that with kryo, Spark stores every row in the dataset as a flat binary object. For map, filter, and foreach that is enough, but for operations like join, Spark needs the data to be separated into columns. Inspecting the schema for d2 or d3 shows that there is just one binary column:

d2.printSchema

// root

//  |-- value: binary (nullable = true)
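One possible way around the single binary column, sketched here rather than taken from the answer above, is to build the tuple encoder by hand with Encoders.tuple so that the Int key keeps its own column and only the MyObj part goes through kryo. The names TupleEncoderSketch, Keyed, and keyedJoin are made up for illustration, and the sketch assumes a local SparkSession with spark.implicits._ not imported.

```scala
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
import org.apache.spark.sql.functions.col

// Same custom class as in the answer; it is not a case class, so Spark
// cannot derive an encoder for it automatically.
class MyObj(val i: Int)

// Hypothetical wrapper object, used only to keep the sketch self-contained.
object TupleEncoderSketch {
  type Keyed = (Int, MyObj)

  def keyedJoin(spark: SparkSession): Dataset[(Keyed, Keyed)] = {
    // kryo is used only for the custom class itself ...
    implicit val myObjEncoder: Encoder[MyObj] = Encoders.kryo[MyObj]
    // ... while the tuple encoder is assembled explicitly, so the Int key
    // is stored as a real column instead of inside the binary blob.
    implicit val keyedEncoder: Encoder[Keyed] =
      Encoders.tuple(Encoders.scalaInt, myObjEncoder)

    val d1 = spark.createDataset(Seq(new MyObj(1), new MyObj(2), new MyObj(3)))
    val d2 = d1.map(d => (d.i + 1, d)).alias("d2")
    val d3 = d1.map(d => (d.i, d)).alias("d3")

    // The schema should now list an integer column _1 and a binary column _2.
    d2.printSchema()

    // col(...) is used instead of $"..." so spark.implicits._ is not needed.
    d2.joinWith(d3, col("d2._1") === col("d3._1"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("tuple-encoder-sketch")
      .getOrCreate()
    try println(keyedJoin(spark).count())
    finally spark.stop()
  }
}
```

The trade-off is that every tuple shape needs its own explicit encoder, and the MyObj part is still opaque to Spark, so only the fields pulled out into the tuple can be used in join or filter expressions.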
