Posted: 2018-09-13 05:13:12
Question:
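First, I should mention that I have already tried every suggestion in similar threads and none of them worked for me, so please don't close this as a duplicate.
The problem I'm running into is the following:
I'm trying to run a sample Java application on Spark using Spark Streaming and Kafka. I have added all the required dependencies: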
<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.11.12</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.3.0</version>
    </dependency>
</dependencies>
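After deploying the jar to the server where I want to run the application (I have already set up the environment with Spark and Kafka and created the relevant topic), I run it with spark-submit and get the following error: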
Exception in thread "main" java.lang.NoClassDefFoundError: org.apache.spark.streaming.kafka010.LocationStrategies
at JavaWordCount.main(JavaWordCount.java:47)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:90)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
at java.lang.reflect.Method.invoke(Method.java:508)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:879)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka010.LocationStrategies
at java.net.URLClassLoader.findClass(URLClassLoader.java:609)
at java.lang.ClassLoader.loadClassHelper(ClassLoader.java:924)
at java.lang.ClassLoader.loadClass(ClassLoader.java:869)
at java.lang.ClassLoader.loadClass(ClassLoader.java:852)
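A `NoClassDefFoundError` whose cause is a `ClassNotFoundException` generally means the class was available when the code was compiled but the class loader could not find it at run time. One way to confirm this under the same launch conditions is a tiny probe class packaged into the same jar and started with the same spark-submit command (for example with `--class ClassProbe`). This is a minimal sketch; the class name `ClassProbe` and its output format are hypothetical and not part of the original application:

```java
import java.security.CodeSource;

// Hypothetical diagnostic class, not part of the original application.
public class ClassProbe {

    // Returns where the class was loaded from, or null if the class loader
    // that loaded ClassProbe cannot find it.
    static String probe(String className) {
        try {
            // initialize = false: load the class without running its static initializers
            Class<?> cls = Class.forName(className, false, ClassProbe.class.getClassLoader());
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            // JDK classes loaded by the bootstrap loader have no CodeSource
            return src == null ? "bootstrap/JDK" : String.valueOf(src.getLocation());
        } catch (ClassNotFoundException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        // Defaults to the class from the stack trace; another name can be passed as args[0]
        String name = args.length > 0 ? args[0]
                : "org.apache.spark.streaming.kafka010.LocationStrategies";
        String where = probe(name);
        System.out.println(where == null
                ? name + ": NOT found at run time"
                : name + ": loaded from " + where);
    }
}
```

If the probe reports the class as not found, the Kafka integration classes are simply absent from the runtime classpath; if it prints a location, that location shows which jar actually supplied the class.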
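It seems the dependencies are not being picked up as part of the runtime environment. After some research online, I found that many people suggest building an assembly (uber) JAR with the maven-shade-plugin. So I also tried packaging the jar that way with maven package, but still had no luck.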
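Whether the maven-shade-plugin build actually bundled the Kafka integration classes can be checked by looking inside the produced jar (the JDK's `jar tf` command lists the same entries). A minimal sketch, assuming the path of the built jar is passed as the first argument; `JarEntryCheck` is a hypothetical helper, not part of the original project:

```java
import java.io.IOException;
import java.util.jar.JarFile;

// Hypothetical helper, not from the original post: checks whether a built
// jar actually contains a given class file.
public class JarEntryCheck {

    // Converts "a.b.C" into the entry name "a/b/C.class" used inside jars.
    static String entryName(String className) {
        return className.replace('.', '/') + ".class";
    }

    // True if the jar at jarPath has an entry for the given class.
    static boolean containsClass(String jarPath, String className) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getJarEntry(entryName(className)) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 1) {
            System.err.println("usage: JarEntryCheck <path-to-jar> [class-name]");
            return;
        }
        // Defaults to the class that spark-submit could not find
        String className = args.length > 1 ? args[1]
                : "org.apache.spark.streaming.kafka010.LocationStrategies";
        boolean found = containsClass(args[0], className);
        System.out.println(className + (found ? " is" : " is NOT") + " inside " + args[0]);
    }
}
```

For example, `java JarEntryCheck path/to/app.jar`, where `path/to/app.jar` is a placeholder for whichever jar is handed to spark-submit.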
For reference, here is where the application fails:
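// Imports used by this snippet
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

// Inside main(String[] args): configure Spark to connect to Kafka running on the local machine
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// Configure Spark to listen for messages on the topic "wordCount"
Collection<String> topics = Arrays.asList("wordCount");
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparkKafkaWordCount");
// Read messages in batches of 30 seconds
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(30));
// Start reading messages from Kafka and get a DStream
final JavaInputDStream<ConsumerRecord<String, String>> stream =
        KafkaUtils.createDirectStream(jssc, LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));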
On the last line above, the LocationStrategies class cannot be found, even though I have added the correct dependency to pom.xml.
Is there any way to fix this?
Comments:
Tags: java maven apache-spark apache-kafka