【问题标题】:How to transform a Java stream into a sliding window?如何将 Java 流转换为滑动窗口?
【发布时间】:2016-03-13 12:50:41
【问题描述】:

将流转换为滑动窗口的推荐方法是什么?

例如,在 Ruby 中,您可以使用 each_cons

irb(main):020:0> [1,2,3,4].each_cons(2) { |x| puts x.inspect }
[1, 2]
[2, 3]
[3, 4]
=> nil
irb(main):021:0> [1,2,3,4].each_cons(3) { |x| puts x.inspect }
[1, 2, 3]
[2, 3, 4]
=> nil

在Guava中,我只找到了Iterators#partition,有相关但没有滑动窗口:

final Iterator<List<Integer>> partition =
   Iterators.partition(IntStream.range(1, 5).iterator(), 3);
partition.forEachRemaining(System.out::println);
-->
[1, 2, 3]
[4]

【问题讨论】:

标签: java java-8 java-stream


【解决方案1】:

API 中没有这样的功能,因为它同时支持顺序和并行处理,并且很难为任意流源的滑动窗口函数提供有效的并行处理(即使是有效的对并行处理也很难,我实现了它,所以我知道)。

但是,如果您的来源是具有快速随机访问的 List,您可以使用 subList() 方法来获得所需的行为,如下所示:

public static <T> Stream<List<T>> sliding(List<T> list, int size) {
    if(size > list.size()) 
        return Stream.empty();
    return IntStream.range(0, list.size()-size+1)
                    .mapToObj(start -> list.subList(start, start+size));
}

我的StreamEx 库中实际上提供了类似的方法:请参阅StreamEx.ofSubLists()

还有一些其他第三方解决方案不关心并行处理并使用一些内部缓冲区提供滑动功能。例如,质子包StreamUtils.windowed

【讨论】:

  • 不应该是 range(0, list.size() / size + 1) 然后用 (start * size, start * size + size) 制作子列表来处理最后一页不超出 list.size()
  • @Triqui,看到问题,它是用于重叠窗口的。您想要的是在某处单独回答的非重叠批次...
【解决方案2】:

如果您愿意使用第三方库并且不需要并行性,那么jOOλ 提供如下 SQL 风格的窗口函数

int n = 2;

System.out.println(
Seq.of(1, 2, 3, 4)
   .window(0, n - 1)
   .filter(w -> w.count() == n)
   .map(w -> w.window().toList())
   .toList()
);

屈服

[[1, 2], [2, 3], [3, 4]]

int n = 3;

System.out.println(
Seq.of(1, 2, 3, 4)
   .window(0, n - 1)
   .filter(w -> w.count() == n)
   .map(w -> w.window().toList())
   .toList()
);

屈服

[[1, 2, 3], [2, 3, 4]]

Here's a blog post about how this works.

免责声明:我为 jOOλ 背后的公司工作

【讨论】:

  • 是否支持并行处理?
  • @Normal: 不。通过“并且不需要并行性”,我的意思是 jOOλ 明确用于仅顺序处理(这基本上已经足够好了)跨度>
  • 由于某种原因,window 子句不再出现在 wiki 文档 @LukasEder
  • @AshwinJayaprakash:什么 wiki 文档?
  • @LukasEder 我的意思是github.com/jOOQ/jOOL/blob/master/README.md 文件没有提到window 函数。我必须阅读测试代码才能了解更多信息:github.com/jOOQ/jOOL/blob/…
【解决方案3】:

另一个选项 cyclops-react 构建在 jOOλ 的 Seq 接口(和 JDK 8 Stream)之上,但 simple-react 构建了并发/并行性(如果这对您很重要 - 通过创建期货流)。

您可以将 Lukas 强大的窗口函数与任一库一起使用(因为我们扩展了很棒的 jOOλ),但还有一个滑动运算符,我认为在这种情况下可以简化事情并且适合在无限流中使用(即它不' 不消耗流,而是在流过时缓冲值)。

ReactiveSeq 看起来像这样 -

ReactiveSeq.of(1, 2, 3, 4)
           .sliding(2)
           .forEach(System.out::println);

LazyFutureStream 可能类似于下面的示例 -

 LazyFutureStream.iterate(1,i->i+1)
                 .sliding(3,2) //lists of 3, increment 2
                 .forEach(System.out::println);

在 cyclops-streams StreamUtils 类中还提供了用于在 java.util.stream.Stream 上创建滑动视图的等效静态方法。

       StreamUtils.sliding(Stream.of(1,2,3,4),2)
                  .map(Pair::new);

如果你想直接使用每个滑动视图,你可以使用slidingT 操作符,它返回一个List Transformer。例如,为每个滑动视图中的每个元素添加一个数字,然后将每个滑动窗口减少到我们可以做的元素之和:-

        ReactiveSeq<Integer> windowsSummed = ReactiveSeq.fromIterable(data)
                                                        .slidingT(3)
                                                        .map(a->a+toAdd)
                                                        .reduce(0,(a,b)->a+b)
                                                        .stream();

免责声明:我为 cyclops-react 背后的公司工作

【讨论】:

    【解决方案4】:

    如果您想将 Scala 持久性集合的全部功能引入 Java,您可以使用库 Vavr,以前称为 Javaslang。

    // this imports List, Stream, Iterator, ...
    import io.vavr.collection.*;
    
    Iterator.range(1, 5).sliding(3)
            .forEach(System.out::println);
    // --->
    // List(1, 2, 3)
    // List(2, 3, 4)
    
    Iterator.range(1, 5).sliding(2, 3)
            .forEach(System.out::println);
    // --->
    // List(1, 2)
    // List(4)
    
    Iterator.ofAll(javaStream).sliding(3);
    

    您不仅可以使用 Iterator,它还适用于几乎任何其他 Vavr 集合:Array、Vector、List、Stream、Queue、HashSet、LinkedHashSet、TreeSet、...

    (Javaslang 2.1.0-alpha 概览)

    免责声明:我是 Vavr 的创建者,以前称为 Javaslang。

    【讨论】:

    • 请更新名称和链接?
    【解决方案5】:

    我在 Tomek 的 Nurkiewicz 博客 (https://www.nurkiewicz.com/2014/07/grouping-sampling-and-batching-custom.html) 上找到了解决方案。下面SlidingCollector可以使用:

    public class SlidingCollector<T> implements Collector<T, List<List<T>>, List<List<T>>> {
    
        private final int size;
        private final int step;
        private final int window;
        private final Queue<T> buffer = new ArrayDeque<>();
        private int totalIn = 0;
    
        public SlidingCollector(int size, int step) {
            this.size = size;
            this.step = step;
            this.window = max(size, step);
        }
    
        @Override
        public Supplier<List<List<T>>> supplier() {
            return ArrayList::new;
        }
    
        @Override
        public BiConsumer<List<List<T>>, T> accumulator() {
            return (lists, t) -> {
                buffer.offer(t);
                ++totalIn;
                if (buffer.size() == window) {
                    dumpCurrent(lists);
                    shiftBy(step);
                }
            };
        }
    
        @Override
        public Function<List<List<T>>, List<List<T>>> finisher() {
            return lists -> {
                if (!buffer.isEmpty()) {
                    final int totalOut = estimateTotalOut();
                    if (totalOut > lists.size()) {
                        dumpCurrent(lists);
                    }
                }
                return lists;
            };
        }
    
        private int estimateTotalOut() {
            return max(0, (totalIn + step - size - 1) / step) + 1;
        }
    
        private void dumpCurrent(List<List<T>> lists) {
            final List<T> batch = buffer.stream().limit(size).collect(toList());
            lists.add(batch);
        }
    
        private void shiftBy(int by) {
            for (int i = 0; i < by; i++) {
                buffer.remove();
            }
        }
    
        @Override
        public BinaryOperator<List<List<T>>> combiner() {
            return (l1, l2) -> {
                throw new UnsupportedOperationException("Combining not possible");
            };
        }
    
        @Override
        public Set<Characteristics> characteristics() {
            return EnumSet.noneOf(Characteristics.class);
        }
    
    }
    

    下面是 Tomekin Spock 的一些例子(我希望它是可读的):

    import static com.nurkiewicz.CustomCollectors.sliding
    
    @Unroll
    class CustomCollectorsSpec extends Specification {
    
        def "Sliding window of #input with size #size and step of 1 is #output"() {
            expect:
            input.stream().collect(sliding(size)) == output
    
            where:
            input  | size | output
            []     | 5    | []
            [1]    | 1    | [[1]]
            [1, 2] | 1    | [[1], [2]]
            [1, 2] | 2    | [[1, 2]]
            [1, 2] | 3    | [[1, 2]]
            1..3   | 3    | [[1, 2, 3]]
            1..4   | 2    | [[1, 2], [2, 3], [3, 4]]
            1..4   | 3    | [[1, 2, 3], [2, 3, 4]]
            1..7   | 3    | [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6], [5, 6, 7]]
            1..7   | 6    | [1..6, 2..7]
        }
    
        def "Sliding window of #input with size #size and no overlapping is #output"() {
            expect:
            input.stream().collect(sliding(size, size)) == output
    
            where:
            input | size | output
            []    | 5    | []
            1..3  | 2    | [[1, 2], [3]]
            1..4  | 4    | [1..4]
            1..4  | 5    | [1..4]
            1..7  | 3    | [1..3, 4..6, [7]]
            1..6  | 2    | [[1, 2], [3, 4], [5, 6]]
        }
    
        def "Sliding window of #input with size #size and some overlapping is #output"() {
            expect:
            input.stream().collect(sliding(size, 2)) == output
    
            where:
            input | size | output
            []    | 5    | []
            1..4  | 5    | [[1, 2, 3, 4]]
            1..7  | 3    | [1..3, 3..5, 5..7]
            1..6  | 4    | [1..4, 3..6]
            1..9  | 4    | [1..4, 3..6, 5..8, 7..9]
            1..10 | 4    | [1..4, 3..6, 5..8, 7..10]
            1..11 | 4    | [1..4, 3..6, 5..8, 7..10, 9..11]
        }
    
        def "Sliding window of #input with size #size and gap of #gap is #output"() {
            expect:
            input.stream().collect(sliding(size, size + gap)) == output
    
            where:
            input | size | gap | output
            []    | 5    | 1   | []
            1..9  | 4    | 2   | [1..4, 7..9]
            1..10 | 4    | 2   | [1..4, 7..10]
            1..11 | 4    | 2   | [1..4, 7..10]
            1..12 | 4    | 2   | [1..4, 7..10]
            1..13 | 4    | 2   | [1..4, 7..10, [13]]
            1..13 | 5    | 1   | [1..5, 7..11, [13]]
            1..12 | 5    | 3   | [1..5, 9..12]
            1..13 | 5    | 3   | [1..5, 9..13]
        }
    
        def "Sampling #input taking every #nth th element is #output"() {
            expect:
            input.stream().collect(sliding(1, nth)) == output
    
            where:
            input  | nth | output
            []     | 1   | []
            []     | 5   | []
            1..3   | 5   | [[1]]
            1..6   | 2   | [[1], [3], [5]]
            1..10  | 5   | [[1], [6]]
            1..100 | 30  | [[1], [31], [61], [91]]
        }
    }
    

    【讨论】:

      【解决方案6】:

      另一种选择是像 here 一样实现自定义 Spliterator:

      import java.util.*;
      
      public class SlidingWindowSpliterator<T> implements Spliterator<Stream<T>> {
      
          static <T> Stream<Stream<T>> windowed(Collection<T> stream, int windowSize) {
              return StreamSupport.stream(
                new SlidingWindowSpliterator<>(stream, windowSize), false);
          }
      
          private final Queue<T> buffer;
          private final Iterator<T> sourceIterator;
          private final int windowSize;
          private final int size;
      
          private SlidingWindowSpliterator(Collection<T> source, int windowSize) {
              this.buffer = new ArrayDeque<>(windowSize);
              this.sourceIterator = Objects.requireNonNull(source).iterator();
              this.windowSize = windowSize;
              this.size = calculateSize(source, windowSize);
          }
      
          @Override
          public boolean tryAdvance(Consumer<? super Stream<T>> action) {
              if (windowSize < 1) {
                  return false;
              }
      
              while (sourceIterator.hasNext()) {
                  buffer.add(sourceIterator.next());
      
                  if (buffer.size() == windowSize) {
                      action.accept(Arrays.stream((T[]) buffer.toArray(new Object[0])));
                      buffer.poll();
                      return sourceIterator.hasNext();
                  }
              }
      
              return false;
          }
      
          @Override
          public Spliterator<Stream<T>> trySplit() {
              return null;
          }
      
          @Override
          public long estimateSize() {
             return size;
          }
      
          @Override
          public int characteristics() {
              return ORDERED | NONNULL | SIZED;
          }
      
          private static int calculateSize(Collection<?> source, int windowSize) {
              return source.size() < windowSize
                ? 0
                : source.size() - windowSize + 1;
          }
      }
      

      【讨论】:

        【解决方案7】:

        不确定这是“安全”还是“好”,但也许你们让我知道。

        public static <T> Stream<List<T>> sliding(Stream<T> stream, int window) {
            Queue<T> queue = new LinkedList<>();
            return stream.dropWhile(item -> {
                if (queue.size() < window - 1) {
                    queue.add(item);
                    return true;
                }
                return false;
            }).map(item -> {
                queue.add(item);
                List<T> ret = queue.stream().toList();
                queue.remove();
                return ret;
            });
        }
        
        public static void main(String[] args) {
            sliding(Stream.of(1, 2, 3, 4, 5, 6, 7), 3).forEach(x -> System.out.println(x));
        }
        
        [1, 2, 3]
        [2, 3, 4]
        [3, 4, 5]
        [4, 5, 6]
        [5, 6, 7]
        

        【讨论】:

          猜你喜欢
          • 2021-06-22
          • 2020-04-02
          • 2017-06-17
          • 1970-01-01
          • 2011-12-16
          • 2015-06-16
          • 2015-07-13
          • 2022-08-18
          相关资源
          最近更新 更多