
I'm trying to run a sample like https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredKafkaWordCount.scala. I started with the Spark Structured Streaming Programming guide at http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html.

My code is

package io.boontadata.spark.job1

import org.apache.spark.sql.SparkSession

object DirectKafkaAggregateEvents {
  val FIELD_MESSAGE_ID = 0
  val FIELD_DEVICE_ID = 1
  val FIELD_TIMESTAMP = 2
  val FIELD_CATEGORY = 3
  val FIELD_MEASURE1 = 4
  val FIELD_MEASURE2 = 5

  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println(s"""
        |Usage: DirectKafkaAggregateEvents <brokers> <subscribeType> <topics>
        |  <brokers> is a list of one or more Kafka brokers
        |  <subscribeType> sample value: subscribe
        |  <topics> is a list of one or more kafka topics to consume from
        |
        """.stripMargin)
      System.exit(1)
    }

    val Array(bootstrapServers, subscribeType, topics) = args

    val spark = SparkSession
      .builder
      .appName("boontadata-spark-job1")
      .getOrCreate()

    import spark.implicits._

    // Create DataSet representing the stream of input lines from kafka
    val lines = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option(subscribeType, topics)
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]

    // Generate running word count
    val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()

    // Start running the query that prints the running counts to the console
    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()
  }

}


I added the following sbt files:

build.sbt:

name := "boontadata-spark-job1"
version := "0.1"
scalaVersion := "2.11.7"

libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.0.2" % "provided"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.0.2" % "provided"
libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "2.0.2" % "provided"
libraryDependencies += "org.apache.spark" % "spark-sql-kafka-0-10_2.11" % "2.0.2"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.0.2"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.1.1"
libraryDependencies += "org.apache.kafka" % "kafka_2.11" % "0.10.1.1"

// META-INF discarding
assemblyMergeStrategy in assembly := {
   {
    case PathList("META-INF", xs @ _*) => MergeStrategy.discard
    case x => MergeStrategy.first
   }
}


I also added project/assembly.sbt

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")


This creates an uber jar containing the non-provided jars.

I submit with the following line:

spark-submit boontadata-spark-job1-assembly-0.1.jar ks1:9092,ks2:9092,ks3:9092 subscribe sampletopic


but I get this runtime error:

Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at https://cwiki.apache.org/confluence/display/SPARK/Third+Party+Projects
        at org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:148)
        at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:79)
        at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:79)
        at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:218)
        at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:80)
        at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:80)
        at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
        at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:124)
        at io.boontadata.spark.job1.DirectKafkaAggregateEvents$.main(StreamingJob.scala:41)
        at io.boontadata.spark.job1.DirectKafkaAggregateEvents.main(StreamingJob.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5.apply(DataSource.scala:132)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5.apply(DataSource.scala:132)
        at scala.util.Try.orElse(Try.scala:84)
        at org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:132)
        ... 18 more
16/12/23 13:32:48 INFO spark.SparkContext: Invoking stop() from shutdown hook

1 Answer


I think the issue lies in the following section in build.sbt:

// META-INF discarding
assemblyMergeStrategy in assembly := {
   {
    case PathList("META-INF", xs @ _*) => MergeStrategy.discard
    case x => MergeStrategy.first
   }
}

This tells sbt-assembly to discard every META-INF entry, including the service registration files that make data source aliases (e.g. kafka) work.

But those META-INF files are exactly what Spark needs to resolve kafka (and the aliases of the other streaming data sources).

To make the kafka alias work, Spark SQL uses META-INF/services/org.apache.spark.sql.sources.DataSourceRegister with the following entry:

org.apache.spark.sql.kafka010.KafkaSourceProvider

KafkaSourceProvider is responsible for registering the kafka alias with the proper streaming data source, i.e. KafkaSource.
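As a quick sanity check, here is a minimal sketch that lists the data source aliases visible through that service mechanism (run it e.g. in a spark-shell started with your assembly on the classpath); if kafka is not printed, the registration file was discarded during assembly:

import java.util.ServiceLoader
import scala.collection.JavaConverters._
import org.apache.spark.sql.sources.DataSourceRegister

// Print every data source alias discoverable via
// META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
ServiceLoader.load(classOf[DataSourceRegister])
  .asScala
  .map(_.shortName())
  .foreach(println)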

To check that the implementation itself is available and only the alias registration is missing, you can also refer to the Kafka data source by its fully-qualified name instead of the alias:

spark.readStream.
  format("org.apache.spark.sql.kafka010.KafkaSourceProvider").
  load

You may still run into other problems at that point, e.g. because required options like kafka.bootstrap.servers are missing, but that already shows the implementation is on the classpath.
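For completeness, a minimal sketch of that check with the required options filled in (the broker list and topic are the placeholders from the spark-submit line in the question, not real values):

val df = spark.readStream
  .format("org.apache.spark.sql.kafka010.KafkaSourceProvider") // fully-qualified name, no alias lookup
  .option("kafka.bootstrap.servers", "ks1:9092,ks2:9092,ks3:9092") // brokers from the question
  .option("subscribe", "sampletopic")                              // topic from the question
  .load()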

The solution is to MergeStrategy.concat all META-INF/services/org.apache.spark.sql.sources.DataSourceRegister files instead of discarding them (that creates an uber jar that keeps the registrations of all data sources, including the Kafka one):

case "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister" => MergeStrategy.concat
