【问题标题】:Is there a good way to extract chunks of data from a java 8 stream?有没有一种从 java 8 流中提取数据块的好方法?
【发布时间】:2014-10-14 00:36:33
【问题描述】:

我是一个 ETL 流程,我正在从 Spring Data Repository 中检索大量实体。然后我使用并行流将实体映射到不同的实体。 我可以使用消费者将这些新实体一个一个存储在另一个存储库中,也可以将它们收集到一个列表中并将其存储在一个批量操作中。 第一个成本很高,而后者可能会超出可用内存。

有没有一种好方法可以在流中收集一定数量的元素(如 limit 那样)、消耗该块并继续并行处理直到所有元素都被处理?

【问题讨论】:

标签: parallel-processing java-8 java-stream


【解决方案1】:

我使用分块进行批量操作的方法是使用分区拆分器包装器,以及另一个将默认拆分策略(批量大小以 1024 为增量的算术级数)覆盖为简单的固定批量拆分的包装器。像这样使用它:

Stream<OriginalType> existingStream = ...;
Stream<List<OriginalType>> partitioned = partition(existingStream, 100, 1);
partitioned.forEach(chunk -> ... process the chunk ...);

这里是完整的代码:

import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators.AbstractSpliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class PartitioningSpliterator<E> extends AbstractSpliterator<List<E>>
{
  private final Spliterator<E> spliterator;
  private final int partitionSize;

  public PartitioningSpliterator(Spliterator<E> toWrap, int partitionSize) {
    super(toWrap.estimateSize(), toWrap.characteristics() | Spliterator.NONNULL);
    if (partitionSize <= 0) throw new IllegalArgumentException(
        "Partition size must be positive, but was " + partitionSize);
    this.spliterator = toWrap;
    this.partitionSize = partitionSize;
  }

  public static <E> Stream<List<E>> partition(Stream<E> in, int size) {
    return StreamSupport.stream(new PartitioningSpliterator(in.spliterator(), size), false);
  }

  public static <E> Stream<List<E>> partition(Stream<E> in, int size, int batchSize) {
    return StreamSupport.stream(
        new FixedBatchSpliterator<>(new PartitioningSpliterator<>(in.spliterator(), size), batchSize), false);
  }

  @Override public boolean tryAdvance(Consumer<? super List<E>> action) {
    final ArrayList<E> partition = new ArrayList<>(partitionSize);
    while (spliterator.tryAdvance(partition::add) 
           && partition.size() < partitionSize);
    if (partition.isEmpty()) return false;
    action.accept(partition);
    return true;
  }

  @Override public long estimateSize() {
    final long est = spliterator.estimateSize();
    return est == Long.MAX_VALUE? est
         : est / partitionSize + (est % partitionSize > 0? 1 : 0);
  }
}

import static java.util.Spliterators.spliterator;

import java.util.Comparator;
import java.util.Spliterator;
import java.util.function.Consumer;

public abstract class FixedBatchSpliteratorBase<T> implements Spliterator<T> {
  private final int batchSize;
  private final int characteristics;
  private long est;

  public FixedBatchSpliteratorBase(int characteristics, int batchSize, long est) {
    characteristics |= ORDERED;
    if ((characteristics & SIZED) != 0) characteristics |= SUBSIZED;
    this.characteristics = characteristics;
    this.batchSize = batchSize;
    this.est = est;
  }
  public FixedBatchSpliteratorBase(int characteristics, int batchSize) {
    this(characteristics, batchSize, Long.MAX_VALUE);
  }
  public FixedBatchSpliteratorBase(int characteristics) {
    this(characteristics, 64, Long.MAX_VALUE);
  }

  @Override public Spliterator<T> trySplit() {
    final HoldingConsumer<T> holder = new HoldingConsumer<>();
    if (!tryAdvance(holder)) return null;
    final Object[] a = new Object[batchSize];
    int j = 0;
    do a[j] = holder.value; while (++j < batchSize && tryAdvance(holder));
    if (est != Long.MAX_VALUE) est -= j;
    return spliterator(a, 0, j, characteristics());
  }
  @Override public Comparator<? super T> getComparator() {
    if (hasCharacteristics(SORTED)) return null;
    throw new IllegalStateException();
  }
  @Override public long estimateSize() { return est; }
  @Override public int characteristics() { return characteristics; }

  static final class HoldingConsumer<T> implements Consumer<T> {
    Object value;
    @Override public void accept(T value) { this.value = value; }
  }
}

import static java.util.stream.StreamSupport.stream;

import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;

public class FixedBatchSpliterator<T> extends FixedBatchSpliteratorBase<T> {
  private final Spliterator<T> spliterator;

  public FixedBatchSpliterator(Spliterator<T> toWrap, int batchSize, long est) {
    super(toWrap.characteristics(), batchSize, est);
    this.spliterator = toWrap;
  }
  public FixedBatchSpliterator(Spliterator<T> toWrap, int batchSize) {
    this(toWrap, batchSize, toWrap.estimateSize());
  }
  public FixedBatchSpliterator(Spliterator<T> toWrap) {
    this(toWrap, 64, toWrap.estimateSize());
  }

  public static <T> Stream<T> withBatchSize(Stream<T> in, int batchSize) {
    return stream(new FixedBatchSpliterator<>(in.spliterator(), batchSize), true);
  }

  public static <T> FixedBatchSpliterator<T> batchedSpliterator(Spliterator<T> toWrap, int batchSize) {
    return new FixedBatchSpliterator<>(toWrap, batchSize);
  }

  @Override public boolean tryAdvance(Consumer<? super T> action) {
    return spliterator.tryAdvance(action);
  }
  @Override public void forEachRemaining(Consumer<? super T> action) {
    spliterator.forEachRemaining(action);
  }
}

【讨论】:

  • 不应该在FixedBatchSpliterator 中定义getComparator() 并委托,即:return spliterator.getComparator(); ?
  • 可能。 getComparator() API 对我来说仍然有些神秘。
  • FixedBatchSpliteratorPartitioningSpliterator 有什么区别?我没有收到partition(existingStream, 100, 1)
  • @Titmael FixedBatchSpliterator 是关于交付给 ecah 工作线程的非语义批处理数据。此批处理的效果对用户提供的 lambda 不可见。另一方面,PartitioningSpliterator 会生成显式传递给您的 lambda 的数据块。
  • 在我的情况下,我需要将流分成 500 个项目的块,我只能使用 PartitioningSpliterator#partition(Stream&lt;E&gt; in, int size) ?
【解决方案2】:

您也许可以编写自己的 Collector 来累积实体,然后执行批量更新。

Collector.accumulator() 方法可以将实体添加到内部临时缓存中,直到缓存变得太大。当缓存足够大时,您可以批量存储到其他存储库中。

Collector.merge() 需要将 2 个线程的 Collector 的缓存合并到一个缓存中(并可能合并)

最后,Collector.finisher() 方法会在 Stream 完成时被调用,因此也将缓存中剩下的任何内容存储在这里。

由于您已经在使用并行流并且似乎可以同时执行多个加载,我假设您已经处理了线程安全。

更新

我关于线程安全和并行流的评论是指实际保存/存储到存储库中,而不是临时集合中的并发。

每个收集器都应该(我认为)在自己的线程中运行。并行流应该通过多次调用supplier() 创建多个收集器实例。因此,您可以将收集器实例视为单线程,它应该可以正常工作。

例如在 java.util.IntSummaryStatistics 的 Javadoc 中它说:

此实现不是线程安全的。但是,在并行流上使用 Collectors.toIntStatistics() 是安全的,因为 Stream.collect() 的并行实现为安全高效的并行执行提供了必要的分区、隔离和结果合并。

【讨论】:

  • 当使用 peek() 或 Collector.accumulator() 填充并发保存集合时,我无法安全地确定我的缓存何时达到 1000 个条目。我必须锁定集合,计数,检索所有条目(如果填充到我想要的级别),然后再次释放集合。这将杀死并行性。我曾希望有一种的方法可以隐藏在流 API 中的某个地方...
  • @ChristophGrimmer-Dietrich 我不确定您是否需要为此担心。每个收集器应该(我认为)在自己的线程中运行。并行流应该通过多次调用supplier() 创建多个收集器实例。我会更新我的答案
【解决方案3】:

您可以使用自定义收集器优雅地完成此操作。

请在此处查看我对类似问题的回答:

Custom batch processing collector

然后,您可以简单地使用上述收集器并行批处理流,以将记录存储回您的存储库,示例用法:

List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

int batchSize = 3;
Consumer<List<Integer>> batchProcessor = xs -> repository.save(xs);

input.parallelStream()
     .map(i -> i + 1)
     .collect(StreamUtils.batchCollector(batchSize, batchProcessor));

【讨论】:

    【解决方案4】:
      @Test
    public void streamTest(){
    
        Stream<Integer> data = Stream.generate(() -> {
            //Block on IO
            return blockOnIO();
        });
    
    
        AtomicInteger countDown = new AtomicInteger(1000);
        final ArrayList[] buffer = new ArrayList[]{new ArrayList<Integer>()};
        Object syncO = new Object();
        data.parallel().unordered().map(i -> i * 1000).forEach(i->{
            System.out.println(String.format("FE %s %d",Thread.currentThread().getName(), buffer[0].size()));
            int c;
            ArrayList<Integer> export=null;
            synchronized (syncO) {
                c = countDown.addAndGet(-1);
                buffer[0].add(i);
                if (c == 0) {
                    export=buffer[0];
                    buffer[0] = new ArrayList<Integer>();
                    countDown.set(1000);
                }
            }
            if(export !=null){
                sendBatch(export);
            }
    
        });
        //export any remaining
        sendBatch(buffer[0]);
    }
    
    Integer blockOnIO(){
        try {
            Thread.sleep(50);
            return Integer.valueOf((int)Math.random()*1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
    
    void sendBatch(ArrayList al){
        assert al.size() == 1000;
        System.out.println(String.format("LOAD %s %d",Thread.currentThread().getName(), al.size()));
    }
    

    这可能有点过时,但应该以最少的锁定来实现批处理。

    它将产生输出

    FE ForkJoinPool.commonPool-worker-2 996
    FE ForkJoinPool.commonPool-worker-5 996
    FE ForkJoinPool.commonPool-worker-4 998
    FE ForkJoinPool.commonPool-worker-3 999
    LOAD ForkJoinPool.commonPool-worker-3 1000
    FE ForkJoinPool.commonPool-worker-6 0
    FE ForkJoinPool.commonPool-worker-1 2
    FE ForkJoinPool.commonPool-worker-7 2
    FE ForkJoinPool.commonPool-worker-2 4
    

    【讨论】:

      【解决方案5】:

      这是我的图书馆的解决方案:AbacusUtil:

      stream.split(batchSize).parallel(threadNum).map(yourBatchProcessFunction);
      

      【讨论】:

        猜你喜欢
        • 2015-10-12
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2018-02-05
        • 2013-06-18
        • 1970-01-01
        • 2012-10-27
        相关资源
        最近更新 更多