【问题标题】:Spark splitting a DStream into several RDDsSpark 将一个 DStream 拆分为多个 RDD
【发布时间】:2015-02-05 17:11:07
【问题描述】:

同样的问题也适用于将一个 RDD 拆分成几个新的 RDD。

一个 DStream 或 RDD 包含几个不同的案例类,我需要根据案例类类型将它们变成单独的 RDD。

我知道

val newRDD = rdd.filter { a => a.getClass.getSimpleName == "CaseClass1" }

val newRDD = rdd.filter { 
  a => a match { 
    case _: CC1 => true
    case _ => false
  }
}

但这需要对原始 RDD 进行多次运行,每个案例类类型运行一次。

  1. 上面的匹配过滤器一定有更简洁的方法吗?
  2. 有没有办法通过一个并行通道将一个 rdd 按元素类型拆分为多个?

【问题讨论】:

    标签: apache-spark spark-streaming


    【解决方案1】:

    1) 一种更简洁的过滤给定类型的方法是使用rdd.collect(PartialFunction[T,U])

    相当于

    val newRDD = rdd.filter { a => a.getClass.getSimpleName == "CaseClass1" }
    

    应该是:

    val newRDD = rdd.collect{case c:CaseClass1 => c}
    

    它甚至可以与额外的过滤和转换相结合:

    val budgetRDD = rdd.collect{case c:CaseClass1 if (c.customer == "important") => c.getBudget}
    

    rdd.collect(p:PartialFunction[T,U]) 不应与将数据返回给驱动程序的rdd.collect() 混淆。


    2) 要拆分 RDD(或 DStream),filter 是要走的路。必须记住,RDD 是一个分布式集合。过滤器可让您在集群上并行地将函数应用于该分布式集合的子集。

    从原始 RDD 结构创建 2 个或更多 RDD 将导致 1 对多 shuffle 阶段,这将大大增加成本。

    【讨论】:

    • 好的,但我真的不想收藏,对吗?我编辑了上面的内容以将返回的过滤器分配给 rdd。换句话说,我确实想要一个过滤器,对吧?
    • 我猜这是因为从闭包返回一个值比布尔值更容易?
    • 编辑了答案以显示该调用返回一个 RDD。正如我所提到的,rdd.collect() 和 rdd.collect{case x => x.y} 有两个非常不同的用途。
    【解决方案2】:

    看起来与rdd.filter 一样,我在长表单上走在了正确的轨道上。稍微简洁一点的版本是:

    val newRDD = rdd.filter { case _: CC1 => true ; case _ => false }
    

    您不能省略case _ => false,否则课程测试并不详尽,您会遇到错误。我无法让收集器正常工作。

    @maasg 获得了关于进行单独的过滤器通道而不是破解一种在一次通道中拆分输入的正确答案的正确答案。

    【讨论】:

      猜你喜欢
      • 2016-06-29
      • 1970-01-01
      • 1970-01-01
      • 2019-01-05
      • 1970-01-01
      • 2016-01-03
      相关资源
      最近更新 更多