【问题标题】:Spark with Cassandra input/output带有 Cassandra 输入/输出的 Spark
【发布时间】:2014-08-18 12:32:20
【问题描述】:

描绘以下情景:Spark 应用程序(Java 实现)正在使用 Cassandra 数据库来加载、转换为 RDD 并处理数据。此外,该应用程序正在从数据库中传输新数据,这些数据也由自定义接收器处理。流式处理的输出存储在数据库中。该实现使用来自与数据库集成的 Spring Data Cassandra。

Cassandra 配置:

@Configuration
@ComponentScan(basePackages = {"org.foo"})
@PropertySource(value = { "classpath:cassandra.properties" })
public class CassandraConfig {

    @Autowired
    private Environment env;

    @Bean
    public CassandraClusterFactoryBean cluster() {
        CassandraClusterFactoryBean cluster = new CassandraClusterFactoryBean();
        cluster.setContactPoints(env.getProperty("cassandra.contactpoints"));
        cluster.setPort(Integer.parseInt(env.getProperty("cassandra.port")));

        return cluster;
    }

    @Bean
    public CassandraMappingContext mappingContext() {
        return new BasicCassandraMappingContext();
    }

    @Bean
    public CassandraConverter converter() {
        return new MappingCassandraConverter(mappingContext());
    }

    @Bean
    public CassandraSessionFactoryBean session() throws Exception {
        CassandraSessionFactoryBean session = new CassandraSessionFactoryBean();
        session.setCluster(cluster().getObject());
        session.setKeyspaceName(env.getProperty("cassandra.keyspace"));
        session.setConverter(converter());
        session.setSchemaAction(SchemaAction.NONE);

        return session;
    }

    @Bean
    public CassandraOperations cassandraTemplate() throws Exception {
        return new CassandraTemplate(session().getObject());
    }

}

DataProcessor.main 方法:

// Initialize spring application context
ApplicationContext applicationContext = new AnnotationConfigApplicationContext(CassandraConfig.class);
ApplicationContextHolder.setApplicationContext(applicationContext);
CassandraOperations cassandraOperations = applicationContext.getBean(CassandraOperations.class);
// Initialize spark context
SparkConf conf = new SparkConf().setAppName("test-spark").setMaster("local[2]");
JavaSparkContext sc = new JavaSparkContext(conf);

// Load data pages
List<Event> pagingResults = cassandraOperations.select("select * from event where event_type = 'event_type1' order by creation_time desc limit " + DATA_PAGE_SIZE, Event.class);
// Parallelize the first page
JavaRDD<Event> rddBuffer = sc.parallelize(pagingResults);

while(pagingResults != null && !pagingResults.isEmpty()) {
    Event lastEvent = pagingResults.get(pagingResults.size() - 1);
    pagingResults = cassandraOperations.select("select * from event where event_type = 'event_type1' and creation_time < " + lastEvent.getPk().getCreationTime() + " order by creation_time desc limit " + DATA_PAGE_SIZE, Event.class);
    // Parallelize page and add to the existing
    rddBuffer = rddBuffer.union(sc.parallelize(pagingResults));
}

// data processing
...

预计初始加载会有大量数据。出于这个原因,数据在 rddBuffer 中被分页、加载和分发。

还有以下可用选项:

  1. Spark-Cassandra 示例 (https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala),尽管此示例的文档数量最少。
  2. Calliope 项目 (http://tuplejump.github.io/calliope/)

我想知道将 Spark 与 Cassandra 集成的最佳实践是什么。在我的实施中遵循的最佳选择是什么?

Apache Spark 1.0.0、Apache Cassandra 2.0.8

【问题讨论】:

  • 您打算在单节点还是集群上使用 Spark?
  • ps:考虑使用 Scala for Spark。好多了。
  • 这是一个原型实现,在下一阶段,所有代码都将使用 Java 8(lambda 表达式)或 Scala(闭包)编写。语言的选择将取决于几个因素。此外,在集群中,在单个节点上使用 Skark 是没有意义的。
  • @maasg 是正确的,他的回答也很好。相信我们,当你去 Scala 时,你的生活会轻松 10 倍。我们有一句谚语“一旦你去了 Scala,你就再也回不去了”。
  • @samthebest 与以前版本的 Java 或 Python 实现相比,Scala 和 Java 8 似乎都提供了巨大的优势。 Scala 似乎很适合 Spark,因为它是 Scala 实现。但是,这两者之间的选择将在稍后阶段进行评估。目前我的意图是专注于基本面。

标签: java cassandra apache-spark spring-data-cassandra


【解决方案1】:

使用 Cassandra 和 Spark 最简单的方法是使用 DataStax 为 Spark 开发的官方开源 Cassandra 驱动程序:https://github.com/datastax/spark-cassandra-connector

此驱动程序构建在 Cassandra Java 驱动程序之上,并提供了 Cassandra 和 Spark 之间的直接桥梁。与 Calliope 不同,它不使用 Hadoop 接口。此外,它还提供以下独特功能:

  • 支持所有 Cassandra 数据类型,包括集合,开箱即用
  • Cassandra 行到自定义类或元组的轻量级映射,无需使用 Scala 中的任何隐式或其他高级功能
  • 将所有 RDD 保存到 Cassandra
  • 完全支持 Cassandra 虚拟节点
  • 能够在服务器端过滤/选择,例如利用 Cassandra 集群列或二级索引

【讨论】:

  • 谢谢,这正是我的实施所需要的。
  • 看看 spark-cassandra-connector 提供的 CassandraConnector 类。它使在 RDD(foreach、map 等)上的操作内部运行 CQL 查询等变得容易,因此它们分布在整个集群中。在 IMO 文档中没有足够强调。
  • Piotr,你能看看这个关于datastax'sspark-cassandra-connector的问题吗? stackoverflow.com/questions/27130321/… 谢谢。
【解决方案2】:

上面代码中的方法是一种经典的集中式算法,只有在一个节点中执行才能工作。 Cassandra 和 Spark 都是分布式系统,因此有必要对流程进行建模,使其可以分布在多个节点之间。

有几种可能的方法: 如果您知道要获取的行的键,则可以执行以下简单操作:(使用 DataStax Java 驱动程序)

val data = sparkContext.parallelize(keys).map{key => 
   val cluster = val cluster = Cluster.builder.addContactPoint(host).build()
   val session  = cluster.connect(keyspace)
   val statement = session.prepare("...cql...);")
   val boundStatement = new BoundStatement(sttmt)
   session.execute(session.execute(boundStatement.bind(...data...)
}

这将有效地在 Spark 集群中分配键的获取。请注意与 C* 的连接是如何在闭包中完成的,因为这样可以确保在每个单独的分布式工作线程上执行任务时建立连接。

鉴于您的示例使用通配符(即密钥未知),使用 Cassandra 的 Hadoop 接口是一个不错的选择。问题中链接的 Spark-Cassandra 示例说明了在 Cassandra 上使用此 Hadoop 接口。

Calliope 是一个库,它通过提供一个简单的 API 来访问该功能,从而封装了使用 Hadoop 接口的复杂性。它仅在 Scala 中可用,因为它使用特定的 Scala 功能(如即将发布的版本中的隐式和宏) 使用 Calliope,您基本上可以声明如何将您的 RDD[type] 转换为行键和行值,并且 Calliope 负责将 hadoop 接口配置为作业。 我们发现 Calliope(以及底层的 hadoop 接口)比使用驱动程序与 Cassandra 交互快 2-4 倍。

结论:我会放弃 Spring-Data 配置来访问 Cassandra,因为这会将您限制在单个节点上。如果可能,考虑一个简单的并行访问,或者在 Scala 中使用 Calliope 进行探索。

【讨论】:

  • 感谢您的回答 maasg。您明确了 Hadoop 接口和 Calliope API 的两个选项之间的区别。但尚不清楚为什么 Spring Data Cassandra 选项在集群架构上不是一个有效的选项。您能否提供更多详细信息?在给定的示例代码上,数据在本地加载并分批分布在集群上。为此目的使用了一个 RDD 缓冲区(每个批次都有联合)。是不是每个worker节点都会在RDD中并行加载同一个数据集?
  • 您不应在应用程序的紧密循环中构建新集群。这会扼杀性能。此外,您需要在使用后正确关闭集群实例。否则你很快就会耗尽内存和/或线程。 Cluster 和 Session 实例是线程安全的。可以将它们移出 lambda 并共享,但是,由于它们不可序列化,因此无法分发它们。官方 cassandra-driver-spark 模块中的 Connector 类已经解决了这个问题。
  • @PiotrKolaczkowski 鉴于集群不可序列化,它需要在 lambda 中实例化,因为该代码将由分布式 Spark 系统上的每个工作人员执行。您提到的情况仅适用于本地模式。回复:cassandra-spark:你肯定接近源头。我今天早上在 Spark 峰会上发表了公告。将尽快调查。
  • 你是对的,这就是为什么我提到了驱动程序用来解决这个问题的 CassandraConnector 类。 CassandraConnector 可以被视为与 Cassandra 的可序列化连接。它允许在同一 JVM 上的线程之间共享集群,但是当通过网络发送到远程节点时,它会透明地重新建立与集群的连接。它还提供了一些不错的实用程序来避免资源泄漏。
猜你喜欢
  • 2014-08-10
  • 1970-01-01
  • 1970-01-01
  • 2019-07-28
  • 2018-08-18
  • 1970-01-01
  • 2015-02-03
  • 2020-02-19
  • 2019-12-09
相关资源
最近更新 更多