
I'm getting strange behavior when calling a function from outside of a closure:

when the function is in an object, everything works
when the function is in a class, I get:


Task not serializable: java.io.NotSerializableException: testing

The problem is that I need my code in a class, not an object. Any idea why this is happening? Is a Scala object serializable by default?

This is a working code example:

object working extends App {
    val list = List(1,2,3)

    val rddList = Spark.ctx.parallelize(list)
    //calling function outside closure
    val after = rddList.map(someFunc(_))

    def someFunc(a:Int)  = a+1

    after.collect().map(println(_))
}


This is the non-working example:

object NOTworking extends App {
  new testing().doIT
}

//adding extends Serializable won't help
class testing { 
  val list = List(1,2,3) 
  val rddList = Spark.ctx.parallelize(list)

  def doIT = {
    //again calling the function someFunc
    val after = rddList.map(someFunc(_))
    //this will crash here, because Spark is lazy:
    //the map only runs when collect forces it
    after.collect().map(println(_))
  }

  def someFunc(a:Int) = a+1
}
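Why the class version fails can be shown without Spark at all. Calling `someFunc` inside the closure turns the method into a function that captures `this`, so Spark tries to serialize the whole (non-serializable) `testing` instance. A minimal sketch, assuming Scala 2.12+ where function values themselves are serializable (the `Testing`, `methodClosure`, `localClosure`, and `isSerializable` names are illustrative, not from the post):

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for the `testing` class from the question: not Serializable.
class Testing {
  def someFunc(a: Int) = a + 1

  // Eta-expanding a method produces a closure that captures `this`.
  def methodClosure: Int => Int = someFunc _

  // A local function value that touches no instance state captures nothing.
  def localClosure: Int => Int = (a: Int) => a + 1
}

// Attempt Java serialization and report whether it succeeds.
def isSerializable(obj: AnyRef): Boolean =
  try {
    new ObjectOutputStream(new ByteArrayOutputStream).writeObject(obj)
    true
  } catch {
    case _: NotSerializableException => false
  }

val t = new Testing
println(isSerializable(t.methodClosure)) // false: drags the Testing instance along
println(isSerializable(t.localClosure))  // true: self-contained function value
```

This is also why the same function in an object works: an object's methods don't need an instance to be shipped with them.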

1 Answer


Serialize your objects before they pass through the closure, and de-serialize them again afterwards. This approach works even if your classes aren't Serializable, because it uses Kryo under the hood.

Here's an example of how I did it:

def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)])(foo: Foo): Bar = {
  kryoWrapper.value.apply(foo)
}

val mapper = genMapper(KryoSerializationWrapper(new Blah(abc))) _
rdd.flatMap(mapper).collectAsMap()

//note: this must be a class, not an object, since it takes a constructor parameter
class Blah(abc: ABC) extends (Foo => Bar) {
  def apply(foo: Foo): Bar = {
    //this is the real function
  }
}
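The trick behind the wrapper can be sketched without Spark or Kryo: serialize the function to bytes up front on the driver side, ship the bytes (a plain Array[Byte] always serializes), and rebuild the function inside the task. A minimal sketch using Java serialization in place of Kryo (Kryo's advantage is that it also handles classes that aren't Serializable); the `toBytes`/`fromBytes` names are illustrative, and Scala 2.12+ is assumed so that function values serialize:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Serialize a function to bytes eagerly, on the "driver" side.
def toBytes(f: Int => Int): Array[Byte] = {
  val bos = new ByteArrayOutputStream
  val oos = new ObjectOutputStream(bos)
  oos.writeObject(f)
  oos.close()
  bos.toByteArray
}

// Rebuild the function on the "worker" side, inside the task.
def fromBytes(bytes: Array[Byte]): Int => Int =
  new ObjectInputStream(new ByteArrayInputStream(bytes))
    .readObject().asInstanceOf[Int => Int]

val bytes = toBytes((a: Int) => a + 1) // happens once, before the closure is shipped
val f = fromBytes(bytes)               // happens inside each task
println(List(1, 2, 3).map(f))          // List(2, 3, 4)
```

Only the byte array crosses the closure boundary, so nothing non-serializable is ever captured.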
