【问题标题】:Structured Streaming kafka spark java.lang.NoClassDefFoundError: org/apache/spark/internal/Logging结构化流卡夫卡火花 java.lang.NoClassDefFoundError: org/apache/spark/internal/Logging
【发布时间】:2019-11-18 11:12:52
【问题描述】:

我发现很多关于这个问题的常见问题解答,但没有用。

我是java和bigdata的新手,java依赖管理对我来说很糟糕。你必须猜测应该使用哪个包和版本,如果第三方库不告诉你什么,哪个包会冲突

p>

我想从kafka topic解析json数据并保存到hbase。

主要代码

package com.yizhisec.bigdata;

import com.yizhisec.bigdata.model.Traffic;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.ForeachWriter;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;

import java.io.IOException;
import java.util.Properties;

public class KafkaStructStream {

    private Dataset<Row> initStructKafka() throws IOException {
        Properties kafkaProp = Config.getProp();
        SparkSession spark = SparkSession
                .builder()
                .appName("Kafka")
                .master("local[2]")
                .getOrCreate();
        return spark.readStream().format("kafka")
                .option("kafka.bootstrap.servers", kafkaProp.getProperty("kafka.broker.list"))
                .option("kafka.security.protocol", "SSL")
                .option("kafka.ssl.truststore.location", Config.getPath(Config.KAFKA_JKS))
                .option("kafka.ssl.truststore.password", kafkaProp.getProperty("kafka.jks.passwd"))
                .option("startingOffsets", "latest")
                .option("subscribe", kafkaProp.getProperty("kafka.topic"))
                .load();
    }

    private void run() {
        Dataset<Row> df = null;
        try {
            df = initStructKafka();
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
        df.printSchema();
        Dataset<Traffic> ds = df.as(ExpressionEncoder.javaBean(Traffic.class));

        StreamingQuery query = ds.writeStream().foreach(new ForeachWriter<Traffic>() {
            @Override
            public boolean open(long partitionId, long epochId) {
                return false;
            }

            @Override
            public void process(Traffic value) {
                System.out.println(value);
            }

            @Override
            public void close(Throwable errorOrNull) {

            }
        }).start();

//        StreamingQuery query = ds.writeStream().format("console")
//                .trigger(Trigger.Continuous("2 seconds"))
//                .start();

        try {
            query.awaitTermination();
        } catch (StreamingQueryException e) {
            e.printStackTrace();
        }

    }


    public static void main(String[] args) {
        KafkaStructStream k = new KafkaStructStream();
        k.run();
    }

}

交通类

public class Traffic {
    private Long guid;
    private int time;
    private int end_time;
    private String srcip;
    private String srcmac;
    private int srcport;
    private String destip;
    private String destmac;
    private int destport;
    private String proto;
    private String appproto;
    private Long upsize;
    private Long downsize;

    getter and setter
}

依赖

<properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <spark.version>2.4.4</spark.version>
        <scala.version>2.11.12</scala.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>


        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.11</artifactId>
            <version>1.6.3</version>
        </dependency>

        <!--        kafka-->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
            <version>2.4.4</version>
            <scope>provided</scope>
        </dependency>
</dependencies>

错误

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/internal/Logging
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at com.yizhisec.bigdata.KafkaStructStream.initStructKafka(KafkaStructStream.java:20)
    at com.yizhisec.bigdata.KafkaStructStream.run(KafkaStructStream.java:37)
    at com.yizhisec.bigdata.KafkaStructStream.main(KafkaStructStream.java:76)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.internal.Logging
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 15 more

解决方案

经过一番尝试。最后,我找到了解决方案。 非常愚蠢的默认选项浪费了我一天的时间。我太笨了,找不到类已经存在,但是导入在运行时失败,因为提供了范围

【问题讨论】:

  • 花了大约 1 小时试图调试这个...谢谢

标签: java apache-spark apache-kafka


【解决方案1】:

你必须猜测应该使用哪个包和版本

不是真的在猜测... Spark 2.4.x 是使用 Scala 2.12 构建的,并且已记录在案。你的 POM 说 Scala 2.11.x

您还应该删除 spark-streaming-kafka_2.11 和 Kafka 依赖项,因为您使用的是结构化流,这需要 sql-kafka 之一,但未提供,因此删除范围标记

如果你总是使用&lt;version&gt;${spark.version}&lt;/version&gt;,那么你不应该去猜测

旁注:有 Spark Hbase 库,因此您不需要编写自己的 foreach 编写器

【讨论】:

    猜你喜欢
    • 2019-06-01
    • 2018-02-21
    • 2016-12-19
    • 2016-08-03
    • 2018-09-15
    • 2017-03-10
    • 2018-08-13
    • 2018-02-24
    相关资源
    最近更新 更多