【发布时间】:2018-01-05 19:47:22
【问题描述】:
我正在尝试在 Spark 和 Scala 中实现这个逻辑。初始逻辑是用Java指定的(希望不是什么大问题:))。
Map<Object,List<Integer>> myMap = new HashMap<Object,List<Integer>>();
for (int i=0; i<len; i++) {
String module = nodes.getAttribute(i);
Integer k_i = nodes.getK(i);
if (!myMap.containsKey(module)) {
List<Integer> list = new ArrayList<Integer>();
list.add(k_i);
myMap.put(module,list);
}
else {
List<Integer> list = myMap.get(module);
list.add(k_i);
myMap.put(module,list);
}
}
我有nodesRDD 即RDD[Node],但我不知道如何在每次迭代中检查其内容时增量创建myMap。
任何提示都会非常有价值。
val myMap = nodes.map( node => {
val module = node.getAttribute()
(module, node)
})
.groupBy(_._1)
这给了我RDD[(Long, Iterable[(Long,Array(Node))])]。但我需要RDD[(Long, Array(Node))]。
【问题讨论】:
-
myMap是在哪里定义的?它不在map()的闭包中,因此任何 Spark Executor 都无法立即使用它 -
map不能替代 for 循环。您可以修改nodes内容为新的RDD -
@cricket_007:但是我不应该通过使用
map循环来修改nodes吗?foreach不会返回任何值 -
那么,投票者中没有人聪明到足以做出任何暗示吗?
-
我认为问题在于第一个代码 sn-p 是在 Java 中,而第二个是在 Scala 中。您应该澄清您需要哪种语言的帮助。
标签: scala apache-spark rdd