【Posted】: 2019-02-20 15:08:51
【Question】:
I have a basic Spark and Kafka program, and I am trying to run the following code:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import java.util.regex.Pattern
import java.util.regex.Matcher
import org.apache.spark.streaming.kafka._
import kafka.serializer.StringDecoder
import Utilities._
object WordCount {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext("local[*]", "KafkaExample", Seconds(1))
    setupLogging()
    // Construct a regular expression (regex) to extract fields from raw Apache log lines
    val pattern = apacheLogPattern()
    // hostname:port for Kafka brokers, not Zookeeper
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
    // List of topics you want to listen for from Kafka
    val topics = List("testLogs").toSet
    // Create our Kafka stream, which will contain (topic,message) pairs. We tack a
    // map(_._2) at the end in order to only get the messages, which contain individual
    // lines of data.
    val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics).map(_._2)
    // Extract the request field from each log line
    val requests = lines.map(x => {val matcher:Matcher = pattern.matcher(x); if (matcher.matches()) matcher.group(5)})
    // Extract the URL from the request
    val urls = requests.map(x => {val arr = x.toString().split(" "); if (arr.size == 3) arr(1) else "[error]"})
    // Reduce by URL over a 5-minute window sliding every second
    val urlCounts = urls.map(x => (x, 1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(300), Seconds(1))
    // Sort and print the results
    val sortedResults = urlCounts.transform(rdd => rdd.sortBy(x => x._2, false))
    sortedResults.print()
    // Kick it off
    ssc.checkpoint("/home/")
    ssc.start()
    ssc.awaitTermination()
  }
}
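As a side note on the listing above: the `requests` step uses an `if` without an `else`, so its element type is `Any` and non-matching lines only reach the "[error]" branch through `toString()`. A minimal sketch of the URL-extraction step written with `Option` instead, runnable without Spark. The names `RequestUrl` and `extractUrl` are hypothetical and not part of the original code:

```scala
object RequestUrl {
  // Returns the URL part of a request field such as "GET /index.html HTTP/1.1",
  // or None when the field does not have exactly three space-separated parts.
  def extractUrl(request: String): Option[String] =
    request.split(" ") match {
      case Array(_, url, _) => Some(url)
      case _                => None
    }

  def main(args: Array[String]): Unit = {
    println(extractUrl("GET /index.html HTTP/1.1")) // prints Some(/index.html)
    println(extractUrl("garbage"))                  // prints None
  }
}
```

In a DStream pipeline, a function of this shape could be applied with `flatMap` so that malformed requests are dropped rather than counted under a placeholder key. Whether dropping them is desirable depends on the use case.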
I am using the IntelliJ IDE and created the Scala project with sbt. The build.sbt file is as follows:
name := "Sample"
version := "1.0"
organization := "com.sundogsoftware"
scalaVersion := "2.11.8"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.2.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.4.1",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.4.1",
  "org.apache.hadoop" % "hadoop-hdfs" % "2.6.0"
)
However, when I try to build the code, I get the following errors:
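Error:scalac: missing or invalid dependency detected while loading class file 'StreamingContext.class'.
Could not access type Logging in package org.apache.spark,
because it (or its dependencies) are missing. Check your build definition for
missing or conflicting dependencies. (Re-run with `-Ylog-classpath` to see the problematic classpath.)
A full rebuild may help if 'StreamingContext.class' was compiled against an incompatible version of org.apache.spark.
Error:scalac: missing or invalid dependency detected while loading class file 'DStream.class'.
Could not access type Logging in package org.apache.spark,
because it (or its dependencies) are missing. Check your build definition for
missing or conflicting dependencies. (Re-run with `-Ylog-classpath` to see the problematic classpath.)
A full rebuild may help if 'DStream.class' was compiled against an incompatible version of org.apache.spark.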
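The `Logging` type named in the error was public as `org.apache.spark.Logging` in Spark 1.x and is no longer public in Spark 2.x, so one plausible reading is that the 1.4.1 streaming artifacts are being compiled against spark-core 2.2.0. A minimal sketch of a build.sbt that keeps every Spark module on one version, assuming Spark 2.2.0 is the intended target and that the Kafka 0.8 direct-stream API (the `org.apache.spark.streaming.kafka` package imported above) is the one wanted. The `sparkVersion` value is introduced here for illustration only:

```scala
name := "Sample"
version := "1.0"
organization := "com.sundogsoftware"
scalaVersion := "2.11.8"

// Assumption: all Spark modules should share this single version.
val sparkVersion = "2.2.0"

libraryDependencies ++= Seq(
  // Assumption: "provided" is dropped so the classes are on the classpath
  // when running from the IDE; restore it when submitting to a cluster.
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  // In Spark 2.x the old-API Kafka integration is published as spark-streaming-kafka-0-8.
  "org.apache.spark" %% "spark-streaming-kafka-0-8" % sparkVersion,
  "org.apache.hadoop" % "hadoop-hdfs" % "2.6.0"
)
```

After changing versions, a full sbt refresh and rebuild in IntelliJ is probably needed so that stale 1.4.1 jars are no longer on the compile classpath.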
【Comments】:
Tags: scala apache-spark intellij-idea apache-kafka sbt