【Question Title】: KafkaUtils class not found in Spark Streaming
【Posted】: 2015-02-26 22:32:12
【Question Description】:

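I'm just getting started with Spark Streaming and I'm trying to build a sample application that counts the words in a Kafka stream. It compiles with sbt package, but when I run it I get a NoClassDefFoundError. This post seems to describe the same problem, but its solution is for Maven and I haven't been able to reproduce it with sbt.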

KafkaApp.scala:

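import kafka.serializer.StringDecoder
import org.apache.spark._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object KafkaApp {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("kafkaApp").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(1))
    val kafkaParams = Map(
        "zookeeper.connect" -> "localhost:2181",
        "zookeeper.connection.timeout.ms" -> "10000",
        "group.id" -> "sparkGroup"
    )

    // topic -> number of consumer threads for that topic
    val topics = Map(
        "test" -> 1
    )

    // stream of (key, message) pairs; the type parameters select the String decoders
    val messages = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK)

    // split each message into words and print the word count of every batch
    val words = messages.map(_._2).flatMap(_.split(" "))
    words.count().map(n => s"Number of words: $n").print()

    // nothing is processed until the streaming context is started
    ssc.start()
    ssc.awaitTermination()
  }
}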

build.sbt:

name := "Simple Project"

version := "1.1"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % "1.1.1",
    "org.apache.spark" %% "spark-streaming" % "1.1.1",
    "org.apache.spark" %% "spark-streaming-kafka" % "1.1.1"
)

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

Then I submit it with:

bin/spark-submit \
  --class "KafkaApp" \
  --master local[4] \
  target/scala-2.10/simple-project_2.10-1.1.jar

Error:

14/12/30 19:44:57 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@192.168.5.252:65077/user/HeartbeatReceiver
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils$
    at KafkaApp$.main(KafkaApp.scala:28)
    at KafkaApp.main(KafkaApp.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:329)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtils$
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
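A NoClassDefFoundError caused by a ClassNotFoundException means KafkaUtils$ was available when KafkaApp was compiled but cannot be found by the class loader at run time; sbt package only puts the project's own classes into the jar. The sketch below (ClasspathProbe and isOnClasspath are names made up for illustration) checks from inside a JVM whether a class is loadable, which can help confirm whether a --jars, --packages or assembly change actually took effect.

```scala
// Hypothetical diagnostic helper: reports whether a class can be loaded by a given class loader.
object ClasspathProbe {
  /** True if `className` (binary name, e.g. "pkg.Obj$" for a Scala object) can be loaded. */
  def isOnClasspath(className: String,
                    loader: ClassLoader = Thread.currentThread().getContextClassLoader): Boolean =
    try {
      // initialize = false: only locate and load the class, do not run its static initializers
      Class.forName(className, false, loader)
      true
    } catch {
      case _: ClassNotFoundException => false
      case _: NoClassDefFoundError   => false
    }

  def main(args: Array[String]): Unit = {
    // The companion-object class that the stack trace above reports as missing
    val target = "org.apache.spark.streaming.kafka.KafkaUtils$"
    println(s"$target on classpath: ${isOnClasspath(target)}")
  }
}
```

Calling the check from the driver, before KafkaUtils is first referenced, shows whether the Kafka integration classes made it onto the runtime classpath.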

【Question Comments】:

    Tags: sbt apache-spark apache-kafka


    【Answer 1】:

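    spark-submit does not automatically include the package that contains KafkaUtils; that JAR has to be part of your project. To do this, build an all-inclusive uber-jar with sbt assembly. Here is an example build.sbt: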

    https://github.com/tdas/spark-streaming-external-projects/blob/master/kafka/build.sbt

    You also need to add the assembly plugin to SBT:

    https://github.com/tdas/spark-streaming-external-projects/tree/master/kafka/project
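A minimal sketch of what such a build.sbt could look like for the question's project, assuming sbt-assembly has been added in project/plugins.sbt (the plugin version in the comment is only an example). The Spark modules that spark-submit already provides are marked provided so they stay out of the uber-jar, while spark-streaming-kafka, which is not shipped with the Spark distribution, is bundled. From sbt-assembly 0.12 on, the plugin is an auto plugin, so no import AssemblyKeys._ or assemblySettings line is needed.

```scala
// build.sbt (sketch). Assumes project/plugins.sbt contains, for example:
//   addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")
name := "Simple Project"

version := "1.1"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  // Already on the classpath when run through spark-submit: keep them out of the fat jar
  "org.apache.spark" %% "spark-core"      % "1.1.1" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.1.1" % "provided",
  // Not part of the Spark distribution: must be bundled into the assembly
  "org.apache.spark" %% "spark-streaming-kafka" % "1.1.1"
)
```

Running sbt assembly then produces a single jar under target/scala-2.10/ that is passed to spark-submit instead of the sbt package output. Depending on the transitive dependencies, a merge strategy for conflicting files may also be needed.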

    【Comments】:

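    • I ran into the same problem with Maven. I then added "org.apache.maven.plugins" to pom.xml, but that didn't fix it. Are there other parameters I need to check?
    • With those changes, running sbt package gives me this error: error: not found: object AssemblyKeys import AssemblyKeys._ ^ [error] Type error in expression
    • @johnsam Removing the first import line and the "assemblySettings" line worked for me.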
    【Answer 2】:

    Try including all the dependency jars when you submit the application:

    ./spark-submit --name "SampleApp" --deploy-mode client --master spark://host:7077 \
      --class com.stackexchange.SampleApp \
      --jars $SPARK_INSTALL_DIR/spark-streaming-kafka_2.10-1.3.0.jar,$KAFKA_INSTALL_DIR/libs/kafka_2.10-0.8.2.0.jar,$KAFKA_INSTALL_DIR/libs/metrics-core-2.2.0.jar,$KAFKA_INSTALL_DIR/libs/zkclient-0.3.jar \
      spark-example-1.0-SNAPSHOT.jar
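Because --jars takes one comma-separated value, a stray space inside the list (or a missing space between options) silently breaks the command. The hypothetical helper below, which is not part of Spark, builds the spark-submit argument vector programmatically and rejects jar paths that contain commas.

```scala
// Hypothetical helper (not a Spark API): assembles a spark-submit argument vector.
object SparkSubmitArgs {
  def build(mainClass: String,
            master: String,
            extraJars: Seq[String],
            appJar: String,
            appArgs: Seq[String] = Nil): Seq[String] = {
    // --jars is a single comma-separated value, so a path must not itself contain a comma
    require(extraJars.forall(j => j.nonEmpty && j.indexOf(',') < 0),
      "jar paths must be non-empty and must not contain commas")
    val jarsOpt = if (extraJars.isEmpty) Nil else Seq("--jars", extraJars.mkString(","))
    // Options first, then the application jar, then the application's own arguments
    Seq("spark-submit", "--class", mainClass, "--master", master) ++ jarsOpt ++ (appJar +: appArgs)
  }
}
```

The resulting Seq[String] can be run with scala.sys.process (import scala.sys.process._ and call .! on it), which passes each element as exactly one argument without shell word splitting.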

    【Comments】:

      【Answer 3】:

      The following build.sbt worked for me. It also requires adding the sbt-assembly plugin in a file under the project/ directory.

      build.sbt

      name := "NetworkStreaming" // https://github.com/sbt/sbt-assembly/blob/master/Migration.md#upgrading-with-bare-buildsbt
      
      libraryDependencies ++= Seq(
        "org.apache.spark" % "spark-streaming_2.10" % "1.4.1",
        "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.4.1",         // kafka
        "org.apache.hbase" % "hbase" % "0.92.1",
        "org.apache.hadoop" % "hadoop-core" % "1.0.2",
        "org.apache.spark" % "spark-mllib_2.10" % "1.3.0"
      )
      
      assemblyMergeStrategy in assembly := {
        case m if m.toLowerCase.endsWith("manifest.mf")          => MergeStrategy.discard
        case m if m.toLowerCase.matches("meta-inf.*\\.sf$")      => MergeStrategy.discard
        case "log4j.properties"                                  => MergeStrategy.discard
        case m if m.toLowerCase.startsWith("meta-inf/services/") => MergeStrategy.filterDistinctLines
        case "reference.conf"                                    => MergeStrategy.concat
        case _                                                   => MergeStrategy.first
      }
      

      project/plugins.sbt

      addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")
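The merge-strategy setting in the build.sbt above is an ordinary function from a jar entry path to a MergeStrategy, written as a pattern match in which the first matching case wins. The standalone model below mirrors those rules with its own Strategy type (it is not the sbt-assembly API) so you can check which rule a given entry path would hit.

```scala
// Standalone model of the merge rules above; NOT the sbt-assembly API.
object MergeRules {
  sealed trait Strategy
  case object Discard extends Strategy
  case object FilterDistinctLines extends Strategy
  case object Concat extends Strategy
  case object First extends Strategy

  // Same case order as the build.sbt: the first matching case decides the strategy
  def strategyFor(path: String): Strategy = path match {
    case m if m.toLowerCase.endsWith("manifest.mf")          => Discard
    case m if m.toLowerCase.matches("meta-inf.*\\.sf$")      => Discard // signature files
    case "log4j.properties"                                  => Discard
    case m if m.toLowerCase.startsWith("meta-inf/services/") => FilterDistinctLines
    case "reference.conf"                                    => Concat
    case _                                                   => First
  }
}
```

Since the catch-all keeps the first copy of any other duplicate, class files such as KafkaUtils$.class end up under the First rule.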

      【Comments】:

        【Answer 4】:

        I ran into the same problem and solved it by building a jar with dependencies.

        Add the following to pom.xml:

        <build>
            <sourceDirectory>src/main/java</sourceDirectory>
            <testSourceDirectory>src/test/java</testSourceDirectory>
            <plugins>
              <!--
                Bind the maven-assembly-plugin to the package phase;
                this creates a jar file that bundles the project's dependencies,
                suitable for deployment to a cluster.
               -->
              <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                  <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                  </descriptorRefs>
                  <archive>
                    <manifest>
                      <mainClass></mainClass>
                    </manifest>
                  </archive>
                </configuration>
                <executions>
                  <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                      <goal>single</goal>
                    </goals>
                  </execution>
                </executions>
              </plugin>
            </plugins>
        </build>    
        

        Run mvn package, then submit "example-jar-with-dependencies.jar".
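Before submitting, it can be worth confirming that the fat jar really contains KafkaUtils. The sketch below (JarCheck is a hypothetical name) looks for the class file inside the jar using only the JDK's java.util.jar API; jar tf on the command line gives the same information.

```scala
import java.io.File
import java.util.jar.JarFile

// Hypothetical helper: checks whether a built jar bundles a given class.
object JarCheck {
  /** Converts a binary class name like "a.b.C$" into its jar entry path "a/b/C$.class". */
  def entryName(className: String): String = className.replace('.', '/') + ".class"

  def containsClass(jar: File, className: String): Boolean = {
    val jf = new JarFile(jar)
    try jf.getEntry(entryName(className)) != null
    finally jf.close()
  }

  def main(args: Array[String]): Unit = {
    // args(0): path to the jar, e.g. the *-jar-with-dependencies.jar produced by mvn package
    val ok = containsClass(new File(args(0)), "org.apache.spark.streaming.kafka.KafkaUtils$")
    println(if (ok) "KafkaUtils is bundled" else "KafkaUtils is missing from the jar")
  }
}
```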

        【Comments】:

          【Answer 5】:

          I added the dependencies externally: Project --> Properties --> Java Build Path --> Libraries --> Add External JARs, and added the required jars.

          This solved my problem.

          【Comments】:

            【Answer 6】:

            Using Spark 1.6 does the job for me, without the hassle of dealing with so many external jars, which can get quite complicated to manage.

            【Comments】:

              【Answer 7】:

              Instead of struggling to get build.sbt to work, you can also download the jar file and put it in Spark's library folder, since it is not installed with Spark:

              http://central.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-10_2.10/2.1.1/spark-streaming-kafka-0-10_2.10-2.1.1.jar

              Copy it to:

              /usr/local/spark/spark-2.1.0-bin-hadoop2.6/jars/
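The download URL above follows the standard Maven repository layout. The hypothetical helper below rebuilds it from the artifact coordinates, with the repository base URL as a parameter because hosts change (https://repo1.maven.org/maven2 is the usual Maven Central address today). Whichever jar you pick has to match both your Spark version and its Scala binary version; note also that the 0-10 connector lives in the org.apache.spark.streaming.kafka010 package and does not offer the receiver-based KafkaUtils.createStream used in the question.

```scala
// Hypothetical helper: standard Maven repository path of an artifact's jar, i.e.
// <base>/<groupId with '.' replaced by '/'>/<artifactId>/<version>/<artifactId>-<version>.jar
object MavenJarUrl {
  def jarUrl(base: String, groupId: String, artifactId: String, version: String): String = {
    val groupPath = groupId.replace('.', '/')
    s"${base.stripSuffix("/")}/$groupPath/$artifactId/$version/$artifactId-$version.jar"
  }
}
```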

              【Comments】:

                【Answer 8】:

                Use the --packages argument of spark-submit; it takes Maven packages in the format group:artifact:version,...
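A sketch of how the --packages value is put together: each entry is groupId:artifactId:version, and multiple entries are joined with commas. PackagesArg is a hypothetical helper, not part of Spark. For the question's setup the value would be org.apache.spark:spark-streaming-kafka_2.10:1.1.1; as far as I know, --packages only exists in Spark 1.3 and later, so the question's Spark version would have to be upgraded to use it.

```scala
// Hypothetical helper (not a Spark API): formats Maven coordinates for --packages.
object PackagesArg {
  final case class Coordinate(groupId: String, artifactId: String, version: String) {
    // ':' separates the parts and ',' separates entries, so neither may appear inside a part
    require(Seq(groupId, artifactId, version).forall(p => p.nonEmpty && !p.exists(c => c == ':' || c == ',')),
      s"invalid coordinate part in $groupId:$artifactId:$version")
    override def toString: String = s"$groupId:$artifactId:$version"
  }

  /** The single value passed after --packages. */
  def packagesValue(coords: Seq[Coordinate]): String = coords.mkString(",")
}
```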

                【Comments】:

                  【Answer 9】:
                  import org.apache.spark.streaming.kafka.KafkaUtils
                  

                  Use the following in build.sbt:


                  name := "kafka"
                  
                  version := "0.1"
                  
                  scalaVersion := "2.11.12"
                  
                  retrieveManaged := true
                  
                  fork := true
                  
                  //libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.2.0"
                  //libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-8_2.11" % "2.1.0"
                  
                  libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.0"
                  
                  //libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.0"
                  
                  libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.2.0"
                  
                  // https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8
                  libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.2.0" % "provided"
                  
                  // https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8-assembly
                  libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-8-assembly" % "2.2.0"
                  

                  This should solve the problem.
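The %% operator used in this build.sbt appends the Scala binary version to the artifact name, so with scalaVersion 2.11.12 these lines resolve to ..._2.11 artifacts, while a plain % (as in Answer 3's spark-streaming_2.10) uses the name verbatim. Mixing artifacts built for different Scala binary versions is a common source of class-loading errors. The model below is not sbt's actual implementation and covers Scala 2.x versions only; it just shows the naming rule.

```scala
// Standalone model of the artifact naming that `%%` produces; not sbt's implementation.
object CrossName {
  /** "2.11.12" -> "2.11", "2.10.4" -> "2.10" (Scala 2.x versions only). */
  def binaryVersion(scalaVersion: String): String =
    scalaVersion.split('.').take(2).mkString(".")

  /** Artifact name as `%%` would request it, e.g. "spark-streaming-kafka-0-8_2.11". */
  def crossName(artifact: String, scalaVersion: String): String =
    s"${artifact}_${binaryVersion(scalaVersion)}"
}
```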

                  【Comments】:
