【Question Title】: Exception in thread "JobGenerator" java.lang.NoSuchMethodError: 'scala.collection.mutable.ArrayOps scala.Predef$.refArrayOps(java.lang.Object[])'
【Posted】: 2021-08-18 23:17:43
【Question】:

I get this error when I try to run Spark Streaming to read data from Kafka. I searched on Google, but none of the answers I found fixed it.

I fixed an earlier error, Exception in thread "main" java.lang.NoClassDefFoundError: scala/Product$class (Java), using the answer by https://stackoverflow.com/users/9023547/chandan, but then I got this error instead.

Here is the terminal output when I run the project:

Exception in thread "JobGenerator" java.lang.NoSuchMethodError: 'scala.collection.mutable.ArrayOps scala.Predef$.refArrayOps(java.lang.Object[])'
        at org.apache.spark.streaming.kafka010.KafkaRDD.count(KafkaRDD.scala:89)
        at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:216)
        at org.apache.spark.streaming.dstream.DStream.$anonfun$getOrCompute$3(DStream.scala:343)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
        at org.apache.spark.streaming.dstream.DStream.$anonfun$getOrCompute$2(DStream.scala:343)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:417)
        at org.apache.spark.streaming.dstream.DStream.$anonfun$getOrCompute$1(DStream.scala:342)
        at scala.Option.orElse(Option.scala:447)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:335)
        at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:36)
        at org.apache.spark.streaming.dstream.DStream.$anonfun$getOrCompute$3(DStream.scala:343)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
        at org.apache.spark.streaming.dstream.DStream.$anonfun$getOrCompute$2(DStream.scala:343)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:417)
        at org.apache.spark.streaming.dstream.DStream.$anonfun$getOrCompute$1(DStream.scala:342)
        at scala.Option.orElse(Option.scala:447)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:335)
        at org.apache.spark.streaming.dstream.FlatMappedDStream.compute(FlatMappedDStream.scala:36)
        at org.apache.spark.streaming.dstream.DStream.$anonfun$getOrCompute$3(DStream.scala:343)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
        at org.apache.spark.streaming.dstream.DStream.$anonfun$getOrCompute$2(DStream.scala:343)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:417)
        at org.apache.spark.streaming.dstream.DStream.$anonfun$getOrCompute$1(DStream.scala:342)
        at scala.Option.orElse(Option.scala:447)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:335)
        at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:36)
        at org.apache.spark.streaming.dstream.DStream.$anonfun$getOrCompute$3(DStream.scala:343)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
        at org.apache.spark.streaming.dstream.DStream.$anonfun$getOrCompute$2(DStream.scala:343)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:417)
        at org.apache.spark.streaming.dstream.DStream.$anonfun$getOrCompute$1(DStream.scala:342)
        at scala.Option.orElse(Option.scala:447)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:335)
        at org.apache.spark.streaming.dstream.ShuffledDStream.compute(ShuffledDStream.scala:41)
        at org.apache.spark.streaming.dstream.DStream.$anonfun$getOrCompute$3(DStream.scala:343)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
        at org.apache.spark.streaming.dstream.DStream.$anonfun$getOrCompute$2(DStream.scala:343)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:417)
        at org.apache.spark.streaming.dstream.DStream.$anonfun$getOrCompute$1(DStream.scala:342)
        at scala.Option.orElse(Option.scala:447)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:335)
        at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
        at org.apache.spark.streaming.DStreamGraph.$anonfun$generateJobs$2(DStreamGraph.scala:123)
        at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245)
        at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:75)
        at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
        at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:108)
        at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:122)
        at org.apache.spark.streaming.scheduler.JobGenerator.$anonfun$generateJobs$1(JobGenerator.scala:252)
        at scala.util.Try$.apply(Try.scala:213)
        at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:250)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:186)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:91)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:90)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
21/05/31 22:00:40 ERROR JobScheduler: Error in job generator
java.lang.IllegalStateException: JobGenerator has already been stopped accidentally.
        at org.apache.spark.util.EventLoop.post(EventLoop.scala:107)
        at org.apache.spark.streaming.scheduler.JobGenerator.$anonfun$timer$1(JobGenerator.scala:63)
        at org.apache.spark.streaming.util.RecurringTimer.triggerActionForNextInterval(RecurringTimer.scala:94)
        at org.apache.spark.streaming.util.RecurringTimer.org$apache$spark$streaming$util$RecurringTimer$$loop(RecurringTimer.scala:106)
        at org.apache.spark.streaming.util.RecurringTimer$$anon$1.run(RecurringTimer.scala:29)
Exception in thread "main" java.lang.IllegalStateException: JobGenerator has already been stopped accidentally.
        at org.apache.spark.util.EventLoop.post(EventLoop.scala:107)
        at org.apache.spark.streaming.scheduler.JobGenerator.$anonfun$timer$1(JobGenerator.scala:63)
        at org.apache.spark.streaming.util.RecurringTimer.triggerActionForNextInterval(RecurringTimer.scala:94)
        at org.apache.spark.streaming.util.RecurringTimer.org$apache$spark$streaming$util$RecurringTimer$$loop(RecurringTimer.scala:106)
        at org.apache.spark.streaming.util.RecurringTimer$$anon$1.run(RecurringTimer.scala:29)

Here is the project's pom.xml file:

<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>TikiData</groupId>
    <artifactId>TikiData</artifactId>
    <version>V1</version>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.6</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.0.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.3.3</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.8.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.13</artifactId>
            <version>2.8.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.scalatest/scalatest -->
        <dependency>
            <groupId>org.scalatest</groupId>
            <artifactId>scalatest_2.11</artifactId>
            <version>2.2.6</version>
            <scope>test</scope>
        </dependency>


    </dependencies>
    <build>
        <sourceDirectory>src</sourceDirectory>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <release>11</release>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                        <configuration>
                            <archive>
                                <manifest>
                                    <mainClass>
                                        demo.KafkaDemo
                                    </mainClass>
                                </manifest>
                            </archive>
                            <descriptorRefs>
                                <descriptorRef>jar-with-dependencies</descriptorRef>
                            </descriptorRefs>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Here is the project's main file:

package demo;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import scala.Tuple2;

public class KafkaDemo {
    public static void main(String[] args) throws InterruptedException {
        // Create a local StreamingContext with a batch interval of 10 seconds
        SparkConf conf = new SparkConf().setMaster("local").setAppName("Kafka Spark Integration");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

        // Define the Kafka parameters
        Map<String, Object> kafkaParams = new HashMap<String, Object>();
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "0");
//        // Automatically reset the offset to the earliest offset
//        kafkaParams.put("auto.offset.reset", "earliest");
//        kafkaParams.put("enable.auto.commit", false);

        // Define the list of Kafka topics to subscribe to
        Collection<String> topics = Arrays.asList("hello-kafka");

        // Create an input DStream that consumes messages from the Kafka topics
        JavaInputDStream<ConsumerRecord<String, String>> stream;
        stream = KafkaUtils.createDirectStream(jssc,LocationStrategies.PreferConsistent(),ConsumerStrategies.Subscribe(topics, kafkaParams));


        // Read value of each message from Kafka
        JavaDStream<String> lines = stream.map((Function<ConsumerRecord<String, String>, String>) kafkaRecord -> kafkaRecord.value());

        // Split message into words
        JavaDStream<String> words = lines.flatMap((FlatMapFunction<String, String>) line -> Arrays.asList(line.split(" ")).iterator());

        // Take every word and return Tuple with (word,1)
        JavaPairDStream<String,Integer> wordMap = words.mapToPair((PairFunction<String, String, Integer>) word -> new Tuple2<>(word,1));

        // Count the occurrences of each word
        JavaPairDStream<String,Integer> wordCount = wordMap.reduceByKey((Function2<Integer, Integer, Integer>) (first, second) -> first+second);

        //Print the word count
        wordCount.print();

        // Start the computation
        jssc.start();
        jssc.awaitTermination();
    }
}
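For reference, each 10-second batch of the pipeline above (value, split on spaces, pair each word with 1, sum per key) computes the same result as the plain-Java sketch below applied to that batch's message values. It does not use Spark; the class and method names (BatchWordCount, countWords) are hypothetical, and it assumes the batch's values have already been collected into a List.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class BatchWordCount {
    // Mirrors one batch of the DStream chain:
    // value() -> flatMap(split(" ")) -> mapToPair((word, 1)) -> reduceByKey(first + second)
    public static Map<String, Integer> countWords(List<String> messageValues) {
        Map<String, Integer> counts = new TreeMap<>(); // TreeMap only for stable, sorted printing
        for (String line : messageValues) {
            for (String word : line.split(" ")) {
                // merge plays the role of reduceByKey's (first, second) -> first + second
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("hello kafka", "hello spark");
        System.out.println(countWords(batch)); // {hello=2, kafka=1, spark=1}
    }
}
```

Like the original lambda, split(" ") produces an empty token when a message contains two consecutive spaces; the sketch keeps that behavior rather than changing it.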

【Question Comments】:

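  • Make sure all dependencies target the same Scala version. You depend on kafka_2.13, which can cause trouble. Try kafka_2.11, or remove it entirely (I don't believe you need it).
  • Change the spark-streaming-kafka-0-10_2.11 version to 2.4.2.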

Tags: java apache-spark apache-kafka


【Answer 1】:

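The answer is the same as before: make all your Spark and Scala versions exactly the same. What is happening is that kafka_2.13 depends on Scala 2.13, while the rest of your dependencies use 2.11, and Spark 2.4 does not support Scala 2.13.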
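Why a version mismatch shows up as a runtime NoSuchMethodError rather than a compile error: the JVM links a call by the method's name plus its full descriptor, including the return type. The trace asks for refArrayOps(Object[]) returning scala.collection.mutable.ArrayOps; if the scala-library actually on the runtime classpath declares refArrayOps with a different return type (as can happen across Scala binary versions), linking fails even though a method with that name and parameter exists. The pure-JDK sketch below illustrates the same rule, using String.valueOf(Object) as a stand-in for the Scala method; the class DescriptorLinkDemo and its helper are hypothetical.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;

public class DescriptorLinkDemo {
    // Returns true only if a public static method with exactly this name,
    // return type and parameter types exists on the given class.
    public static boolean hasExactStatic(Class<?> owner, String name,
                                         Class<?> returnType, Class<?>... params) {
        try {
            MethodHandles.publicLookup()
                    .findStatic(owner, name, MethodType.methodType(returnType, params));
            return true;
        } catch (NoSuchMethodException | IllegalAccessException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // String.valueOf(Object) is declared to return String, so this resolves.
        System.out.println(hasExactStatic(String.class, "valueOf", String.class, Object.class)); // true
        // Same name and parameter type, different return type: no such method.
        System.out.println(hasExactStatic(String.class, "valueOf", Object.class, Object.class)); // false
    }
}
```

The second lookup fails for the same reason the Kafka integration jar fails: the caller was compiled against a descriptor that the library present at runtime does not declare.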

You can do this more easily with Maven properties:

<properties>
    <scala.minor.version>2.11</scala.minor.version>
    <spark.version>2.4.2</spark.version>
</properties>
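As a rough sanity check that every Scala artifact ends up on one binary version, here is a hypothetical helper (ScalaSuffixCheck, not part of Maven) that collects the _2.1x suffixes from a list of artifactIds. It assumes Scala 2.x suffixes of the form _2.10 to _2.19 and ignores artifacts without a suffix, such as kafka-clients. Applied to the question's artifactIds, it reports both 2.11 and 2.13.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ScalaSuffixCheck {
    // Assumed convention: the Scala binary version is the trailing "_2.1x" of the artifactId.
    private static final Pattern SUFFIX = Pattern.compile("_(2\\.1\\d)$");

    // Collects the distinct Scala binary versions found in the given artifactIds.
    public static Set<String> scalaBinaryVersions(List<String> artifactIds) {
        Set<String> versions = new TreeSet<>();
        for (String id : artifactIds) {
            Matcher m = SUFFIX.matcher(id);
            if (m.find()) {
                versions.add(m.group(1));
            }
        }
        return versions;
    }

    public static void main(String[] args) {
        // artifactIds copied from the question's pom.xml
        List<String> fromQuestion = Arrays.asList(
                "spark-core_2.11", "spark-sql_2.11", "spark-streaming-kafka-0-10_2.11",
                "spark-streaming_2.11", "kafka_2.13", "scalatest_2.11", "kafka-clients");
        Set<String> found = scalaBinaryVersions(fromQuestion);
        System.out.println(found); // [2.11, 2.13]
        if (found.size() > 1) {
            System.out.println("Mixed Scala binary versions: align every _2.x suffix");
        }
    }
}
```

With `_${scala.minor.version}` used in every artifactId, the helper should report a single version. To see what Maven actually resolves, including transitive artifacts, `mvn dependency:tree` is the more reliable check.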

You also shouldn't include Kafka itself as a dependency. I would recommend Scala 2.12, but that's up to you, since you aren't writing any Scala anyway.

To run that code, you should only need Spark core plus these three:

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.minor.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_${scala.minor.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.minor.version}.8</version>
        </dependency>

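It's also worth pointing out that Spark 2.4 does not use the Hadoop 3 client, and that the DStream Kafka API has effectively been deprecated in favor of Structured Streaming (the spark-sql-kafka-0-10 dependency).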


    【Answer 2】:

    The Spark version on my machine is 3.1.1, so I changed the pom.xml back to 3.1.1 and aligned all the Scala and Spark versions to a common version, like this:

    <project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>TikiData</groupId>
        <artifactId>TikiData</artifactId>
        <version>V1</version>
        <dependencies>
            <!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
            <dependency>
                <groupId>com.google.code.gson</groupId>
                <artifactId>gson</artifactId>
                <version>2.8.6</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>3.3.0</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.12</artifactId>
                <version>3.1.1</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.12</artifactId>
                <version>3.1.1</version>
                <scope>provided</scope>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10 -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
                <version>3.1.1</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.12</artifactId>
                <version>3.1.1</version>
                <scope>provided</scope>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>2.12.2</version>
            </dependency>
    
        </dependencies>
        <build>
            <sourceDirectory>src</sourceDirectory>
            <plugins>
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.8.1</version>
                    <configuration>
                        <release>11</release>
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                            <configuration>
                                <archive>
                                    <manifest>
                                        <mainClass>
                                            demo.KafkaDemo
                                        </mainClass>
                                    </manifest>
                                </archive>
                                <descriptorRefs>
                                    <descriptorRef>jar-with-dependencies</descriptorRef>
                                </descriptorRefs>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    </project>
    

