【问题标题】:how to split a list to multiple partitions and sent to executors如何将列表拆分为多个分区并发送给执行程序
【发布时间】:2019-06-28 14:22:08
【问题描述】:

当我们如下使用spark从csv中读取数据时,它会自动将数据拆分到多个分区并发送给executors

spark
  .read
  .option("delimiter", ",")
  .option("header", "true")
  .option("mergeSchema", "true")
  .option("codec", properties.getProperty("sparkCodeC"))
  .format(properties.getProperty("fileFormat"))
  .load(inputFile)

目前,我有一个 id 列表:

[1,2,3,4,5,6,7,8,9,...1000]

我想要做的是将这个列表拆分为多个分区并发送到执行器,在每个执行器中,运行 sql as

ids.foreach(id => {    
select * from table where id = id
})

当我们从 cassandra 加载数据时,连接器将生成查询 sql 为:

select columns from table where Token(k) >= ? and Token(k) <= ? 

这意味着,连接器将扫描整个数据库,实际上,我不需要扫描整个表,我只是从 id 列表中 k(分区键)所在的表中获取所有数据。

表架构为:

CREATE TABLE IF NOT EXISTS tab.events (
    k int,
    o text,
    event text
    PRIMARY KEY (k,o)
);

或者我如何使用 spark 使用预定义的 sql 语句从 cassandra 加载数据而不扫描整个表?

【问题讨论】:

    标签: scala apache-spark cassandra spark-cassandra-connector


    【解决方案1】:

    您只需要使用joinWithCassandra function 来选择您的操作所需的数据。但请注意,此功能只能通过 RDD API 使用。

    类似这样的:

    val joinWithRDD = your_df.rdd.joinWithCassandraTable("tab","events")
    

    您需要确保 DataFrame 中的列名称与 Cassandra 中的分区键名称匹配 - 请参阅文档了解更多信息。

    DataFrame 实现仅在 DSE 版本的 Spark Cassandra 连接器中可用,如 following blog post 中所述。

    2020 年 9 月更新:Spark Cassandra Connector 2.5.0 中添加了对加入 Cassandra 的支持

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-06-02
      • 2017-05-15
      • 1970-01-01
      • 2016-04-06
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多