【Question title】: Spark-Kafka invalid dependency detected
【Posted】: 2019-02-20 15:08:51
【Question】:

I have a basic Spark-Kafka program, and I am trying to run the following code:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel

import java.util.regex.Pattern
import java.util.regex.Matcher
import org.apache.spark.streaming.kafka._
import kafka.serializer.StringDecoder
import Utilities._
object WordCount {
  def main(args: Array[String]): Unit = {

    val ssc = new StreamingContext("local[*]", "KafkaExample", Seconds(1))

    setupLogging()

    // Construct a regular expression (regex) to extract fields from raw Apache log lines
    val pattern = apacheLogPattern()

    // hostname:port for Kafka brokers, not Zookeeper
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
    // List of topics you want to listen for from Kafka
    val topics = List("testLogs").toSet
    // Create our Kafka stream, which will contain (topic,message) pairs. We tack a
    // map(_._2) at the end in order to only get the messages, which contain individual
    // lines of data.
    val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics).map(_._2)

    // Extract the request field from each log line
    val requests = lines.map(x => {val matcher:Matcher = pattern.matcher(x); if (matcher.matches()) matcher.group(5)})

    // Extract the URL from the request
    val urls = requests.map(x => {val arr = x.toString().split(" "); if (arr.size == 3) arr(1) else "[error]"})

    // Reduce by URL over a 5-minute window sliding every second
    val urlCounts = urls.map(x => (x, 1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(300), Seconds(1))

    // Sort and print the results
    val sortedResults = urlCounts.transform(rdd => rdd.sortBy(x => x._2, false))
    sortedResults.print()

    // Kick it off
    ssc.checkpoint("/home/")
    ssc.start()
    ssc.awaitTermination()

  }


}

I am using the IntelliJ IDE and created the Scala project with sbt. The build.sbt file looks like this:

name := "Sample"

version := "1.0"

organization := "com.sundogsoftware"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.2.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.4.1",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.4.1",
  "org.apache.hadoop" % "hadoop-hdfs" % "2.6.0"

)

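However, when I try to build the code, it produces the following errors:

Error:scalac: missing or invalid dependency detected while loading class file 'StreamingContext.class'. Could not access type Logging in package org.apache.spark, because it (or its dependencies) are missing. Check your build definition for missing or conflicting dependencies. (Re-run with `-Ylog-classpath` to see the problematic classpath.) A full rebuild may help if 'StreamingContext.class' was compiled against an incompatible version of org.apache.spark.

Error:scalac: missing or invalid dependency detected while loading class file 'DStream.class'. Could not access type Logging in package org.apache.spark, because it (or its dependencies) are missing. Check your build definition for missing or conflicting dependencies. (Re-run with `-Ylog-classpath` to see the problematic classpath.) A full rebuild may help if 'DStream.class' was compiled against an incompatible version of org.apache.spark.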

【Discussion】:

    Tags: scala apache-spark intellij-idea apache-kafka sbt


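    【Solution 1】:

    When you use several Spark libraries together, their versions must all match. Your build mixes spark-core 2.2.0 with spark-streaming and spark-streaming-kafka 1.4.1; the 1.4.1 classes were compiled against org.apache.spark.Logging, which is no longer accessible in Spark 2.x, hence the error.

    The Kafka version you target also matters, because it determines which integration artifact you need, for example spark-streaming-kafka-0-10_2.11: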

    ...
    scalaVersion := "2.11.8"
    val sparkVersion = "2.2.0"

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
      "org.apache.spark" %% "spark-streaming" % sparkVersion,
      // %% already appends the Scala suffix (_2.11), so it must not be repeated in the artifact name
      "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion,
      "org.apache.hadoop" % "hadoop-hdfs" % "2.6.0"
    )
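
Note that the 0-10 integration lives in the package org.apache.spark.streaming.kafka010 and exposes a different API from the `org.apache.spark.streaming.kafka._` / `StringDecoder` calls in the question, so the stream setup would need to change as well. A minimal sketch of what that might look like, assuming the broker address and topic from the question; the object name and the `group.id` value are made up for illustration:

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

// Hypothetical object name; only the stream setup differs from the question's code
object KafkaDirectStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("KafkaExample")
    val ssc = new StreamingContext(conf, Seconds(1))

    // The 0-10 integration uses the new Kafka consumer configuration keys
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "wordcount-example" // assumed consumer group name
    )
    val topics = Set("testLogs")

    // Each element is a ConsumerRecord[String, String] rather than a (key, message) tuple
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

    // Equivalent of the question's .map(_._2): keep only the message payload
    val lines = stream.map(record => record.value)
    lines.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

The rest of the question's pipeline (regex extraction, windowed counts, sorting) operates on `lines` and should not need to change.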

    If you need to check the exact dependency coordinates to use, this site is helpful: https://search.maven.org/
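
If you would rather keep the question's code exactly as written, note that its imports (`org.apache.spark.streaming.kafka._` and `kafka.serializer.StringDecoder`) belong to the older Kafka 0.8 integration. A sketch of a build for that case, assuming Spark 2.2.0 on Scala 2.11 and the spark-streaming-kafka-0-8 artifact (verify the coordinates on Maven Central before relying on them):

```scala
// build.sbt sketch: all Spark modules share one version
scalaVersion := "2.11.8"
val sparkVersion = "2.2.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  // Kafka 0.8 integration; provides the org.apache.spark.streaming.kafka package
  "org.apache.spark" %% "spark-streaming-kafka-0-8" % sparkVersion,
  "org.apache.hadoop" % "hadoop-hdfs" % "2.6.0"
)
```

With this variant the `KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]` call and the `metadata.broker.list` parameter from the question can stay as they are.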
