【问题标题】:Java stream reduceJava 流减少
【发布时间】:2019-02-09 16:42:11
【问题描述】:

我有以下示例数据集,我想根据方向值使用 Java 流 api 进行转换/减少

Direction    int[]
IN           1, 2
OUT          3, 4
OUT          5, 6, 7
IN           8
IN           9
IN           10, 11
OUT          12, 13
IN           14

Direction    int[]
IN           1, 2, 
OUT          3, 4, 5, 6, 7
IN           8, 9, 10, 11
OUT          12, 13
IN           14

到目前为止我写的代码

enum Direction { IN, OUT }

class Tuple {
  Direction direction;
  int[] data;

  public Tuple merge(Tuple t) {
      return new Tuple(direction, concat(getData(), t.getData()));
  }
}

private static int[] concat(int[] first, int[] second) {
    int[] result = Arrays.copyOf(first, first.length + second.length);
    System.arraycopy(second, 0, result, first.length, second.length);
    return result;
}

List<Tuple> reduce = tupleStream.reduce(new ArrayList<>(), WDParser::add, WDParser::combine);

private static List<Tuple> combine(List<Tuple> list1, List<Tuple> list2) {
    System.out.println("combine");
    list1.addAll(list2);
    return list1;
}

private static List<Tuple> add(List<Tuple> list, Tuple t) {
    System.out.println("add");
    if (list.size() == 0) {
        list.add(t);
    } else if (list.size() > 0) {
        int lastIndex = list.size() - 1;
        Tuple last = list.get(lastIndex);
        if (last.getDirection() == t.getDirection())
            list.set(lastIndex, last.merge(t));
        else
            list.add(t);
    }

    return list;
}

我相信有一个更好、更简单的替代方法可以达到同样的效果。

我为 Java 流 api 减少/组合找到的在线示例和博客仅使用 Integer::sum 函数。希望为更复杂的案例场景建立这个。

【问题讨论】:

    标签: java java-8 java-stream reduce


    【解决方案1】:

    我认为您的解决方案已经相当不错了,尤其是与收集到共享的外部容器相比,使用归约可以轻松实现并行性。但是正如 Holger 指出的那样,使用 collect 而不是 reduce 更容易。此外,累加器中的条件可以简化一点,您忘记合并组合器中的最后一个元素和第一个元素:

    List<Tuple> reduce = tupleStream.collect(ArrayList::new, WDParser::add, WDParser::combine);
    
    private static List<Tuple> combine(List<Tuple> list1, List<Tuple> list2)
    {
        if (!list2.isEmpty())
        {
            add(list1, list2.remove(0)); // merge lists in the middle if necessary
            list1.addAll(list2);         // add all the rest
        }
        return list1;
    }
    
    private static List<Tuple> add(List<Tuple> list, Tuple t)
    {
        int lastIndex = list.size() - 1;
        if (list.isEmpty() || list.get(lastIndex).getDirection() != t.getDirection())
        {
            list.add(t);
        }
        else
        {
            list.set(lastIndex, list.get(lastIndex).merge(t));
        }
        return list;
    }
    

    您甚至可以使用LinkedList 和方法add/removeFirst/Last(),而不是使用索引来访问第一个/最后一个元素。

    【讨论】:

    • 这种使用reduce 的方式在尝试并行执行时会非常糟糕。传递给reduce 的函数不应该修改传入的对象。这种操作应该使用collect来实现。见Mutable Reduction
    • @Holger 我之前考虑过,但现在你再次指出它才发现问题。已将其更改为收藏。这足够了吗,还是我还必须在累加器和合并器中创建新列表?
    • 您不需要在累加器或合并器中创建新列表;这就是收藏家的全部意义所在。唯一缺少的是,组合器也应该能够处理空的list2 参数(这可能发生在collectfilter 之后)。
    • 谢谢,今天晚些时候我会检查解决方案并更新。
    • @Amitoj 它只会在并行流中被调用。简单地说,在这种情况下,流将被分成块。每个部分都将使用“累加器”(add)收集。之后,这些收集结果将被“组合”成最终结果。对于顺序流,只有 1 部分(整个流)累积,因此不需要使用组合器。
    【解决方案2】:

    这个怎么样。先定义一个小辅助方法:

    private static Tuple mergeTwo(Tuple left, Tuple right) {
        int[] leftArray = left.getData();
        int[] rightArray = right.getData();
        int[] result = new int[leftArray.length + rightArray.length];
        System.arraycopy(leftArray, 0, result, 0, leftArray.length);
        System.arraycopy(rightArray, 0, result, leftArray.length, rightArray.length);
        return new Tuple(left.getDirection(), result);
    }
    

    我猜这与您的concat/merge 很接近,但只有一个。基本上是一种将两个Tuple(s) 合并在一起的方法。

    还有一个辅助方法来产生所需的Collector,你可以把它放到一个实用程序中以便它可以被重复使用:

    private static Collector<Tuple, ?, List<Tuple>> mergedTuplesCollector() {
        class Acc {
    
            ArrayDeque<Tuple> deque = new ArrayDeque<>();
    
            void add(Tuple elem) {
                Tuple head = deque.peek();
                if (head == null || head.getDirection() != elem.getDirection()) {
                    deque.offerFirst(elem);
                } else {
                    deque.offerFirst(mergeTwo(deque.poll(), elem));
                }
            }
    
            Acc merge(Acc right) {
    
                Tuple lastLeft = deque.peekLast();
                Tuple firstRight = right.deque.peekFirst();
    
                if (lastLeft.getDirection() == firstRight.getDirection()) {
                    deque.offerLast(mergeTwo(deque.pollLast(), right.deque.pollFirst()));
                } else {
                    deque.addAll(right.deque);
                }
    
                return this;
            }
    
            public List<Tuple> finisher() {
                return new ArrayList<>(deque);
            }
    
        }
        return Collector.of(Acc::new, Acc::add, Acc::merge, Acc::finisher);
    }
    

    例如,用法是:

    List<Tuple> merged = tuples.stream()
                .parallel()
                .collect(mergedTuplesCollector());
    

    【讨论】:

      【解决方案3】:

      这是一种使用略有不同的数据结构的替代方法。

      如果这是一个选项,从int[] 更改为List&lt;Integer&gt; 可以提供更大的灵活性(更不用说避免多次创建/复制数组):

      class Tuple {
          Direction direction;
          List<Integer> data;
      }
      

      以下函数会合并Deque 集合:

      private static List<Integer> next(Deque<Tuple> t, Direction d) {
          if (!t.isEmpty() && t.peekLast().getDirection() == d) {
              return t.peekLast().getData();
          } else {
              Tuple next = new Tuple();
              next.direction = d;
              next.data = new ArrayList<>();
              t.addLast(next);
              return next.data;
          }
      }
      

      这样,流看起来就很简单:

      Deque<Tuple> deq = new LinkedList<>(); //the final collection of tuples
      
      tuples.stream()
      .flatMap(tp -> tp.getData().stream()
                       .map(d -> Pair.of(tp.getDirection(), Integer.valueOf(d))))
      .forEach(el -> next(deq, el.getLeft()).add(el.getRight()));
      

      【讨论】:

      • 我认为在这里使用forEachRemaining 而不是forEach 更安全。文档说forEach 对元素的处理顺序不确定,据我所知,这可能会导致与您的方法不一致。
      • @MalteHartwig 我想你的意思是forEachOrdered :)
      • @FedericoPeraltaSchaffner 哈哈哈,是的,把它和 Spliterator 搞混了
      • 谢谢,我使用 int[] 因为它更容易从 Netty ByteBuf 对象转换为 &。
      • stream flatMap 的有趣使用,我会保留这个以备将来参考。
      【解决方案4】:

      我对这个话题有两个想法。第一个是获取this answer 中的索引并相应地对其进行分组。

      第二个想法 - 如果您已经有了 Stream,则应该使用自定义 Collector(类似于其他解决方案,但使用 Deque):

      private Collector<Tuple, ?, List<Tuple>> squashTuples() {
        return new Collector<Tuple, Deque<Tuple>, List<Tuple>>() {
          @Override
          public Supplier<Deque<Tuple>> supplier() {
            return ArrayDeque::new;
          }
      
          @Override
          public BiConsumer<Deque<Tuple>, Tuple> accumulator() {
            return (acc, e) -> {
              Objects.requireNonNull(e);
              if (!acc.isEmpty() && acc.peekLast().getDirection() == e.getDirection()) {
                acc.offerLast(acc.pollLast().merge(e));
              } else {
                acc.offerLast(e);
              }
            };
          }
      
          @Override
          public BinaryOperator<Deque<Tuple>> combiner() {
            return (left, right) -> {
              if (!left.isEmpty() && !right.isEmpty() && left.peekLast().getDirection() == right.peekFirst().getDirection()) {
                left.offerLast(left.pollLast().merge(right.pollFirst()));
              }
              left.addAll(right);
              return left;
            };
          }
      
          @Override
          public Function<Deque<Tuple>, List<Tuple>> finisher() {
            return ArrayList::new;
          }
      
          @Override
          public Set<Characteristics> characteristics() {
            return EnumSet.noneOf(Characteristics.class);
          }
        };
      }
      

      【讨论】:

      • 我认为combiner 在这种情况下不需要检查isEmpty
      • @Eugene 我认为你基本上是对的。不应该有一个空容器的合并步骤,但是有什么保证吗?我在文档中看不到任何内容。
      • 好吧,考虑到accumulator 总是在combiner 之前被调用,似乎总是如此;但我只是想知道这是否被记录在案......我会尝试找到这个
      猜你喜欢
      • 2015-11-05
      • 1970-01-01
      • 2019-03-08
      • 1970-01-01
      • 1970-01-01
      • 2017-04-05
      • 2020-05-04
      • 2017-10-28
      • 1970-01-01
      相关资源
      最近更新 更多