【发布时间】:2017-10-30 16:42:39
【问题描述】:
我已经使用 JavaRDD 在 Spark 中重写了这段代码。我读到 groupByKey 是昂贵的操作。
我们可以通过避免 groupByKey 来重写它吗?
按键分组后,我正在尝试更新键的值(如果适用)。
有人可以帮忙吗
List<Items> items = getItems();
Map<String, List<ItemId>> itemsByName = items.stream()
.collect(Collectors.groupingBy(ItemId::getName, Collectors.toList()));
List<ItemId> newItems = itemsByName.entrySet().stream()
.collect(Collectors.toMap(e -> e.getKey(), e -> {
//update values if applicable
List<ItemId> rps = e.getValue().stream().filter(s -> s.isApplicable()).collect(Collectors.toList());
return rps.isEmpty() ? e.getValue() : rps;
}))
.values().stream()
.flatMap(x -> x.stream()).collect(Collectors.toList());
JavaRDD
JavaRDD<Items> items = getItemsRDD();
JavaPairRDD<String, ItemId> itemsByName =
items.mapToPair(e -> new Tuple2<String, ItemId>(e.getName(), e));
JavaRDD<ItemId> newItems= itemsByName.groupByKey().mapValues(x->{
//update values if applicable
List<ItemId> e = new ArrayList<>();
x.iterator().forEachRemaining(e::add);
List<ItemId> rps = e.stream().filter(s -> s.isApplicable()).collect(Collectors.toList());
return rps.isEmpty() ? e: rps;
}).flatMap(x->x._2);
我正在尝试做一些类似但在 java 中的事情 How to update column based on a condition (a value in a group)?
【问题讨论】:
-
我不相信你不会通过不使用
groupByKey获得太多收益,因为你似乎没有执行某种聚合来大幅减少返回值的大小(我可能是错的,取决于isApplicable)。请参阅here 以获取类似问题的答案。
标签: java apache-spark