【发布时间】:2014-08-18 12:32:20
【问题描述】:
描绘以下情景:Spark 应用程序(Java 实现)正在使用 Cassandra 数据库来加载、转换为 RDD 并处理数据。此外,该应用程序正在从数据库中传输新数据,这些数据也由自定义接收器处理。流式处理的输出存储在数据库中。该实现使用来自与数据库集成的 Spring Data Cassandra。
Cassandra 配置:
@Configuration
@ComponentScan(basePackages = {"org.foo"})
@PropertySource(value = { "classpath:cassandra.properties" })
public class CassandraConfig {
@Autowired
private Environment env;
@Bean
public CassandraClusterFactoryBean cluster() {
CassandraClusterFactoryBean cluster = new CassandraClusterFactoryBean();
cluster.setContactPoints(env.getProperty("cassandra.contactpoints"));
cluster.setPort(Integer.parseInt(env.getProperty("cassandra.port")));
return cluster;
}
@Bean
public CassandraMappingContext mappingContext() {
return new BasicCassandraMappingContext();
}
@Bean
public CassandraConverter converter() {
return new MappingCassandraConverter(mappingContext());
}
@Bean
public CassandraSessionFactoryBean session() throws Exception {
CassandraSessionFactoryBean session = new CassandraSessionFactoryBean();
session.setCluster(cluster().getObject());
session.setKeyspaceName(env.getProperty("cassandra.keyspace"));
session.setConverter(converter());
session.setSchemaAction(SchemaAction.NONE);
return session;
}
@Bean
public CassandraOperations cassandraTemplate() throws Exception {
return new CassandraTemplate(session().getObject());
}
}
DataProcessor.main 方法:
// Initialize spring application context
ApplicationContext applicationContext = new AnnotationConfigApplicationContext(CassandraConfig.class);
ApplicationContextHolder.setApplicationContext(applicationContext);
CassandraOperations cassandraOperations = applicationContext.getBean(CassandraOperations.class);
// Initialize spark context
SparkConf conf = new SparkConf().setAppName("test-spark").setMaster("local[2]");
JavaSparkContext sc = new JavaSparkContext(conf);
// Load data pages
List<Event> pagingResults = cassandraOperations.select("select * from event where event_type = 'event_type1' order by creation_time desc limit " + DATA_PAGE_SIZE, Event.class);
// Parallelize the first page
JavaRDD<Event> rddBuffer = sc.parallelize(pagingResults);
while(pagingResults != null && !pagingResults.isEmpty()) {
Event lastEvent = pagingResults.get(pagingResults.size() - 1);
pagingResults = cassandraOperations.select("select * from event where event_type = 'event_type1' and creation_time < " + lastEvent.getPk().getCreationTime() + " order by creation_time desc limit " + DATA_PAGE_SIZE, Event.class);
// Parallelize page and add to the existing
rddBuffer = rddBuffer.union(sc.parallelize(pagingResults));
}
// data processing
...
预计初始加载会有大量数据。出于这个原因,数据在 rddBuffer 中被分页、加载和分发。
还有以下可用选项:
- Spark-Cassandra 示例 (https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala),尽管此示例的文档数量最少。
- Calliope 项目 (http://tuplejump.github.io/calliope/)
我想知道将 Spark 与 Cassandra 集成的最佳实践是什么。在我的实施中遵循的最佳选择是什么?
Apache Spark 1.0.0、Apache Cassandra 2.0.8
【问题讨论】:
-
您打算在单节点还是集群上使用 Spark?
-
ps:考虑使用 Scala for Spark。好多了。
-
这是一个原型实现,在下一阶段,所有代码都将使用 Java 8(lambda 表达式)或 Scala(闭包)编写。语言的选择将取决于几个因素。此外,在集群中,在单个节点上使用 Skark 是没有意义的。
-
@maasg 是正确的,他的回答也很好。相信我们,当你去 Scala 时,你的生活会轻松 10 倍。我们有一句谚语“一旦你去了 Scala,你就再也回不去了”。
-
@samthebest 与以前版本的 Java 或 Python 实现相比,Scala 和 Java 8 似乎都提供了巨大的优势。 Scala 似乎很适合 Spark,因为它是 Scala 实现。但是,这两者之间的选择将在稍后阶段进行评估。目前我的意图是专注于基本面。
标签: java cassandra apache-spark spring-data-cassandra