【问题标题】:An example in Java using Embedded Cassandra Server to test a Cassandra-Spark job使用嵌入式 Cassandra 服务器测试 Cassandra-Spark 作业的 Java 示例
【发布时间】:2016-08-29 20:35:49
【问题描述】:

我是 Cassandra 和 Spark 的新手。我正在尝试为我的 Spark 作业设置测试,它执行以下操作:

  1. 将表 A 中的数据加载到 DataFrames 中
  2. 对这些 DataFrame 进行一些过滤、分组和聚合
  3. 将结果加载到表 B 中

我想使用嵌入式 Cassandra 服务器来运行测试,而不是让它连接到 Cassandra 数据库的本地实例。有没有人这样做过?如果是这样,有人可以指点我一个很好的例子吗?提前感谢您的帮助!

【问题讨论】:

标签: java apache-spark cassandra


【解决方案1】:
this code does

package cassspark.clt;

import java.io.*;
import javafx.application.Application;
import java.util.concurrent.Executors ;
import java.util.concurrent.ExecutorService;
import org.apache.cassandra.service.CassandraDaemon;
import com.datastax.driver.core.exceptions.ConnectionException;
import java.util.Properties;
import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.sql.SparkSession;

public class EmbeddedCassandraDemo extends Application {

    private ExecutorService executor = Executors.newSingleThreadExecutor();
    private CassandraDaemon cassandraDaemon;

    public EmbeddedCassandraDemo() {
    }

    public static void main(String[] args) {
        try {
            new EmbeddedCassandraDemo().run();
        }
        catch(java.lang.InterruptedException e)
        {
            ;
        }
    }

    @Override public void start(javafx.stage.Stage stage) throws Exception
    {
        stage.show();
    }

    private void run() throws InterruptedException, ConnectionException {
        setProperties();
        activateDeamon();
    }

    private void activateDeamon() {
        executor.execute( new Runnable() {

            @Override
            public void run() {
                cassandraDaemon = new CassandraDaemon();
                cassandraDaemon.activate();
                SparkSession spark = SparkSession .builder().master("local").appName("ASH").getOrCreate();
            }
        });
    }

    private void setProperties() {

        final String yaml = System.getProperty("user.dir") + File.separator +"conf"+File.separator+"cassandra.yaml";
        final String storage = System.getProperty("user.dir") + File.separator +"storage" + File.separator +"data";

        System.setProperty("cassandra.config", "file:"+ yaml );
        System.setProperty("cassandra.storagedir", storage );
        System.setProperty("cassandra-foreground", "true");

        String log4JPropertyFile = "./conf/log4j.properties";
        Properties p = new Properties();
        try {
            p.load(new FileInputStream(log4JPropertyFile));
            PropertyConfigurator.configure(p);
        } catch (IOException e) {
            System.err.println("./conf/log4j.properties not found ");
            System.exit(1);
            ;
        }
    }
}

【讨论】:

  • 我在这里看不到您是如何执行 Spark 作业的,您能否详细说明您如何向 Cassandra-unit 提交 Spark 作业,谢谢
猜你喜欢
  • 2011-09-30
  • 1970-01-01
  • 1970-01-01
  • 2015-08-31
  • 2015-08-26
  • 2018-05-21
  • 2017-07-07
  • 1970-01-01
  • 2021-07-30
相关资源
最近更新 更多