【问题标题】:Java 8 lambda: iterate over stream objects and use previous/next object(s) in streamJava 8 lambda:迭代流对象并在流中使用上一个/下一个对象
【发布时间】:2017-03-15 13:20:48
【问题描述】:

我正在练习一些入门级的 java 8 lambda 功能。

给定一个消息列表,每个消息都包含一个消息偏移量,其中所有偏移量必须形成一个连续的整数列表,我试图找到要警告的差距。我觉得这一切都应该通过一个不错的 lambda 来实现。但我无法理解它。

所以,有这个工作的 sn-p:

private void warnAboutMessageGaps(final List<Message> messages) {

    final List<Long> offsets = messages.stream()
            .sorted(comparingLong(Message::getOffset))
            .map(Message::getOffset)
            .collect(toList())
            ;

    for (int i = 0; i < offsets.size() - 1; i++) {
        final long currentOffset = offsets.get(i);
        final long expectedNextOffset = offsets.get(i) + 1;
        final long actualNextOffset = offsets.get(i + 1);
        if (currentOffset != expectedNextOffset) {
            LOG.error("Missing offset(s) found in messages: missing from {} to {}", currentOffset + 1, actualNextOffset - 1);
        }
    }
}

我不知道如何制作它以便我可以在 lambda 中进行“与上一个/下一个对象比较”。任何指针将不胜感激。

/edit:关于 StreamEx 和其他第三方解决方案的建议虽然受到赞赏,但并不是我想要的。

【问题讨论】:

  • 这是流不太擅长的事情,因为如果您必须查看流中的上一个或下一个对象,它不会很好地并行化。
  • 似乎你需要两组之间的对称差异,番石榴可以帮助解决这个问题:Set&lt;Long&gt; all = LongStream.range(offsets.get(0), offsets.get(offsets.size() - 1)).boxed().collect(Collectors.toSet()); System.out.println(Sets.symmetricDifference(all, ImmutableSet.copyOf(offsets)));
  • 为什么是.sorted(comparingLong(Message::getOffset)) .map(Message::getOffset) 而不是.map(Message::getOffset).sorted()

标签: java lambda java-8 java-stream


【解决方案1】:

您可以通过StreamEx 使用pairMap 方法来做到这一点:

StreamEx.of(messages)
        .sorted(Comparator.comparingLong(Message::getOffset))
        .pairMap((prev, next) -> new Message[] {prev, next})
        .forEach(prevNext -> {
            long currentOffset = prevNext[0].getOffset();
            long expectedNextOffset = prevNext[0].getOffset() + 1;
            long actualNextOffset = prevNext[1].getOffset();
            if (currentOffset != expectedNextOffset) {
                LOG.error(
                    "Missing offset(s) found in messages: missing from {} to {}",
                    currentOffset + 1, actualNextOffset - 1);
            }
        });

【讨论】:

    【解决方案2】:

    有时,尝试使用 lambda 表达式做所有事情会使解决方案变得更加复杂。您可以使用:

    messages.stream()
        .mapToLong(Message::getOffset)
        .sorted()
        .forEachOrdered(new LongConsumer() {
            boolean first=true;
            long expected;
            public void accept(long value) {
                if(first) first=false;
                else if(value!=expected)
                    LOG.error("Missing offset(s) found in messages: missing from {} to {}",
                              expected, value);
                expected=value+1;
            }
        });
    

    但请注意,无论流链看起来多么流畅,sorted() 是一个有状态的中间操作,它在幕后创建和使用支持数组。如果您明确使用该数组,您不会丢失任何东西:

    long[] l = messages.stream().mapToLong(Message::getOffset).toArray();
    Arrays.sort(l);
    for(int ix=1; ix<l.length; ix++) {
        long value = l[ix], expected = l[ix-1]+1;
        if(value!=expected)
            LOG.error("Missing offset(s) found in messages: missing from {} to {}",
                      expected, value);
    }
    

    很难找到更简单的解决方案。但是如果你想减少所需的内存量,你可以使用BitSet 而不是数组:

    OptionalLong optMin = messages.stream().mapToLong(Message::getOffset).min();
    if(!optMin.isPresent()) return;
    long min = optMin.getAsLong();
    BitSet bset = messages.stream()
        .mapToLong(Message::getOffset)
        .collect(BitSet::new, (bs,l) -> bs.set((int)(l-min)), BitSet::or);
    for(int set=0, clear; set>=0; ) {
        clear = bset.nextClearBit(set);
        set = bset.nextSetBit(clear);
        if(set >= 0)
            LOG.error("Missing offset(s) found in messages: missing from {} to {}",
                      min+clear, min+set);
    }
    

    与偏移值范围相比,在没有间隙或相当小的间隙发生的情况下,这将显着减少使用的内存。当最小偏移量和最大偏移量之间的距离大于Integer.MAX_VALUE时失败。

    您可以事先检查一下,如果根本没有间隙,这也为走捷径提供了机会:

    LongSummaryStatistics stat = messages.stream()
        .mapToLong(Message::getOffset).summaryStatistics();
    if(stat.getCount()==0 ||
       // all solutions assume that there are no duplicates, in this case,
       // the following test allows to prove that there are no gaps:
       stat.getMax()-stat.getMin()==messages.size()-1) {
        return;
    }
    
    if(stat.getMax()-stat.getMin()>Integer.MAX_VALUE) {
        // proceed with array based test
        …
    }
    else {
        long min = stat.getMin();
        // proceed with BitSet based test
        …
    

    【讨论】:

    • 感谢您提供非常完整的答案。是的,我知道我不应该将那些 lambda 表达式作为我的锤子,然后期望每个挑战都表现得像钉子 :) 问题是,这对我来说确实非常像钉子。但这可能是因为我对这把特殊锤子的实用性感到敬畏。尽管我在实际代码中采用了非 lambda 方法,但我还是会参考您的一些建议来学习。
    • “有时,尝试用 lambda 表达式做所有事情会使解决方案更加复杂。” 显然,问题是用一个 sorted() 和一个 reduce() 来解决 - @987654321 @
    【解决方案3】:

    怎么样:

            List<Long> offsets = messages.stream()
                    .sorted(comparingLong(Message::getOffset))
                    .map(Message::getOffset)
                    .collect(toList());
    
            IntStream.range(1, offsets.size())
                    .mapToObj(i -> new Pair<>(offsets.get(i - 1), offsets.get(i)))
                    .forEach(pair -> {
                        final long currentOffset = pair.getKey();
                        final long expectedNextOffset = pair.getKey() + 1;
                        final long actualNextOffset = pair.getValue();
                        if (actualNextOffset != expectedNextOffset) {
                            LOG.error("Missing offset(s) found in messages: missing from {} to {}", currentOffset + 1, actualNextOffset - 1);
                        }
                    });
    

    【讨论】:

      【解决方案4】:

      对于目前的问题,这种方式似乎更适合

      messages.stream().sorted( Comparator.comparingLong( Message::getOffset ) )
        .reduce( (m1, m2) -> {
          if( m1.getOffset() + 1 != m2.getOffset() )
            LOG.error( "Missing offset(s) found in messages: missing from {} to {}", m1.getOffset(), m2.getOffset() );
          return( m2 );
        } );
      

      此解决方案使用了reduce,而不是其预期用途。它仅使用 reduce 的能力来遍历流中的所有对。
      reduce 的结果使用。 (再进一步使用结果是不可能的,因为这需要一个可变的归约。)

      【讨论】:

      • 这是在滥用reduce 以获得副作用,而关于副作用,它违反了要求它是关联的合同。对于函数 f 和排序流 a b c,关联意味着它必须在评估时产生正确的结果,例如正如f(a, f(b, c)) 而不是你所期望的f(f(a, b), c)
      • 有趣的问题。我认为 != 是关联的。
      • 问题不是!=的使用。问题是您要返回第二个参数,因此在f(a, f(b, c)) 的情况下,您将比较ac 而不是ab。显然,返回第一个参数也行不通。要使用缩减来检查连续元素,您需要返回一个范围,由其第一个和最后一个元素表示,例如 map(element -&gt; Range.of(element, element)) .reduce((r1,r2) -&gt; Range.of(r1.first, r2.last))
      • 我知道你的意思。但这不仅仅是使用parallelStream 的问题吗——这绝对是不可能的? 顺便说一句:我根本不使用reduce 的结果。
      • 如果元素超过两个,您自己的归约函数将使用先前评估的结果。这就是我对f(a, f(b, c))f(f(a, b), c) 的意思。规范从来没有说你可以期望一个顺序流的特定评估顺序(这样的声明在发布之前已经被故意从文档中删除)。因此,如果它碰巧做了想要的事情,您就有了一个仅适用于顺序流的特定实现的解决方案。要求读者通过观察来理解这些限制,这使得它变得“容易”。
      【解决方案5】:

      为了学习 Java 8 api,您可以使用一个收集器,您可以在其中依次比较流的每个成员,并使用累加器类 BadPairs 来跟踪偏移序列中的任何间隙.

      为了帮助您理解供应商、累加器和组合器 lambda 之间的关系,我写得比实际需要的更详细。

      public class PairedStreamTest {
      
          private BiConsumer<BadPairs,BadPairs> combiner = (bad1,bad2) -> bad1.add(bad2);
      
          private Supplier<BadPairs> supplier = BadPairs::new;
      
          private BiConsumer<BadPairs,Message> accumulator = (bad,msg) -> bad.add(msg);
      
          @Test
          public void returnsTwoBadPairs_givenInputStreamIsMissingOffsets_forFourAndSix() throws Exception {
      
              BadPairs badPairs = Stream.of(new Message(1), new Message(2), new Message(3), new Message(5), new Message(7))
                      .sorted(comparingLong(Message::getOffset))
                      .collect(supplier, accumulator, combiner);
      
              badPairs.pairs.forEach(pair ->
                      LOG.error("Missing offset(s) found in messages: missing from {} to {}", pair.first.offset, pair.second.offset));
      
              assertTrue(badPairs.pairs.size() == 2);
          }
      
          // supporting classes for the above test code
      
          private final Logger LOG = LoggerFactory.getLogger(PairedStreamTest.class);
      
          class Message {
              public int offset;
              public Message(int i) {
                  this.offset = i;
              }
              public Integer getOffset() {
                  return this.offset;
              }
          }
      
          class Pair {
              private Message first;
              private Message second;
              public Pair(Message smaller, Message larger) {
                  this.first = smaller;
                  this.second = larger;
              }
          }
      
          class BadPairs {
              public Message previous;
              public Set<Pair> pairs = new HashSet<>();
              public void add(BadPairs other) {
                  this.pairs.addAll(other.pairs);
              }
              public void add(Message msg) {
                  if(previous != null && previous.offset != msg.offset-1) {
                      this.pairs.add(new Pair(previous, msg));
                  }
                  this.previous = msg;
              }
          }
      }
      

      请原谅公共成员变量的不当使用,以及这个 Test 类的布局。我的意图是首先让读者关注@Test 案例,而不是支持类。

      【讨论】:

      • 我担心我将不得不去做这样的事情,因为我想了解更多关于定制收集器的信息,所以我会尝试一下。感谢您的冗长! :) 然而,必须经历所有这些感觉非常笨拙,对于我的 productino 代码,我会坚持使用我猜的原始代码。
      【解决方案6】:

      怎么样:

      final List<Long> offsets = messages.stream().map(Message::getOffset).sorted().collect(toList());
      IntStream.range(0, offsets.size() - 1).forEach(i -> {
          long currentOffset = offsets.get(i);
          if (offsets.get(i + 1) != currentOffset + 1) {
              LOG.error("Missing offset(s) found in messages: missing from {} to {}", currentOffset + 1, offsets.get(i + 1) - 1);
          }
      });
      

      或者StreamEx的全部在一个声明中:

      StreamEx.of(messages).mapToLong(Message::getOffset).sorted().boxed()
                .pairMap((i, j) -> new long[] { i, j }).filter(a -> a[1] - a[0] > 1)
                .forEach(a -> LOG.error("Missing offset(s) found in messages: missing from {} to {}", a[0] + 1, a[1] - 1));
      

      或者AbacusUtil的全部在一个声明中:

      Stream.of(messages).mapToLong(Message::getOffset).sorted()
                .sliding0(2).filter(e -> e.size() == 2 && e.get(1) - e.get(0) > 1)
                .forEach(e -> LOG.error("Missing offset(s) found in messages: missing from {} to {}", e.get(0) + 1, e.get(1) - 1));
      

      【讨论】:

      • 感谢您提供的所有示例!我确信它们可以工作,但“本机”示例与现有代码几乎相同(只是另一种编写 for 循环的方式)。不过谢谢,我以前没见过。关于 StreamEx:我以前在谷歌上搜索时看到过,但我想学习 Java。 (不过我可能已经提到过。)
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2017-08-24
      • 1970-01-01
      • 2017-07-12
      • 2021-06-25
      • 2020-07-31
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多