【问题标题】:Spark Cassandra Connector proper usageSpark Cassandra 连接器正确使用
【发布时间】:2014-12-19 01:11:36
【问题描述】:

我希望将 spark 用于某些 ETL,它主要由“更新”语句组成(列是一个集合,它将被附加到,因此简单的插入可能不起作用)。因此,发出 CQL 查询来导入数据似乎是最好的选择。使用 Spark Cassandra 连接器,我知道我可以做到这一点:

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/1_connecting.md#connecting-manually-to-cassandra

现在我不想为源中的每一行打开一个会话并关闭它(我不想要这个是对的吗?通常情况下,我在整个过程中都有一个会话,并继续在“正常”中使用它应用)。但是,它说连接器是可序列化的,但会话显然不是。因此,将整个导入包装在单个“withSessionDo”中似乎会导致问题。我正在考虑使用这样的东西:

class CassandraStorage(conf:SparkConf) {
  val session = CassandraConnector(conf).openSession()
  def store (t:Thingy) : Unit = {
    //session.execute cql goes here
  }
}

这是一个好方法吗?我需要担心关闭会话吗?我将在哪里/如何最好地做到这一点?任何指针表示赞赏。

【问题讨论】:

  • 我想知道为什么您可能不只是想创建 Spark Conf 对象和在 Spark 上下文中的引用,例如示例,在您引用的连接器页面上进一步显示?您应该能够只创建 conf 对象,然后在运行查询时保持上下文打开。

标签: cassandra apache-spark datastax


【解决方案1】:

您实际上确实想使用withSessionDo,因为它实际上不会在每次访问时打开和关闭会话。在后台,withSessionDo 访问 JVM 级别的会话。这意味着每个集群配置每个节点只有一个会话对象。

这意味着类似代码

val connector = CassandraConnector(sc.getConf)
sc.parallelize(1 to 10000000L).map(connector.withSessionDo( Session => stuff)

无论每台机器有多少核心,在每个执行程序 JVM 上只会创建 1 个集群和会话对象。

为了提高效率,我仍然建议使用 mapPartitions 来最小化缓存检查。

sc.parallelize(1 to 10000000L)
  .mapPartitions(it => connector.withSessionDo( session => 
      it.map( row => do stuff here )))

此外,会话对象还使用准备缓存,它允许您在序列化代码中缓存准备好的语句,并且每个 jvm 只会准备一次(所有其他调用都将返回缓存引用。)

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2015-10-28
    • 2018-06-10
    • 1970-01-01
    • 2015-05-24
    • 2015-05-19
    • 2023-03-10
    • 2016-09-02
    相关资源
    最近更新 更多