【发布时间】:2016-09-02 11:02:35
【问题描述】:
我使用的是 Spark 1.3.1,我写了一个小程序来过滤 cassandra 上的数据
val sc = new SparkContext(conf)
val rdd = sc.cassandraTable("foo", "bar")
val date = DateTime.now().minusHours(1)
val rdd2 = rdd.filter(r => r.getDate("date").after(date.toDate))
println(rdd2.count())
sc.stop()
这个程序运行了很长时间,打印类似
的消息16/09/01 21:10:31 INFO Executor: Running task 46.0 in stage 0.0 (TID 46)
16/09/01 21:10:31 INFO TaskSetManager: Finished task 42.0 in stage 0.0 (TID 42) in 20790 ms on localhost (43/1350)
如果我终止程序并将代码更改为
val date = DateTime.now().minusHours(1)
val rdd2 = rdd.filter(r => r.getDate("date").after(date.toDate))
它仍然运行很长时间,并带有类似的消息
6/09/01 21:14:01 INFO Executor: Running task 8.0 in stage 0.0 (TID 8)
16/09/01 21:14:01 INFO TaskSetManager: Finished task 4.0 in stage 0.0 (TID 4) in 19395 ms on localhost (5/1350)
所以看起来程序总是会尝试将整个 cassandra 表加载到内存中(或尝试完全扫描它),然后才应用过滤器。这对我来说似乎效率极低。
如何以更好的方式编写此代码,以便 spark 不会尝试将整个 cassandra 表加载(或完全扫描)到 RDD 中,然后才应用过滤器?
【问题讨论】:
标签: scala apache-spark cassandra