【发布时间】:2019-06-28 14:22:08
【问题描述】:
当我们如下使用spark从csv中读取数据时,它会自动将数据拆分到多个分区并发送给executors
spark
.read
.option("delimiter", ",")
.option("header", "true")
.option("mergeSchema", "true")
.option("codec", properties.getProperty("sparkCodeC"))
.format(properties.getProperty("fileFormat"))
.load(inputFile)
目前,我有一个 id 列表:
[1,2,3,4,5,6,7,8,9,...1000]
我想要做的是将这个列表拆分为多个分区并发送到执行器,在每个执行器中,运行 sql as
ids.foreach(id => {
select * from table where id = id
})
当我们从 cassandra 加载数据时,连接器将生成查询 sql 为:
select columns from table where Token(k) >= ? and Token(k) <= ?
这意味着,连接器将扫描整个数据库,实际上,我不需要扫描整个表,我只是从 id 列表中 k(分区键)所在的表中获取所有数据。
表架构为:
CREATE TABLE IF NOT EXISTS tab.events (
k int,
o text,
event text
PRIMARY KEY (k,o)
);
或者我如何使用 spark 使用预定义的 sql 语句从 cassandra 加载数据而不扫描整个表?
【问题讨论】:
标签: scala apache-spark cassandra spark-cassandra-connector