【问题标题】:Check if java stream has been consumed检查java流是否已被消费
【发布时间】:2019-07-08 01:58:48
【问题描述】:

如何检查流实例是否已被使用(意味着调用了终端操作,这样对终端操作的任何进一步调用都可能因IllegalStateException: stream has already been operated upon or closed. 而失败?

理想情况下,我想要一个在流尚未被消费时不消费流的方法,如果流已被消费而没有从流方法中捕获IllegalStateException,则返回布尔值 false(因为使用异常进行控制flow 成本高且容易出错,尤其是在使用标准异常时)。

在异常抛出和布尔返回行为中类似于迭代器中的hasNext() 的方法(尽管没有与next() 的约定)。

例子:

public void consume(java.util.function.Consumer<Stream<?>> consumer, Stream<?> stream) {
   consumer.accept(stream);
   // defensive programming, check state
   if (...) {
       throw new IllegalStateException("consumer must call terminal operation on stream");
   }
}

如果客户端代码调用此方法而不使用流,目标是尽早失败。

似乎没有办法做到这一点,我必须添加一个 try-catch 块来调用任何终端操作,如 iterator(),捕获一个异常并抛出一个新异常。

可接受的答案也可以是“不存在解决方案”,并有充分的理由说明为什么规范不能添加这种方法(如果存在充分的理由)。似乎 JDK 流通常在其终端方法的开头有这个 sn-ps:

// in AbstractPipeline.java
if (linkedOrConsumed)
    throw new IllegalStateException(MSG_STREAM_LINKED);

所以对于那些流来说,实现这样的方法似乎并不难。

【问题讨论】:

  • 好像没办法,按照here的讨论,连IllegalStateException都不能保证抛出。我想知道iterator.hasNext() 会做什么。
  • 不要传递流,传递流提供者。这类似于传递迭代器而不是可迭代并尝试迭代两次。
  • 您能否详细说明什么是消费者、“流已被消费”的确切含义以及您希望总体实现的目标。从你已经写的内容来看,这似乎不是很清楚。
  • @tkruse 你能举一个例子来解释你所说的“但是他们提供或消费的流实际上并没有被使用”吗?对于可能推断的内容,您似乎正在使用Consumer 而不是直接使用Stream,并且一旦您调用accept 方法,您要确保它是否实际使用了提供的流。是这样吗?为什么(何时)这样的Consumer 中的代码不会真正消耗流?
  • 为什么不知道流有没有被消费?流被消费后有什么用?为什么要保留对已使用的流的引用?我还想不出这个问题的用例。

标签: java java-stream


【解决方案1】:

考虑到spliterator(例如)是一个终端操作,你可以简单地创建一个类似的方法:

private static <T> Optional<Stream<T>> isConsumed(Stream<T> stream) {

    Spliterator<T> spliterator;
    try {
        spliterator = stream.spliterator();
    } catch (IllegalStateException ise) {
        return Optional.empty();
    }

    return Optional.of(StreamSupport.stream(
        () -> spliterator,
        spliterator.characteristics(),
        stream.isParallel()));
}

我不知道有什么更好的方法……而且用法是:

Stream<Integer> ints = Stream.of(1, 2, 3, 4)
                                 .filter(x -> x < 3);

YourClass.isConsumed(ints)
         .ifPresent(x -> x.forEachOrdered(System.out::println));

由于我认为没有实际理由返回已使用的 Stream,因此我将返回 Optional.empty()

【讨论】:

  • 当然,我知道这个解决方案并在问题中提到它。这不是我正在寻找的方法,因为它使用异常来控制流(并且由于创建 Stacktrace,创建异常既昂贵又缓慢)。但如果几天后没有找到其他答案,我可能会接受这个答案。
  • @tkruse 您说的是在使用 Stream 的情况下会产生几纳秒的开销,这比简单的 for 循环慢 10 倍 - 在这里关心 some ns 是最重要的最少
  • 很公平。另一个问题是,只有已经消费的流中的非法状态异常应该被捕获,其他的可能需要重新抛出。
  • @tkruse 据我所知,没有其他选择......我希望有人能想出另一种有趣的方法来做到这一点,但似乎不可能。
【解决方案2】:

一种解决方案是在将中间操作(例如filter())传递给stream 之前,将其传递给consumer。在该操作中,除了保存调用该操作的状态(例如,使用AtomicBoolean)之外,您什么都不做:

public <T> void consume(Consumer<Stream<T>> consumer, Stream<T> stream) {
    AtomicBoolean consumed = new AtomicBoolean(false);
    consumer.accept(stream.filter(i -> {
        consumed.set(true);
        return true;
    }));
    if (!consumed.get()) {
        throw new IllegalStateException("consumer must call terminal operation on stream");
    }
}

旁注:不要为此使用peek(),因为它不会被短路终端操作调用(如findAny())。

【讨论】:

  • 如果您至少将此方法设为可以编译的方法,那就太好了……但无论如何,这是不正确的。即使您编译了filter 仅当流中有至少 一个元素时才设置consumed 标志,这也不能解决OP 关于流为consumed 的问题
  • 不过,当流中至少有一个元素时,这是一个很好的答案。
【解决方案3】:

这是一个独立的可编译解决方案,它使用委托自定义 Spliterator&lt;T&gt; 实现 + AtomicBoolean 来完成您所寻求的,而不会丢失线程安全或影响 Stream&lt;T&gt; 的并行性。

主要入口是Stream&lt;T&gt; track(Stream&lt;T&gt; input, Consumer&lt;Stream&lt;T&gt;&gt; callback) 函数——你可以在回调函数中做任何你想做的事情。我首先修改了一个委托Stream&lt;T&gt; 实现,但它的接口太大而无法委托而没有任何问题(请参阅我的代码注释,即使Spliterator&lt;T&gt; 在委托时也有其警告):

import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class StackOverflowQuestion56927548Scratch {

    private static class TrackingSpliterator<T> implements Spliterator<T> {
        private final AtomicBoolean tracker;
        private final Spliterator<T> delegate;
        private final Runnable callback;

        public TrackingSpliterator(Stream<T> forStream, Runnable callback) {
            this(new AtomicBoolean(true), forStream.spliterator(), callback);
        }

        private TrackingSpliterator(
                AtomicBoolean tracker,
                Spliterator<T> delegate,
                Runnable callback
        ) {
            this.tracker = tracker;
            this.delegate = delegate;
            this.callback = callback;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            boolean advanced = delegate.tryAdvance(action);
            if(tracker.compareAndSet(true, false)) {
                callback.run();
            }
            return advanced;
        }

        @Override
        public Spliterator<T> trySplit() {
            Spliterator<T> split = this.delegate.trySplit();
            //may return null according to JavaDoc
            if(split == null) {
                return null;
            }
            return new TrackingSpliterator<>(tracker, split, callback);
        }

        @Override
        public long estimateSize() {
            return delegate.estimateSize();
        }

        @Override
        public int characteristics() {
            return delegate.characteristics();
        }
    }

    public static <T> Stream<T> track(Stream<T> input, Consumer<Stream<T>> callback) {
        return StreamSupport.stream(
                new TrackingSpliterator<>(input, () -> callback.accept(input)),
                input.isParallel()
        );
    }

    public static void main(String[] args) {
        //some big stream to show it works correctly when parallelized
        Stream<Integer> stream = IntStream.range(0, 100000000)
                .mapToObj(Integer::valueOf)
                .parallel();
        Stream<Integer> trackedStream = track(stream, s -> System.out.println("consume"));

        //dummy consume
        System.out.println(trackedStream.anyMatch(i -> i.equals(-1)));
    }
}

只需返回track 函数的流,也许调整callback 参数类型(您可能不需要传递流),您就可以开始了。

请注意,此实现仅跟踪实际使用流的时间,在由例如生成的 Stream 上调用 .count()IntStream.range(0,1000)(没有任何过滤步骤等)不会消耗流,而是通过Spliterator&lt;T&gt;.estimateSize()返回流的底层已知长度!

【讨论】:

  • 我不认为这是正确的。 boolean advanced = delegate.tryAdvance(action); 这不会检查 Stream 是否被使用,而是检查它是否为空。例如:Stream&lt;Integer&gt; empty = Stream.empty(); Stream&lt;Integer&gt; wrapped = track(empty, x -&gt; System.out.println("x = " + x)); wrapped.forEach(System.out::println); 如果发生这种情况(尽管这不是我上面所说的正确的事情):if (tracker.compareAndSet(true, false)) {,你应该真的停止每一个后续操作,对吧?
  • 我的意思是这应该显示什么? Stream&lt;Integer&gt; s = Stream.concat(Stream.empty(), Stream.of(1, 2, 3, 4)) .filter(x -&gt; x &gt; 0); Stream&lt;Integer&gt; wrapped = track(s, x -&gt; System.out.println("consume")); wrapped.forEachOrdered(System.out::println);
  • 我不检查boolean advanced = delegate.tryAdvance(action);,我只是打电话给代表,所以我不太了解。问题是流是否被消耗,一旦调用tryAdvance 就是这种情况,所以不管它是否为空,只要至少调用一次tryAdvance,它就会被消耗。您的第一条评论示例正确打印一次。您的第二个示例也只调用了一次回调(在使用第一个元素之后)。我不明白你对tracker.compareAndSet的评论,我只是用它来保证回调只被调用一次
  • 所见,消费流是已被终端操​​作消费的流,在我看来,OP 在他/她的问题的第一个陈述
  • 你把它留在这里是对的,我真的很喜欢理解它背后的想法。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-02-24
  • 2016-08-29
  • 1970-01-01
  • 2015-04-01
  • 2019-06-23
相关资源
最近更新 更多