【发布时间】:2020-12-11 18:04:00
【问题描述】:
-
我正在尝试在我们的一些 spark 中使用 cassandra 作为键值查找存储 工作。
-
我们主要使用 Dataframes,并且已经远离了 RDD API。
-
不是加入表格,而是将它们加载到 spark 或
将join推送到cassandra并采取措施避免大
表扫描,我想我可以写一个连接的 Spark UDF cassandra a 查找一个键 -
我还想将结果行转换为案例类 对象并返回对象。
我根据下面这个问题的回答获得了一些信息。 withSessionDo 重用每个节点上可用的底层 JVM 级别会话 Spark Cassandra Connector proper usage
val connector = CassandraConnector(sparkConf) // I Know this is serializable.
def lookupKey(connector: CassandraConnector, keyspace: String, table: String): UserDefineFunction = udf((key: String) => {
connector.withSessionDo(session => {
val stmt = session.prepare(s"SELECT * FROM $keyspace.$table WHERE key = ?")
val result = session.execute( stmt.bind(key) )
MyCaseClass(
fieldl1 = result.getString(0),
fieldl2 = result.getInt(1)
...
)
}
})
Session 不可序列化,因此我们无法在 udf 之外创建一个并将其传递,因此我们可以使用映射管理器将行转换为案例类实例。 使用 Mapping Manager 的替代方法,
def lookupKeyAlt(connector: CassandraConnector, keyspace: String, table: String): UserDefineFunction = udf((key: String) => {
connector.withSessionDo(session => {
val manager = new MappingManager(session) // session isn't serializable, so creating one outside and passing to udf is not an option if wf we were willing to do the session management.
val mapperClass = manager.mapper(classOf[MyCaseClass], keyspace)
mapperClass.get(key)
}
})
我是 cassandra 的新手,所以请多多指教。
- 这些方法中是否有我不知道的陷阱?
- 在第二种方法中,我知道我们在每次调用 UDF 时都会创建一个新的 MappingManager(session)。这是否仍会使用 jvm 级会话并打开更多会话? 每次调用都实例化 MappingManager 是否正确?该会话不可序列化,因此我无法在外部创建它并将其传递给 UDF。
- 还有哪些其他方法可以将结果行转换为案例类的对象?
- 有没有更好的替代方法来进行这种查找?
【问题讨论】:
标签: apache-spark cassandra spark-cassandra-connector