【问题标题】:Error Running Cassandra from Spark in Java - NoClassDefFoundError at org.apache.spark.sql.catalyst在 Java 中从 Spark 运行 Cassandra 时出错 - org.apache.spark.sql.catalyst 上的 NoClassDefFoundError
【发布时间】:2019-01-22 16:33:46
【问题描述】:

我使用的是 Cassandra 3.0.3 和 Spark 1.6.0，正在尝试运行一段代码，它结合了 http://www.datastax.com/dev/blog/accessing-cassandra-from-spark-in-java 中旧文档的代码和 https://github.com/datastax/spark-cassandra-connector/blob/master/doc/7_java_api.md 中新文档的代码。

这是我的 pom.xml 文件

<?xml version="1.0" encoding="UTF-8"?>
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>
 <groupId>muhrafifm</groupId>
 <artifactId>spark-cass-twitterdw</artifactId>
 <version>1.0</version>
 <packaging>jar</packaging>
 <build>
    <plugins>
      <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.0</version>
          <configuration>
              <source>1.7</source>
              <target>1.7</target>
          </configuration>
      </plugin>
    </plugins>
</build>
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
</properties>
<dependencies>        
    <dependency>
        <groupId>com.datastax.cassandra</groupId>
        <artifactId>cassandra-driver-core</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>com.googlecode.json-simple</groupId>
        <artifactId>json-simple</artifactId>
        <version>1.1.1</version>
        <type>jar</type>    
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.10</artifactId>
        <version>1.6.0-M1</version>
        <type>jar</type>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector-java_2.10</artifactId>
        <version>1.6.0-M1</version>
        <type>jar</type>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.6.0</version>
        <type>jar</type>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.6.0</version>
        <type>jar</type>
    </dependency>
    <dependency>
        <groupId>org.apache.thrift</groupId>
        <artifactId>libthrift</artifactId>
        <version>0.9.1</version>
     </dependency>
</dependencies>

我所做的更改主要在 javaFunctions 方法的调用上。下面是我按照新文档修改 javaFunctions 调用后的其中一个方法。我还添加了 import static com.datastax.spark.connector.japi.CassandraJavaUtil.*;

/**
 * Recreates the {@code java_api} keyspace and its demo tables, then writes a fixed
 * product hierarchy to {@code java_api.products} and 1000 randomly priced sales for
 * every product that has exactly two parents to {@code java_api.sales}.
 *
 * @param sc Spark context whose configuration is used to build the Cassandra connector
 */
private void generateData(JavaSparkContext sc) {
    CassandraConnector cassandra = CassandraConnector.apply(sc.getConf());

    // Schema statements, executed in this exact order on one session.
    String[] schemaStatements = {
            "DROP KEYSPACE IF EXISTS java_api",
            "CREATE KEYSPACE java_api WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}",
            "CREATE TABLE java_api.products (id INT PRIMARY KEY, name TEXT, parents LIST<INT>)",
            "CREATE TABLE java_api.sales (id UUID PRIMARY KEY, product INT, price DECIMAL)",
            "CREATE TABLE java_api.summaries (product INT PRIMARY KEY, summary DECIMAL)"
    };
    try (Session cql = cassandra.openSession()) {
        for (String statement : schemaStatements) {
            cql.execute(statement);
        }
    }

    // Fixed product hierarchy; the third constructor argument is the parent-id list.
    List<Product> catalog = Arrays.asList(
            new Product(0, "All products", Collections.<Integer>emptyList()),
            new Product(1, "Product A", Arrays.asList(0)),
            new Product(4, "Product A1", Arrays.asList(0, 1)),
            new Product(5, "Product A2", Arrays.asList(0, 1)),
            new Product(2, "Product B", Arrays.asList(0)),
            new Product(6, "Product B1", Arrays.asList(0, 2)),
            new Product(7, "Product B2", Arrays.asList(0, 2)),
            new Product(3, "Product C", Arrays.asList(0)),
            new Product(8, "Product C1", Arrays.asList(0, 3)),
            new Product(9, "Product C2", Arrays.asList(0, 3))
    );

    JavaRDD<Product> productRdd = sc.parallelize(catalog);
    javaFunctions(productRdd).writerBuilder("java_api", "products", mapToRow(Product.class)).saveToCassandra();

    // Keeps only products whose parent list has exactly two entries.
    Function<Product, Boolean> hasTwoParents = new Function<Product, Boolean>() {
        @Override
        public Boolean call(Product candidate) throws Exception {
            return candidate.getParents().size() == 2;
        }
    };

    // Expands one product into 1000 sales, each with a random UUID and a random price.
    FlatMapFunction<Product, Sale> randomSales = new FlatMapFunction<Product, Sale>() {
        @Override
        public Iterable<Sale> call(Product source) throws Exception {
            Random rng = new Random();
            List<Sale> generated = new ArrayList<>(1000);
            int remaining = 1000;
            while (remaining-- > 0) {
                generated.add(new Sale(UUID.randomUUID(), source.getId(), BigDecimal.valueOf(rng.nextDouble())));
            }
            return generated;
        }
    };

    JavaRDD<Sale> saleRdd = productRdd.filter(hasTwoParents).flatMap(randomSales);
    javaFunctions(saleRdd).writerBuilder("java_api", "sales", mapToRow(Sale.class)).saveToCassandra();
}

这是我得到的错误。

16/03/04 13:29:06 INFO Cluster: New Cassandra host /127.0.0.1:9042 added
16/03/04 13:29:06 INFO CassandraConnector: Connected to Cassandra cluster: Test Cluster
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/sql/catalyst/package$ScalaReflectionLock$
    at org.apache.spark.sql.catalyst.ReflectionLock$.<init>(ReflectionLock.scala:5)
    at org.apache.spark.sql.catalyst.ReflectionLock$.<clinit>(ReflectionLock.scala)
    at com.datastax.spark.connector.mapper.ReflectionColumnMapper.<init>(ReflectionColumnMapper.scala:38)
    at com.datastax.spark.connector.mapper.JavaBeanColumnMapper.<init>(JavaBeanColumnMapper.scala:10)
    at com.datastax.spark.connector.util.JavaApiHelper$.javaBeanColumnMapper(JavaApiHelper.scala:93)
    at com.datastax.spark.connector.util.JavaApiHelper.javaBeanColumnMapper(JavaApiHelper.scala)
    at com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow(CassandraJavaUtil.java:1204)
    at com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow(CassandraJavaUtil.java:1222)
    at muhrafifm.spark.cass.twitterdw.Demo.generateData(Demo.java:69)
    at muhrafifm.spark.cass.twitterdw.Demo.run(Demo.java:35)
    at muhrafifm.spark.cass.twitterdw.Demo.main(Demo.java:181)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.catalyst.package$ScalaReflectionLock$
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 11 more
16/03/04 13:29:40 INFO CassandraConnector: Disconnected from Cassandra cluster: Test Cluster
16/03/04 13:29:41 INFO SparkContext: Invoking stop() from shutdown hook
16/03/04 13:29:41 INFO SparkUI: Stopped Spark web UI at http://10.144.233.28:4040
16/03/04 13:29:41 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/03/04 13:29:42 INFO MemoryStore: MemoryStore cleared
16/03/04 13:29:42 INFO BlockManager: BlockManager stopped
16/03/04 13:29:42 INFO BlockManagerMaster: BlockManagerMaster stopped
16/03/04 13:29:42 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/03/04 13:29:42 INFO SparkContext: Successfully stopped SparkContext
16/03/04 13:29:42 INFO ShutdownHookManager: Shutdown hook called
16/03/04 13:29:42 INFO ShutdownHookManager: Deleting directory /tmp/spark-16fd2ae2-b61b-4411-a776-1e578caabba6
------------------------------------------------------------------------
BUILD FAILURE

我做错了什么吗？它似乎需要一个我根本没有使用的包，有什么办法可以解决吗？还是应该改用更早版本的 spark-cassandra-connector？

感谢任何回复,谢谢。

【问题讨论】:

    标签: java spark-cassandra-connector


    【解决方案1】:

    代码正在寻找

    org/apache/spark/sql/catalyst/package$ScalaReflectionLock$
    

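    所以你应该添加 spark-sql 依赖：这个类位于 spark-catalyst 模块中，而 spark-sql 会把 spark-catalyst 作为传递依赖一并引入。请使用与 Spark 版本一致的 spark-sql（例如本例中的 spark-sql_2.10 1.6.0）。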

    【讨论】:

    • 我已经检查了依赖项并且有 spark-sql 库,我尝试过导入它,但它什么也没做。您如何包含并确保包含该库?
    • 要么将该库放入各 worker 节点的 lib 目录，要么在 spark-submit 命令中通过 --jars 选项把它分发给这些节点
    【解决方案2】:

    我遇到了同样的问题，原因是 Spark 版本与 Spark Cassandra 连接器版本不兼容：我使用的是 Spark 2.3，而 Cassandra 连接器是旧版本。

    版本兼容性矩阵可在此处获得:

    https://github.com/datastax/spark-cassandra-connector

    【讨论】:

    • 是的，原因可能是 spark-sql 不在 classpath 上，或者 Spark 与 spark-cassandra-connector 的版本不兼容。
    【解决方案3】:

    这是我用于此应用程序的 POM，它可以完整运行，没有任何问题（java 版本 “1.8.0_131”，javac 1.8.0_131）。完整的应用程序可以在这里找到：https://github.com/sunone5/BigData/tree/master/spark-cassandra-streaming


    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>spark-cassandra-streaming</groupId>
        <artifactId>spark-cassandra-streaming</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    
        <dependencies>
    
            <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>2.2.0</version>
                <scope>provided</scope>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.10 -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.10</artifactId>
                <version>2.2.0</version>
                <scope>provided</scope>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector_2.11 -->
            <dependency>
                <groupId>com.datastax.spark</groupId>
                <artifactId>spark-cassandra-connector_2.11</artifactId>
                <version>2.0.5</version>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector-java_2.10 -->
            <dependency>
                <groupId>com.datastax.spark</groupId>
                <artifactId>spark-cassandra-connector-java_2.10</artifactId>
                <version>1.6.0-M1</version>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/com.datastax.cassandra/cassandra-driver-core -->
            <dependency>
                <groupId>com.datastax.cassandra</groupId>
                <artifactId>cassandra-driver-core</artifactId>
                <version>3.3.0</version>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-catalyst_2.10 -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-catalyst_2.10</artifactId>
                <version>2.2.0</version>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.10</artifactId>
                <version>2.2.0</version>
            </dependency>    
    
        </dependencies>
    
        <build>
            <resources>
                <resource>
                    <directory>${basedir}/src/main/resources</directory>
                </resource>
            </resources>
            <pluginManagement>
                <plugins>
                    <plugin>
                        <groupId>org.apache.maven.plugins</groupId>
                        <artifactId>maven-compiler-plugin</artifactId>
                        <version>3.6.2</version>
                        <configuration>
                            <source>1.8</source>
                            <target>1.8</target>
                        </configuration>
                    </plugin>
                </plugins>
            </pluginManagement>
        </build>
    </project>
    

    【讨论】:

      【解决方案4】:

      我已经成功运行了。

      我的 scala 版本是 2.11.12

      下面是我的pom.xml

      <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
          <modelVersion>4.0.0</modelVersion>
      
          <groupId>com.sashi</groupId>
          <artifactId>SalesAnalysis</artifactId>
          <version>0.0.1-SNAPSHOT</version>
          <packaging>jar</packaging>
      
          <name>SalesAnalysis</name>
          <url>http://maven.apache.org</url>
      
          <properties>
              <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
          </properties>
      
          <dependencies>
              <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
              <dependency>
                  <groupId>org.apache.spark</groupId>
                  <artifactId>spark-core_2.11</artifactId>
                  <version>2.2.0</version>
                  <scope>provided</scope>
              </dependency>
      
              <dependency>
                  <groupId>org.apache.spark</groupId>
                  <artifactId>spark-sql_2.11</artifactId>
                  <version>2.2.0</version>
              </dependency> 
      
              <!-- https://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector_2.11 -->
              <dependency>
                  <groupId>com.datastax.spark</groupId>
                  <artifactId>spark-cassandra-connector_2.11</artifactId>
                  <version>2.0.5</version>
              </dependency>               
      
      
              <dependency>
                  <groupId>com.datastax.cassandra</groupId>
                  <artifactId>cassandra-driver-core</artifactId>
                  <version>3.3.0</version>
              </dependency>
      
              <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-catalyst_2.10 -->
              <dependency>
                  <groupId>org.apache.spark</groupId>
                  <artifactId>spark-catalyst_2.11</artifactId>
                  <version>2.2.0</version>
              </dependency>
      
              <dependency>
                  <groupId>io.netty</groupId>
                  <artifactId>netty-all</artifactId>
                  <version>4.1.17.Final</version>
              </dependency>
      
          </dependencies>
      </project>
      

      这是我的 spark-submit 脚本:

      spark-submit --class com.sashi.SalesAnalysis.CassandraSparkSalesAnalysis --packages com.datastax.spark:spark-cassandra-connector_2.11:2.4.0 /home/cloudera/Desktop/spark_ex/Cassandra/sales-analysis.jar
      

      【讨论】:

        猜你喜欢
        • 2015-08-16
        • 2023-04-07
        • 1970-01-01
        • 2017-08-06
        • 2017-01-17
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多