【问题标题】:Spark UDF To Look up Keys Using Cassandra ConnectorSpark UDF 使用 Cassandra 连接器查找密钥
【发布时间】:2020-12-11 18:04:00
【问题描述】:
  • 我正在尝试在我们的一些 spark 中使用 cassandra 作为键值查找存储 工作。

  • 我们主要使用 Dataframes,并且已经远离了 RDD API。

  • 不是加入表格,而是将它们加载到 spark 或
    将join推送到cassandra并采取措施避免大
    表扫描,我想我可以写一个连接的 Spark UDF cassandra a 查找一个键

  • 我还想将结果行转换为案例类 对象并返回对象。

我根据下面这个问题的回答获得了一些信息。 withSessionDo 重用每个节点上可用的底层 JVM 级别会话 Spark Cassandra Connector proper usage

val connector = CassandraConnector(sparkConf) // I Know this is serializable.

def lookupKey(connector: CassandraConnector, keyspace: String, table: String): UserDefineFunction = udf((key: String) => {
    connector.withSessionDo(session => {
        val stmt = session.prepare(s"SELECT * FROM $keyspace.$table WHERE key = ?")
        val result = session.execute( stmt.bind(key) )
        MyCaseClass(
           fieldl1 = result.getString(0),
           fieldl2 = result.getInt(1)
           ...
        )
    }
})

Session 不可序列化,因此我们无法在 udf 之外创建一个并将其传递,因此我们可以使用映射管理器将行转换为案例类实例。 使用 Mapping Manager 的替代方法,

def lookupKeyAlt(connector: CassandraConnector, keyspace: String, table: String): UserDefineFunction = udf((key: String) => {
    connector.withSessionDo(session => {
        val manager = new MappingManager(session)   // session isn't serializable, so creating one outside and passing to udf is not an option if wf we were willing to do the session management.
        val mapperClass = manager.mapper(classOf[MyCaseClass], keyspace)
        mapperClass.get(key)
    }
})

我是 cassandra 的新手,所以请多多指教。

  1. 这些方法中是否有我不知道的陷阱?
  2. 在第二种方法中,我知道我们在每次调用 UDF 时都会创建一个新的 MappingManager(session)。这是否仍会使用 jvm 级会话并打开更多会话? 每次调用都实例化 MappingManager 是否正确?该会话不可序列化,因此我无法在外部创建它并将其传递给 UDF。
  3. 还有哪些其他方法可以将结果行转换为案例类的对象?
  4. 有没有更好的替代方法来进行这种查找?

【问题讨论】:

    标签: apache-spark cassandra spark-cassandra-connector


    【解决方案1】:

    您正在尝试模拟 Spark Cassandra 连接器 (SCC) 在后台执行的操作,但您的实现会比 SCC 慢得多,因为您使用的是同步 API,并且一个接一个地获取所有数据,而 SCC 是使用异步 API,并行拉取多行数据。

    实现您想要的最佳方式是使用 Cassandra 优化连接(通常称为“直接连接”)。这种 join 一直可用于 RDD API,但很长一段时间以来,Dataframe API 仅在连接器的商业版本中可用。但从 SCC 2.5.0 (released in May 2020th) 开始,此功能在开源版本中也可用,因此您可以使用它而不是构建它的仿真。直接加入仅在您enable special Catalyst extensions 时执行,通过在配置 SparkSession 时传递spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions(例如通过命令行)。之后,您可以通过完整或部分主键与 Cassandra 表执行联接,SCC 会自动将联接转换为对 Cassandra 执行非常有效的单个请求。您可以通过在连接的数据帧上执行 explain 来检查是否发生这种情况,因此您应该会看到类似这样的内容(查找字符串 Cassandra Direct Join):

    scala> joined.explain
    == Physical Plan ==
    Cassandra Direct Join [pk = id#30, c1 = cc1#32] test.jtest1 - Reading (pk, c1, c2, v) Pushed {}
    +- *(1) Project [cast(id#28L as int) AS id#30, cast(id#28L as int) AS cc1#32]
       +- *(1) Range (1, 5, step=1, splits=8)
    

    我最近wrote a long blog post 解释了如何使用 Dataframe 和 RDD API 在 Cassandra 中执行有效的数据连接 - 我不想在这里重复 :-)

    【讨论】:

    • 谢谢,这对我有很大帮助事实上这是我的确切用例。我会用这个。不幸的是,我们有一个连接器的内部分支,它抽象了尚未从 2.5.0 分支的连接。我会尝试推动内部分叉升级到 2.5.0。但是直接使用连接器我能够看到这一点。我赞成你的回答
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-10-28
    • 2018-06-10
    • 2014-12-19
    • 1970-01-01
    • 2020-11-24
    相关资源
    最近更新 更多