【问题标题】:How to replicate my for loop using "map" with Spark?如何使用带有 Spark 的“map”复制我的 for 循环?
【发布时间】:2018-01-05 19:47:22
【问题描述】:

我正在尝试在 Spark 和 Scala 中实现这个逻辑。初始逻辑是用Java指定的(希望不是什么大问题:))。

Map<Object,List<Integer>> myMap = new HashMap<Object,List<Integer>>();

for (int i=0; i<len; i++) {
   String module = nodes.getAttribute(i);
   Integer k_i = nodes.getK(i);
   if (!myMap.containsKey(module)) {
      List<Integer> list = new ArrayList<Integer>();
      list.add(k_i);
      myMap.put(module,list);
   }
   else {
      List<Integer> list = myMap.get(module);
      list.add(k_i);
      myMap.put(module,list);
   }
}

我有nodesRDDRDD[Node],但我不知道如何在每次迭代中检查其内容时增量创建myMap。 任何提示都会非常有价值。

val myMap = nodes.map( node => {
  val module = node.getAttribute()
  (module, node)
})
.groupBy(_._1)

这给了我RDD[(Long, Iterable[(Long,Array(Node))])]。但我需要RDD[(Long, Array(Node))]

【问题讨论】:

  • myMap 是在哪里定义的?它不在 map() 的闭包中,因此任何 Spark Executor 都无法立即使用它
  • map 不能替代 for 循环。您可以修改 nodes 内容为新的RDD
  • @cricket_007:但是我不应该通过使用map 循环来修改nodes 吗? foreach 不会返回任何值
  • 那么,投票者中没有人聪明到足以做出任何暗示吗?
  • 我认为问题在于第一个代码 sn-p 是在 Java 中,而第二个是在 Scala 中。您应该澄清您需要哪种语言的帮助。

标签: scala apache-spark rdd


【解决方案1】:

这给了我 RDD[(Long, Iterable[(Long,Array(Node))])]

这是不可能的。如果

nodesRDDRDD[Node]

getAttributeT 类型,那么:

nodes.map( node => {
  val module = node.getAttribute()
  (module, node)
})

应该是RDD[(T, Node)].groupBy(_._1) 应该是RDD[(T, Iterable[(T, Node)])]

但我需要 RDD[(Long, Array(Node))]。

然后

myMap.mapValues(_.map(_._2).toArray)

当然:

nodes.groupBy(_.getAttribute()).mapValues(_.toArray)

nodes.map( node => {
  (node.getAttribute(), node)
}).groupByKey

更简单

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-04-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-10-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多