【问题标题】:Count elements from Stream but consider only N for collecting计算 Stream 中的元素,但只考虑 N 来收集
【发布时间】:2018-10-09 17:31:59
【问题描述】:

以下 lambda 在 Java 中是否可能以某种方式出现?我想计算过滤流中的元素,但同时存储前 10 个

stream().filter(myFilter)  //Reduces input to forthcoming operations
        .limit(10)         //Limits to ten the amount of elements to finish stream 
        .peek(myList::add) //Stores the ten elements into a list
        .count();          //Here is the difficult one. Id like to count everything  the total of elements that pass the filter, beyond the 10 I am fetching

编辑:在我看来这太含蓄了,但这个想法当然是一种潜在的解决方案,它会是最快的(比调用两次流生成器并分别在最少):

List<Entity> entities = stream().filter(myFilter) 
                                .limit(10)
                                .collect(Collectors.toList());
long entitiesCount = stream().filter(myFilter) 
                             .count();

...从一次迭代中获利,而不必将整个集合加载到内存中。我正在对答案进行并行化测试

【问题讨论】:

  • 您可以使用 2 个单独的流来实现。
  • 这是一个流,因为它是一个非常昂贵(长时间)的迭代......这是我这种疯狂想法的唯一意义
  • 如果流很大,最好的处理方式是看能不能并行化。如果这必须是一个顺序操作,正如这里和here 所指出的那样,也许最好避免使用 API 玩游戏并使用一个很好的旧 .forEach()
  • @FedericoPeraltaSchaffner 我认为在这种情况下自定义收集器将是答案,无论来源是什么......
  • @FedericoPeraltaSchaffner 当然,但是在那个计数中,有很多易失性写入和读取,你必须衡量哪个才是赢家......当然,我会选择收集器

标签: java lambda java-stream


【解决方案1】:

自定义收集器是这里的答案:

Entry<List<Integer>, Integer> result = list.stream()
            .collect(Collector.of(
                    () -> new SimpleEntry<>(new ArrayList<>(), 0),
                    (l, x) -> {
                        if (l.getKey().size() < 10) {
                            l.getKey().add(x);
                        }
                        l.setValue(l.getValue() + 1);
                    },
                    (left, right) -> {
                        List<Integer> leftList = left.getKey();
                        List<Integer> rightList = right.getKey();
                        while (leftList.size() < 10 && rightList.size() > 0) {
                            leftList.add(rightList.remove(0));
                        }
                        left.setValue(left.getValue() + right.getValue());
                        return left;
                    }));

假设你有这个代码:

Set.of(1, 2, 3, 4)
            .stream()
            .parallel()
            .collect(Collector.of(
                    ArrayList::new,
                    (list, ele) -> {
                        System.out.println("Called accumulator");
                        list.add(ele);
                    },
                    (left, right) -> {
                        System.out.println("Combiner called");
                        left.addAll(right);
                        return left;
                    },
                    new Characteristics[] { Characteristics.CONCURRENT }));

在我们开始考虑该代码之前(对于示例而言,它的正确程度并不重要),我们需要阅读一下CONCURRENT 特征的文档:

如果一个 CONCURRENT 收集器也不是 UNORDERED,那么只有在应用于无序数据源时才应该同时评估它。

本文档的基本内容是,如果您的收集器是 CONCURRENT 并且流的源是 UNORDERED(如 Set)或者我们显式调用 unordered 然后永远不会调用合并。

如果您运行前面的代码,您会看到Combiner called 永远不会出现在输出中。

如果您将Set.of(1, 2, 3, 4) 更改为List.of(1, 2, 3, 4),您将看到不同的图片(忽略您得到的结果的正确性——因为ArrayList 不是线程安全的,但这不是重点)。如果您将流的源设为List同时调用unordered,您将再次看到只调用了累加器,即:

 List.of(1, 2, 3, 4)
            .stream()
            .unordered()
            .parallel()
            .collect(Collector.of(
                    ArrayList::new,
                    (list, ele) -> {
                        System.out.println("Called accumulator");
                        list.add(ele);
                    },
                    (left, right) -> {
                        System.out.println("Combiner called");
                        left.addAll(right);
                        return left;
                    },
                    new Characteristics[] { Characteristics.CONCURRENT }));

【讨论】:

  • leftList.add(rightList.remove(0)); 的意义何在?这不只是在右列表中添加第一个元素吗?这可以安全地并行化吗?如果不是,那么复杂的累加器部分有什么意义?如果我使用 Collections.synchronizedList 并添加 Collector.Characteristics CONCURRENT 作为收集器的特征,那么在使用并行化时我会以某种方式受益(因为我认为订购它不能)?
  • @Whimusical 您在此评论中提出了很多问题,我将尝试解决这些问题。此收集器按原样不添加 CONCURRENT 特性,也不应该添加 - 因为它完全改变了收集器的工作方式,请参阅 here这不只是在右侧列表中添加第一个元素 - 它正在添加到 left 列表中,这是一个非常重要的区别。
  • @Whimusical 这可以安全地并行化 - 是的。 如果我使用 Collections.synchronizedList - 你将在哪里使用它?作为流的来源?如果是这样,这并不重要,重要的是你在收集器逻辑中做什么。对于你的最后一句话,请阅读我上面的链接
  • @Whimusical 我决定稍微扩展一下答案并删除我以前的 cmets 可能会更清楚
  • @Whimusical 对这个调用要小心,它只设置一个内部标志 - 它不会在内部打乱元素 - 至少目前是这样
【解决方案2】:

以下内容在保存摘要的本地类的帮助下使用可变归约。
收集元素的限制是通过在 combiner 函数中选择前 10 个元素来完成的。

使用IntStream的示例:

Stat result = IntStream.range(0, 100)
        .boxed()
        .filter(i -> i % 2 == 0)
        .collect(() -> new Stat(0, new ArrayList<Integer>()), 
            (stat, integer) -> {
                stat.count++;
                if (stat.list.size() < 10) {
                    stat.list.add(integer);
                }
            }, 
            (stat1, stat2) -> {
                stat1.list.addAll(stat2.list.subList(0, Math.min(stat2.list.size(), 
                    10 - stat1.list.size())));
            });

这是流中使用的Stat 类(您可以轻松使用Pair&lt;Long, List&lt;Integer&gt;&gt; 之类的东西):

private static class Stat {
    long count;
    List<Integer> list;

    public Stat(long count, List<Integer> list) {
        this.count = count;
        this.list = list;
    }
}

以上示例结果为[count=50,list=[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]]

【讨论】:

  • @Holger 谢谢。我比较关心收集元素的顺序;但在测试了你的版本之后,我认为这种担忧可能是没有根据的(我有一些阅读要做......)
  • @Holger 那么这个解决方案可以与您的观点并行吗?
  • @Whimusical 这里的每个解决方案都可以并行运行,但这并不能使它们正确,尤其是那些依赖副作用的解决方案
  • @Whimusical 因为相关点已被编辑到答案中,它在与并行流一起使用时可以正常工作。删除boxed() 会提高性能,但解​​决方案是正确的,不管有没有。
  • @Holger 我的意思是订单安全。如果我使用 Collections.synchronizedList 并添加 Collector.Characteristics CONCURRENT 作为收集器的特征,那么在使用并行化时我会以某种方式受益(因为我认为订购它不能)?
【解决方案3】:

这是一个简单的 lambda 表达式,它将计数添加到您的列表中的任何项目,最多 10 个,使其通过您的过滤器:

i -> {if (myList.size() < 10) myList.add(i);}

但你不能简单地使用count() on Stream

如果一个实现能够直接从流源计算计数,则可以选择不执行流管道(顺序或并行)。在这种情况下,不会遍历源元素,也不会评估任何中间操作。有副作用的行为参数,除了调试等无害的情况外,强烈建议不要使用,可能会受到影响。

对于我的情况,使用count()peek() 没有被调用,因为元素没有被遍历,我的列表是空的。

选择一个简单的归约来计算元素。

.reduce(0, (a, b) -> a + 1);

我的代码:

int count = yourCollection.stream()
    .filter(myFilter)
    .peek(i -> {if (myList.size() < 10) myList.add(i);} )
    .reduce(0, (a, b) -> a + 1);

【讨论】:

  • reduce 操作需要我的流式实体的类型用于身份和返回情况。如果我没记错的话,不幸的是,这对我的情况没有用,因为我不是流式传输整数。我可以在之前添加一个 .map(t -> 1),但我不确定这种方法可以以任何方式通过并行优化
  • @Whimusical 您可以将终端操作更改为.reduce(0, (a, b) -&gt; a + 1, Integer::sum),以使其工作,这比前面的map 步骤效率略高(除非您使用mapToInt(x -&gt; 1).sum())。然而,虽然减少是正确的并且可以并行工作,但peek 不是。即使myList 指的是线程安全集合,if(myList.size() &lt; 10) myList.add(i); 也是“Check-then-Act”反模式的受害者。检查条件后,不保证进入后续条件操作时仍然成立。
【解决方案4】:

这是另一种解决方案,不确定是否符合您的要求。

    final Count c = new Count();

    coll.stream().forEach(e -> {
        c.setTotCount(c.getTotCount() + 1);

        if (/*your filter*/) {
           // add till 10 elements only
           if (c.getMyList().size() <= 10) {
              c.addMyList(e);
           }
        }
    });

并且定义了辅助类

class Count {
    int totCount;
    // Student for an example
    List<Student> myList = new ArrayList<>();

    public List<Student> getMyList() {
        return myList;
    }

    public void addMyList(Student std) {
        this.myList.add(std);
    }

    // getter and setter for totCount
}

现在您有了列表和总数,它们都存储在帮助对象c 中。使用以下方法获取列表的总数:

  System.out.println(c.getTotCount());
  System.out.println(c.getMyList().size());

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2015-08-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-04-22
    • 1970-01-01
    • 2013-07-20
    相关资源
    最近更新 更多