【问题标题】:spark - filter within mapspark - 地图内的过滤器
【发布时间】:2022-02-07 22:51:57
【问题描述】:

我正在尝试过滤地图功能。基本上,我在经典 map-reduce 中这样做的方式是,当过滤条件满足时,mapper 不会将任何内容写入上下文。我怎样才能用火花实现类似的效果?我似乎无法从 map 函数返回 null,因为它在 shuffle 步骤中失败。我可以使用过滤器功能,但似乎没有必要对数据集进行迭代,而我可以在地图期间执行相同的任务。我也可以尝试使用虚拟键输出 null,但这是一个糟糕的解决方法。

【问题讨论】:

  • 您能添加说明问题的示例代码吗?

标签: java apache-spark


【解决方案1】:

有几个选项:

rdd.flatMap: rdd.flatMap 会将Traversable 集合展平到 RDD 中。要选择元素,您通常会返回一个 Option 作为转换的结果。

rdd.flatMap(elem => if (filter(elem)) Some(f(elem)) else None)

rdd.collect(pf: PartialFunction) 允许您提供一个部分函数,​​可以过滤和转换原始 RDD 中的元素。您可以使用此方法使用模式匹配的所有功能。

rdd.collect{case t if (cond(t)) => f(t)}
rdd.collect{case t:GivenType => f(t)}

正如 Dean Wampler 在 cmets 中提到的那样,rdd.map(f(_)).filter(cond(_)) 可能与上述其他更“简洁”的选项一样好,甚至更快。

f 是一个转换(或映射)函数。

【讨论】:

  • 如果您使用...filter().map(),它们将在每个分区的同一任务中执行,类似于 MapReduce 中的链接“映射器”。这甚至可能比单个 flatMapcollect 更快,具体取决于分配了多少临时对象,然后快速收集垃圾。
  • @DeanWampler 我知道流水线,但很高兴知道filter().map() 可能比flatmapcollect... 我们用collect() 替换了许多map().filter() b/ c 读起来更好,但需要检查性能。谢谢。
  • 谢谢。现在,“地图链”方式并没有阻碍我。稍后我会在研究 perf 时研究 Java 8 中的等效平面图建议
  • @maasg flatmap 在代码简单性和性能方面都证明对我有用。我也删除了 spark sql 层,并使用 flatmap 函数进行过滤和映射。
  • @maasg - 我可能错了,但看看 flatMap 源代码,flatMap 似乎是一次迭代,而 filter.map 似乎是每个分区的两次迭代 - def flatMap[U : Encoder](func: T => TraversableOnce[U]): Dataset[U] = mapPartitions(_.flatMap(func))
【解决方案2】:
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-12-14
  • 2013-06-01
  • 2021-01-12
  • 2022-01-23
  • 2022-01-08
  • 2014-11-01
相关资源
最近更新 更多