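【Posted】:2021-08-18 23:17:43
【Question】:
I get this error when trying to run Spark Streaming to read data from Kafka. I searched on Google, but the answers I found did not fix it.
I fixed an earlier error, Exception in thread "main" java.lang.NoClassDefFoundError: scala/Product$class (Java), using the answer from https://stackoverflow.com/users/9023547/chandan, but then I got this one.
This is the terminal output when I run the project: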
Exception in thread "JobGenerator" java.lang.NoSuchMethodError: 'scala.collection.mutable.ArrayOps scala.Predef$.refArrayOps(java.lang.Object[])'
at org.apache.spark.streaming.kafka010.KafkaRDD.count(KafkaRDD.scala:89)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:216)
at org.apache.spark.streaming.dstream.DStream.$anonfun$getOrCompute$3(DStream.scala:343)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at org.apache.spark.streaming.dstream.DStream.$anonfun$getOrCompute$2(DStream.scala:343)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:417)
at org.apache.spark.streaming.dstream.DStream.$anonfun$getOrCompute$1(DStream.scala:342)
at scala.Option.orElse(Option.scala:447)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:335)
at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:36)
at org.apache.spark.streaming.dstream.DStream.$anonfun$getOrCompute$3(DStream.scala:343)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at org.apache.spark.streaming.dstream.DStream.$anonfun$getOrCompute$2(DStream.scala:343)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:417)
at org.apache.spark.streaming.dstream.DStream.$anonfun$getOrCompute$1(DStream.scala:342)
at scala.Option.orElse(Option.scala:447)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:335)
at org.apache.spark.streaming.dstream.FlatMappedDStream.compute(FlatMappedDStream.scala:36)
at org.apache.spark.streaming.dstream.DStream.$anonfun$getOrCompute$3(DStream.scala:343)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at org.apache.spark.streaming.dstream.DStream.$anonfun$getOrCompute$2(DStream.scala:343)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:417)
at org.apache.spark.streaming.dstream.DStream.$anonfun$getOrCompute$1(DStream.scala:342)
at scala.Option.orElse(Option.scala:447)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:335)
at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:36)
at org.apache.spark.streaming.dstream.DStream.$anonfun$getOrCompute$3(DStream.scala:343)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at org.apache.spark.streaming.dstream.DStream.$anonfun$getOrCompute$2(DStream.scala:343)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:417)
at org.apache.spark.streaming.dstream.DStream.$anonfun$getOrCompute$1(DStream.scala:342)
at scala.Option.orElse(Option.scala:447)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:335)
at org.apache.spark.streaming.dstream.ShuffledDStream.compute(ShuffledDStream.scala:41)
at org.apache.spark.streaming.dstream.DStream.$anonfun$getOrCompute$3(DStream.scala:343)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at org.apache.spark.streaming.dstream.DStream.$anonfun$getOrCompute$2(DStream.scala:343)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:417)
at org.apache.spark.streaming.dstream.DStream.$anonfun$getOrCompute$1(DStream.scala:342)
at scala.Option.orElse(Option.scala:447)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:335)
at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
at org.apache.spark.streaming.DStreamGraph.$anonfun$generateJobs$2(DStreamGraph.scala:123)
at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245)
at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:75)
at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:108)
at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:122)
at org.apache.spark.streaming.scheduler.JobGenerator.$anonfun$generateJobs$1(JobGenerator.scala:252)
at scala.util.Try$.apply(Try.scala:213)
at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:250)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:186)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:91)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:90)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
21/05/31 22:00:40 ERROR JobScheduler: Error in job generator
java.lang.IllegalStateException: JobGenerator has already been stopped accidentally.
at org.apache.spark.util.EventLoop.post(EventLoop.scala:107)
at org.apache.spark.streaming.scheduler.JobGenerator.$anonfun$timer$1(JobGenerator.scala:63)
at org.apache.spark.streaming.util.RecurringTimer.triggerActionForNextInterval(RecurringTimer.scala:94)
at org.apache.spark.streaming.util.RecurringTimer.org$apache$spark$streaming$util$RecurringTimer$$loop(RecurringTimer.scala:106)
at org.apache.spark.streaming.util.RecurringTimer$$anon$1.run(RecurringTimer.scala:29)
Exception in thread "main" java.lang.IllegalStateException: JobGenerator has already been stopped accidentally.
at org.apache.spark.util.EventLoop.post(EventLoop.scala:107)
at org.apache.spark.streaming.scheduler.JobGenerator.$anonfun$timer$1(JobGenerator.scala:63)
at org.apache.spark.streaming.util.RecurringTimer.triggerActionForNextInterval(RecurringTimer.scala:94)
at org.apache.spark.streaming.util.RecurringTimer.org$apache$spark$streaming$util$RecurringTimer$$loop(RecurringTimer.scala:106)
at org.apache.spark.streaming.util.RecurringTimer$$anon$1.run(RecurringTimer.scala:29)
This is the project's pom.xml file:
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>TikiData</groupId>
<artifactId>TikiData</artifactId>
<version>V1</version>
<dependencies>
<!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.6</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.3.3</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.13</artifactId>
<version>2.8.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.scalatest/scalatest -->
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_2.11</artifactId>
<version>2.2.6</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.scalatest/scalatest -->
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_2.11</artifactId>
<version>2.2.6</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<sourceDirectory>src</sourceDirectory>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<release>11</release>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<archive>
<manifest>
<mainClass>
demo.KafkaDemo
</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
This is the project's main file:
package demo;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import scala.Tuple2;

public class KafkaDemo {
    public static void main(String[] args) throws InterruptedException {
        // Create a local StreamingContext with a batch interval of 10 seconds
        SparkConf conf = new SparkConf().setMaster("local").setAppName("Kafka Spark Integration");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

        // Define the Kafka parameters
        Map<String, Object> kafkaParams = new HashMap<String, Object>();
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "0");
        // // Automatically reset the offset to the earliest offset
        // kafkaParams.put("auto.offset.reset", "earliest");
        // kafkaParams.put("enable.auto.commit", false);

        // Define the list of Kafka topics to subscribe to
        Collection<String> topics = Arrays.asList("hello-kafka");

        // Create an input DStream that consumes messages from the Kafka topics
        JavaInputDStream<ConsumerRecord<String, String>> stream;
        stream = KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topics, kafkaParams));

        // Read the value of each message from Kafka
        JavaDStream<String> lines = stream.map((Function<ConsumerRecord<String, String>, String>) kafkaRecord -> kafkaRecord.value());

        // Split each message into words
        JavaDStream<String> words = lines.flatMap((FlatMapFunction<String, String>) line -> Arrays.asList(line.split(" ")).iterator());

        // Map every word to a tuple (word, 1)
        JavaPairDStream<String, Integer> wordMap = words.mapToPair((PairFunction<String, String, Integer>) word -> new Tuple2<>(word, 1));

        // Count the occurrences of each word
        JavaPairDStream<String, Integer> wordCount = wordMap.reduceByKey((Function2<Integer, Integer, Integer>) (first, second) -> first + second);

        // Print the word count
        wordCount.print();

        // Start the computation
        jssc.start();
        jssc.awaitTermination();
    }
}
【Comments】:
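- Make sure all dependencies target the same Scala version. You depend on kafka_2.13, which may cause trouble. Try kafka_2.11, or remove it altogether (I don't believe you need it).
- Change the version of spark-streaming-kafka-0-10_2.11 to 2.4.2.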
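The check described in the first comment can be sketched in plain Java. A NoSuchMethodError on scala.Predef$.refArrayOps usually points to classes compiled for one Scala binary version running against the scala-library of another, so one quick sanity check is to compare the _2.xx suffixes of the artifact IDs. The class name ScalaSuffixCheck and the method scalaVersions are hypothetical helpers made up for illustration, not part of Spark, Kafka, or Maven. The artifact IDs are copied from the pom.xml in the question.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: collects the Scala binary-version suffixes of artifact IDs.
class ScalaSuffixCheck {
    // Matches a trailing Scala binary-version suffix such as "_2.11" or "_2.13".
    private static final Pattern SCALA_SUFFIX = Pattern.compile("_(\\d+\\.\\d+)$");

    /** Returns the distinct Scala binary versions found in the given artifact IDs. */
    static Set<String> scalaVersions(List<String> artifactIds) {
        Set<String> versions = new TreeSet<>();
        for (String id : artifactIds) {
            Matcher m = SCALA_SUFFIX.matcher(id);
            if (m.find()) {
                versions.add(m.group(1));
            }
        }
        return versions;
    }

    public static void main(String[] args) {
        // Artifact IDs as they appear in the question's pom.xml.
        List<String> artifactIds = List.of(
                "gson", "hadoop-client",
                "spark-core_2.11", "spark-sql_2.11",
                "spark-streaming-kafka-0-10_2.11", "spark-streaming_2.11",
                "scala-library", "kafka-clients",
                "kafka_2.13", "scalatest_2.11");

        Set<String> versions = scalaVersions(artifactIds);
        System.out.println("Scala binary versions: " + versions);
        if (versions.size() > 1) {
            System.out.println("Mismatch: dependencies target more than one Scala version");
        }
    }
}
```

For this pom.xml the sketch reports both 2.11 and 2.13, with kafka_2.13 being the only artifact on 2.13. It only inspects the declared artifact IDs. It does not see transitive dependencies or the Spark distribution the job is actually launched with, so running mvn dependency:tree is still worthwhile. The Spark artifacts in the question also declare three different versions (2.4.2, 2.0.0 and 2.3.3), which the second comment addresses.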
Tags: java apache-spark apache-kafka