【问题标题】:RDD of CassandraRow not working with take-command - why?CassandraRow 的 RDD 不能使用 take-command - 为什么?
【发布时间】:2020-05-10 02:23:37
【问题描述】:

我正在做一些 DataStax VM 的练习。

给出了一个 CassandraTable,我将使用 Spark API 函数而不是 cassandra-query-functions 进行一些过滤和检索前 5 个元素。

我正在执行以下操作:

val cassRdd = sc.cassandraTable("killr_video", "videos_by_year_title")
val cassRdd2 = cassRdd.filter(r=>r.getString("title") >= "T")
println("1" : + cassRdd2)
println("2" : + cassRdd2.count)
println("3" : + cassRdd2.take(5))
println("4" : + cassRdd2.take(5).count)

结果:

  • 1:MapPartitionsRDD[185] 位于过滤器:19
  • 2:2250
  • 3:[Lcom.datastax.spark.connector.CassandraRow;@56fd2e09
  • 4:编译错误(特征中缺少方法计数的参数 TraversableOnce

我的预期:

  • 1:和 2:按预期工作
  • 3:只返回一行?我希望 RDD 有 5 个 cassandra 行
  • 4:这不是 3: 之后的 rdd 计数,因此我没想到它会起作用, 看起来像是某种 cassandraRow-count-method 我不是 打算打电话

Datastax 提供的解决方案使用 RDD 并对其进行映射转换,仅获取标题,并在新的 title-rdd 上执行过滤和获取命令。

好的,工作,但我不明白,为什么 take 不能在 CassandraRow 的 RDD 上工作,或者结果可能是什么。

val cassRdd2 = cassRdd.map(r=>r.getString("title")).filter(t >= "T")

我认为任何 RDD(不管它的内容)上的 take-command 都会做同样的事情,获取前 x 个元素会产生一个新的 RDD,其类型完全相同,大小为 x 个元素。

【问题讨论】:

    标签: scala apache-spark spark-cassandra-connector


    【解决方案1】:

    rdd.take(n) 实际上将n 元素移动到驱动程序并将它们作为数组返回,请参阅ScalaDoc。如果你想打印它们:

    println("3" : + cassRdd2.take(5).toList)
    

    cassRdd2.take(5).foreach(println)。最后一行不起作用,因为该方法被称为数组的length(或size):

    println("4" : + cassRdd2.take(5).length)
    

    【讨论】:

    • 谢谢,这也回答了我自己的答案中的下一个问题(如下),操作结果是一个数组
    【解决方案2】:

    我弄混了一些东西:

    take 是一个动作,我不应该期待一个 RDD(但它是什么?一些二进制文件?它有名字吗?某种集合?也可能是一个单一的值,如 String 或 int 如果它适合)

    我不应该像在 RDD 上那样使用 count,而应该像在 java-collections 上那样使用 size。顺便说一句,count 也是一个动作,在一个动作之后使用一个动作听起来像转储,但它是如此直观。

    【讨论】:

      猜你喜欢
      • 2017-10-30
      • 1970-01-01
      • 1970-01-01
      • 2021-09-18
      • 2021-10-08
      • 1970-01-01
      • 2012-09-13
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多