【发布时间】:2017-07-22 09:18:29
【问题描述】:
我一直在与 sparklyr 合作,将大型 cassandra 表带入 spark,用 R 注册这些表并对它们执行 dplyr 操作。
我已成功导入 cassandra 表,代码如下所示:
# import cassandra table into spark
cass_df <- sparklyr:::spark_data_read_generic(
sc, "org.apache.spark.sql.cassandra", "format",
list(keyspace = "cass_keyspace", table = "cass_table")
) %>%
invoke("load")
# register table in R
cass_tbl <- sparklyr:::spark_partition_register_df(
sc, cass_df, name = "cass_table", repartition = 0, memory = TRUE)
)
其中一些 cassandra 表非常大(> 85 亿行)并且需要一段时间来导入/注册,有些会导致内存溢出,即使有 6 个节点运行总共 60 个内核和 192gb RAM。但是,我通常只需要每个 cassandra 数据库中的一些列。
我的问题是:
- 是否可以在导入/注册时过滤 cassandra 数据库,使其仅导入某些列或在主键上进行过滤(即通过传递
SQL/CQL类型查询,例如SELECT name FROM cass_table WHERE id = 5)? - 这样的查询在上面的代码中会去哪里,语法采用什么形式?
我尝试在选项列表中添加这样一个查询作为附加选项,即:
list(. . . , select = "id")
以及在%>% invoke("load")之前将其作为单独的管道调用,即:
invoke("option", "select", "id") %>%
# OR
invoke("option", "query", s"select id from cass_table") %>%
但是这些不起作用。有什么建议吗?
【问题讨论】:
标签: r apache-spark cassandra cql sparklyr