I am looking for a way to log additional data while code executes on Apache Spark nodes, so that issues that appear during execution can be investigated later. A traditional approach such as com.typesafe.scalalogging.LazyLogging fails because the logger instance cannot be serialized in a distributed environment like Apache Spark.

I've investigated this problem, and so far the solution I have found is to use the org.apache.spark.Logging trait like this:

class SparkExample extends Logging {
  val someRDD = ...
  someRDD.map { rddElement =>
    logInfo(s"$rddElement will be processed.")
    doSomething(rddElement)
  }
}


However, the Logging trait does not look like a permanent solution for Apache Spark, because it is marked as @DeveloperApi and the class documentation mentions:

This will likely be changed or removed in future releases.

Is there any known logging solution that will allow me to log data when the RDDs are executed on Apache Spark nodes?

After some research I tried Log4j, but I still have issues when using the logger from a Scala class (rather than a Scala object). Here is my full code:

import org.apache.log4j.Logger
import org.apache.spark._

object Main {
  def main(args: Array[String]): Unit = {
    new LoggingTestWithRDD().doTest()
  }
}

class LoggingTestWithRDD extends Serializable {
  val log = Logger.getLogger(getClass.getName)

  def doTest(): Unit = {
    val conf = new SparkConf().setMaster("local[4]").setAppName("LogTest")
    val spark = new SparkContext(conf)
    val someRdd = spark.parallelize(List(1, 2, 3))
    someRdd.map { element =>
      log.info(s"$element will be processed")
      element + 1
    }
    spark.stop()
  }
}

The exception that I'm seeing is:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable -> Caused by: java.io.NotSerializableException: org.apache.log4j.Logger

1 Answer

You can obtain a logger directly with org.apache.log4j.LogManager.getLogger() and set its logging level with setLevel().

I got a similar serialization error when I tried to print RDD contents inside an rdd.map function:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable -> Caused by: java.io.NotSerializableException: org.apache.log4j.Logger
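
The cause can be reproduced without Spark. The following is only a minimal sketch: it uses java.util.logging.Logger as a stand-in for the Log4j logger (an assumption made so the example needs no extra dependencies), and the class and helper names are hypothetical. It shows that a Serializable class with an ordinary logger field cannot be written by Java serialization, while the same field declared as @transient lazy val is skipped during serialization and recreated on first use.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import java.util.logging.Logger

// Hypothetical stand-in for the class in the question: the logger is an
// ordinary field, so Java serialization tries to write it and fails.
class EagerLoggerHolder extends Serializable {
  val log: Logger = Logger.getLogger(getClass.getName)
}

// Same class, but the logger is excluded from serialization (@transient)
// and rebuilt on first access in whichever JVM uses it (lazy).
class TransientLoggerHolder extends Serializable {
  @transient lazy val log: Logger = Logger.getLogger(getClass.getName)
}

object SerializationCheck {
  // Returns true if Java serialization can write the object.
  def canSerialize(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }
}
```

Spark serializes the closure passed to map together with everything the closure references, which is why a logger field on the enclosing class ends up in the serialized task.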

Here is my solution to this problem, using a Scala singleton object that extends java.io.Serializable:

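import org.apache.log4j.{Level, LogManager}
import org.apache.spark.{SparkConf, SparkContext}

object MyClass extends Serializable {
  // Lives in a singleton object, so it is looked up in each JVM instead of
  // being serialized together with the closure
  val log = LogManager.getLogger("name of my spark log")
  log.setLevel(Level.INFO)

  def main(args: Array[String]): Unit = {
    // Example setup borrowed from the question; the original snippet left rdd undefined
    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("LogTest"))
    val rdd = sc.parallelize(List(1, 2, 3))
    rdd.map { t =>
      // Using the object's logger here
      val log = MyClass.log
      // Log the element itself: RDD operations such as rdd.count cannot be nested inside map
      log.info(s"$t will be processed")
      t + 1
    }.count() // map is lazy, so an action is needed to run it
    sc.stop()
  }
}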
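
If the logger has to stay in a class rather than an object, as in the question, one commonly used alternative is to declare it as @transient lazy val. The following is a sketch under assumptions, not a definitive implementation: it reuses the class name and local-mode settings from the question, returns the mapped values from doTest (a change made here only so the result can be checked), and assumes the Log4j 1.x API is on the classpath as in the question.

```scala
import org.apache.log4j.{LogManager, Logger}
import org.apache.spark.{SparkConf, SparkContext}

class LoggingTestWithRDD extends Serializable {
  // @transient: the logger is not written when the closure (and this instance) is serialized.
  // lazy: it is created again on first use in each JVM, driver or executor.
  @transient lazy val log: Logger = LogManager.getLogger(getClass.getName)

  def doTest(): List[Int] = {
    val conf = new SparkConf().setMaster("local[4]").setAppName("LogTest")
    val spark = new SparkContext(conf)
    try {
      val someRdd = spark.parallelize(List(1, 2, 3))
      someRdd.map { element =>
        log.info(s"$element will be processed")
        element + 1
      }.collect().toList // collect() is the action that actually runs the map
    } finally {
      spark.stop()
    }
  }
}
```

On a real cluster these messages are written to the executor logs rather than to the driver console, so they have to be looked up on the worker nodes or through the Spark UI.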
