【问题标题】:Importing cassandra table into spark via sparklyr - possible to select only some columns?通过 sparklyr 将 cassandra 表导入 spark - 可以只选择某些列吗?
【发布时间】:2017-07-22 09:18:29
【问题描述】:

我一直在与 sparklyr 合作,将大型 cassandra 表带入 spark,用 R 注册这些表并对它们执行 dplyr 操作。

我已成功导入 cassandra 表,代码如下所示:

# import cassandra table into spark

cass_df <- sparklyr:::spark_data_read_generic(
  sc, "org.apache.spark.sql.cassandra", "format", 
  list(keyspace = "cass_keyspace", table = "cass_table")
  ) %>% 
  invoke("load")


# register table in R

cass_tbl <- sparklyr:::spark_partition_register_df(
         sc, cass_df, name = "cass_table", repartition = 0, memory = TRUE)
       )

其中一些 cassandra 表非常大(> 85 亿行)并且需要一段时间来导入/注册,有些会导致内存溢出,即使有 6 个节点运行总共 60 个内核和 192gb RAM。但是,我通常只需要每个 cassandra 数据库中的一些列。

我的问题是:

  1. 是否可以在导入/注册时过滤 cassandra 数据库,使其仅导入某些列或在主键上进行过滤(即通过传递 SQL / CQL 类型查询,例如 SELECT name FROM cass_table WHERE id = 5 )?
  2. 这样的查询在上面的代码中会去哪里,语法采用什么形式?

我尝试在选项列表中添加这样一个查询作为附加选项,即:

list(. . . , select = "id")

以及在%&gt;% invoke("load")之前将其作为单独的管道调用,即:

invoke("option", "select", "id") %>%

# OR

invoke("option", "query", s"select id from cass_table") %>%

但是这些不起作用。有什么建议吗?

【问题讨论】:

    标签: r apache-spark cassandra cql sparklyr


    【解决方案1】:

    您可以跳过急切缓存并选择感兴趣的列:

    session <- spark_session(sc)
    
    # Some columns to select
    cols <- list("x", "y", "z")
    
    cass_df <- session %>% 
      invoke("read") %>% 
      invoke("format", "org.apache.spark.sql.cassandra") %>% 
      invoke("options", as.environment(list(keyspace="test"))) %>% 
      invoke("load") %>% 
      # We use select(col: String, cols* String) so the first column
      # has to be used separately. If you want only one column the third argument
      # has to be an empty list 
      invoke("select", cols[[1]], cols[2:length(cols)]) %>%
      # Standard lazy cache if you need one
      invoke("cache")
    

    如果您使用可以显着减少获取数据集数量的谓词pushdown 选项为"true"(默认)并使用filter before缓存。

    如果您想传递更复杂的查询,您可以注册临时视图和sql 方法:

    session %>%
      invoke("read") %>% 
      ...
      invoke("load") %>% 
      invoke("createOrReplaceTempView", "some_name")
    
    cass_df <- session %>% 
      invoke("sql", "SELECT id FROM some_name WHERE foo = 'bar'") %>%
      invoke("cache")
    

    【讨论】:

    • 太棒了,这篇文章对我帮助很大...我做了一些受此启发的事情,但从 CSV 文件中加载了正确的列。我想添加一个可能想要注册case_df 以便可以在其上使用 dplyr 动词(因为 sparklyr 带有 dplyr 后端)。注册完成:R_cass_df = sdf_register(cass_df, "spark_cass_df") 然后可以应用 dplyr 动词,例如:library("dplyr"); R_cass_df %&gt;% filter(foo == "bar") %&gt;% select(id)
    • @zero323 如何使用主机、用户名和密码连接到远程 Cassandra?
    猜你喜欢
    • 1970-01-01
    • 2015-12-15
    • 1970-01-01
    • 2022-01-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-09-21
    • 2019-11-01
    相关资源
    最近更新 更多