【Question Title】: Cassandra class not found using spark streaming
【Posted】: 2017-01-01 20:31:09
【Question Description】:

I am using Spark to fetch data from Kafka and insert it into Cassandra. My program is:

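import java.util.HashMap;
import java.util.Map;

import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

// These static imports are what pull in com.datastax.spark.connector.japi.CassandraJavaUtil
import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

public static void fetchAndValidateData() {
    SparkConf sparkConf = new SparkConf().setAppName("name")
            .set("spark.cassandra.connection.host", "127.0.0.1")
            .set("spark.cleaner.ttl", "3600");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
    Map<String,String> kafkaParams = new HashMap<>();
    kafkaParams.put("zookeeper.connect", "127.0.0.1");
    kafkaParams.put("group.id", App.GROUP);
    // topicMap (Map<String, Integer>: topic -> number of receiver threads) is defined elsewhere in App
    JavaPairReceiverInputDStream<String, EventLog> messages =
            KafkaUtils.createStream(jssc, String.class, EventLog.class, StringDecoder.class, EventLogDecoder.class,
                    kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2());
    JavaDStream<EventLog> lines = messages.map(new Function<Tuple2<String, EventLog>, EventLog>() {
        @Override
        public EventLog call(Tuple2<String, EventLog> tuple2) {
            return tuple2._2();
        }
    });
    lines.foreachRDD(rdd -> { javaFunctions(rdd).writerBuilder("test", "event_log", mapToRow(EventLog.class)).saveToCassandra(); });
    jssc.start();
    try {
        jssc.awaitTermination();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    jssc.stop();
    jssc.close();
}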

My spark-submit command is: C:\spark-1.6.2-bin-hadoop2.6\bin\spark-submit --packages org.apache.spark:spark-streaming-kafka_2.10:1.5.2 --class "com.jtv.spark.atnt.App" --master local[4] target\spark.atnt-0.0.1-SNAPSHOT.jar

My POM file is:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.jtv</groupId>
  <artifactId>spark.atnt</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>spark.atnt</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <build>
    <plugins>  
       <plugin>
          <artifactId>maven-assembly-plugin</artifactId>
          <executions>
              <execution>
                  <phase>package</phase>
                  <goals>
                      <goal>single</goal>
                  </goals>
              </execution>
          </executions>
          <configuration>
              <descriptorRefs>
                  <descriptorRef>jar-with-dependencies</descriptorRef>
              </descriptorRefs>
          </configuration>
      </plugin>
    </plugins>
  </build>

  <dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.0.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.0.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector-java_2.11</artifactId>
        <version>1.5.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka_2.11</artifactId>
      <version>1.6.2</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>javax.json</groupId>
        <artifactId>javax.json-api</artifactId>
        <version>1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.8.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.10.0.1</version>
    </dependency>    
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
</project>

I get a java.lang.ClassNotFoundException: com.datastax.spark.connector.japi.CassandraJavaUtil error.
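To confirm that the error comes from a missing jar at runtime rather than from the code itself, a small probe like the one below can be run with the same classpath as the application (for example, by calling `isOnClasspath` at the start of `main`, before the streaming context is created). This is a minimal sketch: `ClasspathProbe` and `isOnClasspath` are hypothetical names, and it relies only on the JDK's `Class.forName`.

```java
/**
 * Hypothetical helper (not part of the original program): reports whether a class
 * can be loaded by the current thread's context class loader.
 */
public class ClasspathProbe {

    // Returns true if className can be loaded; false if it, or a class it
    // depends on, is missing from the classpath.
    static boolean isOnClasspath(String className) {
        try {
            // initialize = false: load the class without running its static initializers
            Class.forName(className, false, Thread.currentThread().getContextClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String target = "com.datastax.spark.connector.japi.CassandraJavaUtil";
        System.out.println(target + " on classpath: " + isOnClasspath(target));
    }
}
```

If this prints `false` when launched through spark-submit, the connector jar is simply not being shipped with the application.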

How can I fix this?


Edit:

I found out what is causing the problem: it is org.apache.kafka:kafka_2.10:0.8.0. When I add provided to it, my mvn package command fails with Failed to execute goal org.apache.maven.plugins:maven-assembly-plugin:2.2-beta-5:single (default) on project spark.atnt: Failed to create assembly: Failed to resolve dependencies for project: com.jtv:spark.atnt:jar:0.0.1-SNAPSHOT: Could not transfer artifact com.sun.jdmk:jmxtools:jar:1.2.1 from/to java.net (https://maven-repository.dev.java.net/nonav/repository): Cannot access https://maven-repository.dev.java.net/nonav/repository with type legacy using the available connector factories: BasicRepositoryConnectorFactory, and when I remove it, my spark-submit command fails with java.lang.ClassNotFoundException: com.datastax.spark.connector.japi.CassandraJavaUtil.

【Question Discussion】:

    Tags: java apache-spark cassandra apache-kafka


    【Solution 1】:

    The simplest way to fix this is to package the Cassandra connector library into your jar file.

    To do this, you can use the maven-assembly-plugin in your pom.xml:

           <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
    

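    This plugin packages all of your dependencies into your jar file. If you want to keep some dependencies out of the jar (for example Spark), add the <scope>provided</scope> tag to them. For example: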

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.5.2</version>
            <scope>provided</scope>
        </dependency>
    

    Note that if you use the assembly plugin above, you will get two jar files in the target folder. To run the complete jar (the one with dependencies), use: C:\spark-1.6.2-bin-hadoop2.6\bin\spark-submit --packages org.apache.spark:spark-streaming-kafka_2.10:1.5.2 --class "com.jtv.spark.atnt.App" --master local[4] target\spark.atnt-0.0.1-SNAPSHOT-jar-with-dependencies.jar
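    Before resubmitting, it may be worth checking that the connector classes actually ended up in the jar-with-dependencies. The sketch below opens a jar with `java.util.jar.JarFile` and looks for the `.class` entry of `CassandraJavaUtil`. `JarContentCheck`, `entryName`, and `containsClass` are hypothetical names, and the default path assumes the artifact name used in the command above.

```java
import java.io.IOException;
import java.util.jar.JarFile;

/**
 * Hypothetical helper: checks whether a jar file contains the entry for a given class.
 */
public class JarContentCheck {

    // Converts a binary class name (a.b.C) into its jar entry path (a/b/C.class).
    static String entryName(String className) {
        return className.replace('.', '/') + ".class";
    }

    // Returns true if the jar at jarPath has an entry for className.
    static boolean containsClass(String jarPath, String className) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getJarEntry(entryName(className)) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumed default: the jar-with-dependencies produced by the assembly plugin
        String jarPath = args.length > 0 ? args[0]
                : "target/spark.atnt-0.0.1-SNAPSHOT-jar-with-dependencies.jar";
        String cls = "com.datastax.spark.connector.japi.CassandraJavaUtil";
        System.out.println(cls + " packaged: " + containsClass(jarPath, cls));
    }
}
```

    A `false` result suggests the connector was left out of the assembly, for example because it was given provided scope.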

    【Discussion】:

    • I tried your solution. When running mvn package, I get the error Failed to execute goal org.apache.maven.plugins:maven-assembly-plugin:2.2-beta-5:single (default) on project spark.atnt: Failed to create assembly: Failed to resolve dependencies for project: com.jtv:spark.atnt:jar:0.0.1-SNAPSHOT: Could not transfer artifact com.sun.jmx:jmxri:jar:1.2.1 from/to java.net (https://maven-repository.dev.java.net/nonav/repository): Cannot access https://maven-repository.dev.java.net/nonav/repository with type legacy using the available connector factories: BasicRepositoryConnectorFactory.
    • It looks like you may be running into a problem similar to this Stack Overflow question: stackoverflow.com/questions/4908651/…
    • Remember to set <scope>provided</scope> for dependencies you don't want in the target jar file, e.g. anything in the org.apache.spark group.
    • I found out what is causing the problem: it is org.apache.kafka:kafka_2.10:0.8.0. When I add provided, I get Cannot access https://maven-repository.dev.java.net/nonav/repository with type legacy using the available connector factories: BasicRepositoryConnectorFactory, and when I remove it, I get java.lang.ClassNotFoundException: com.datastax.spark.connector.japi.CassandraJavaUtil.