【发布时间】:2020-05-10 02:23:37
【问题描述】:
我正在做一些 DataStax VM 的练习。
给出了一个 CassandraTable,我将使用 Spark API 函数而不是 cassandra-query-functions 进行一些过滤和检索前 5 个元素。
我正在执行以下操作:
val cassRdd = sc.cassandraTable("killr_video", "videos_by_year_title")
val cassRdd2 = cassRdd.filter(r=>r.getString("title") >= "T")
println("1" : + cassRdd2)
println("2" : + cassRdd2.count)
println("3" : + cassRdd2.take(5))
println("4" : + cassRdd2.take(5).count)
结果:
- 1:MapPartitionsRDD[185] 位于过滤器:19
- 2:2250
- 3:[Lcom.datastax.spark.connector.CassandraRow;@56fd2e09
- 4:编译错误(特征中缺少方法计数的参数 TraversableOnce
我的预期:
- 1:和 2:按预期工作
- 3:只返回一行?我希望 RDD 有 5 个 cassandra 行
- 4:这不是 3: 之后的 rdd 计数,因此我没想到它会起作用, 看起来像是某种 cassandraRow-count-method 我不是 打算打电话
Datastax 提供的解决方案使用 RDD 并对其进行映射转换,仅获取标题,并在新的 title-rdd 上执行过滤和获取命令。
好的,工作,但我不明白,为什么 take 不能在 CassandraRow 的 RDD 上工作,或者结果可能是什么。
val cassRdd2 = cassRdd.map(r=>r.getString("title")).filter(t >= "T")
我认为任何 RDD(不管它的内容)上的 take-command 都会做同样的事情,获取前 x 个元素会产生一个新的 RDD,其类型完全相同,大小为 x 个元素。
【问题讨论】:
标签: scala apache-spark spark-cassandra-connector