【Question title】: Parallel computation for an Iterator of elements in Java
【Posted】: 2009-12-19 06:45:09
【Question】:

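I have now had the same need a few times and would like other ideas on the right way to structure a solution. The need is to perform some operation on many elements across many threads without having all the elements in memory at once, only the ones currently being computed. For example, Iterables.partition is insufficient because it brings all elements into memory up front.

To express it in code, I want to write a BulkCalc2 that does the same thing as BulkCalc1, only in parallel. Below is sample code that illustrates my best attempt. I'm not satisfied because it's big and ugly, but it does seem to accomplish my goals: keeping the threads highly utilized until the work is done, propagating any exception raised during computation, and never holding more than numThreads instances of BigThing in memory at once.

I'll accept the answer that meets the stated goals in the most succinct way, whether it improves on my BulkCalc2 or is a completely different solution.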

interface BigThing {

    int getId();

    String getString();
}

class Calc {

    // somewhat expensive computation
    double calc(BigThing bigThing) {
        Random r = new Random(bigThing.getString().hashCode());
        double d = 0;
        for (int i = 0; i < 100000; i++) {
            d += r.nextDouble();
        }
        return d;
    }
}

class BulkCalc1 {

    final Calc calc;

    public BulkCalc1(Calc calc) {
        this.calc = calc;
    }

    public TreeMap<Integer, Double> calc(Iterator<BigThing> in) {
        TreeMap<Integer, Double> results = Maps.newTreeMap();
        while (in.hasNext()) {
            BigThing o = in.next();
            results.put(o.getId(), calc.calc(o));
        }
        return results;
    }
}

class SafeIterator<T> {

    final Iterator<T> in;

    SafeIterator(Iterator<T> in) {
        this.in = in;
    }

    synchronized T nextOrNull() {
        if (in.hasNext()) {
            return in.next();
        }
        return null;
    }
}

class BulkCalc2 {

    final Calc calc;
    final int numThreads;

    public BulkCalc2(Calc calc, int numThreads) {
        this.calc = calc;
        this.numThreads = numThreads;
    }

    public TreeMap<Integer, Double> calc(Iterator<BigThing> in) {
        ExecutorService e = Executors.newFixedThreadPool(numThreads);
        List<Future<?>> futures = Lists.newLinkedList();

        final Map<Integer, Double> results = new MapMaker().concurrencyLevel(numThreads).makeMap();
        final SafeIterator<BigThing> it = new SafeIterator<BigThing>(in);
        for (int i = 0; i < numThreads; i++) {
            futures.add(e.submit(new Runnable() {

                @Override
                public void run() {
                    while (true) {
                        BigThing o = it.nextOrNull();
                        if (o == null) {
                            return;
                        }
                        results.put(o.getId(), calc.calc(o));
                    }
                }
            }));
        }

        e.shutdown();

        for (Future<?> future : futures) {
            try {
                future.get();
            } catch (InterruptedException ex) {
                // swallowing is OK
            } catch (ExecutionException ex) {
                throw Throwables.propagate(ex.getCause());
            }
        }

        return new TreeMap<Integer, Double>(results);
    }
}

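【Comments】:

  • I'll tell you one thing: TreeMap is the slowest of the Java collections for most uses and should be replaced with one of the others wherever possible. If I can find the time I'll try to tackle this problem. It's complicated, and I've run into similar problems, so you have my sympathy. Good parallel coding is hard, even in Java.

Tags: java iterator parallel-processing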


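【Answer 1】:

And now the succinct way (slower, not as robust or clean, but passable):

33 lines for the calculation, all in one method. It is less efficient because of unnecessary synchronization, and (unlike my earlier version) it loses a thread on every exception during processing (and has to create a new one). The previous one I posted just collects all exceptions into a neat bundle for later handling. That improves performance if exceptions occur now and then, since creating a thread is moderately expensive.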

/** More succinct */
public static Map<Integer, Double> bulkCalcSuccincter(final Iterator<BigThing> it, final Calc calc, final int numThreads) {
    final ConcurrentHashMap<Integer, Double> results = new ConcurrentHashMap<Integer, Double>();
    final java.util.List<Future> futures = new ArrayList<Future>();
    final ExecutorService e = Executors.newFixedThreadPool(numThreads);

    for (int i = 0; i < numThreads; i++) {
        futures.add(e.submit(new Runnable() {
            public void run() {
                while (true) {
                    BigThing thing = null;
                    synchronized (it) {
                        thing = (it.hasNext()) ? it.next() : null;
                    }
                    if (thing == null) {
                        break;
                    }
                    results.put(thing.getId(), calc.calc(thing));
                }
            }
        }));
    }
    e.shutdown();

    for (Future f : futures) {
        try {
            f.get();
        } catch (InterruptedException ex) {
        // swallowing is better than spitting it out
        } catch (ExecutionException ex) {
            throw Throwables.propagate(ex.getCause());
        }
    }
    return results;
}
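
The loop over the futures is what surfaces worker failures: an exception thrown inside a task passed to submit() is captured by its Future and rethrown from get() wrapped in an ExecutionException. The sketch below illustrates that behaviour using only the JDK. The class and method names (FuturePropagationDemo, failingTaskCause, threadSurvivesFailure) are hypothetical and not part of the answer. As far as the standard ThreadPoolExecutor goes, a task started with submit() that fails ends its own loop but should not kill the pool thread, which the second method checks.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class, not part of the answer above.
class FuturePropagationDemo {

    // Submits a task that always fails and returns the exception the caller observes.
    static Throwable failingTaskCause() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            Future<?> future = pool.submit(new Runnable() {
                @Override
                public void run() {
                    throw new IllegalStateException("boom");
                }
            });
            future.get();
            return null; // not reached: the task always throws
        } catch (ExecutionException ex) {
            return ex.getCause(); // the worker's original exception
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // restore the flag instead of swallowing it
            return ex;
        } finally {
            pool.shutdown();
        }
    }

    // Returns true if a one-thread pool runs a second task on the same thread
    // after the first submitted task failed.
    static boolean threadSurvivesFailure() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        final Thread[] seen = new Thread[2];
        try {
            Future<?> first = pool.submit(new Runnable() {
                @Override
                public void run() {
                    seen[0] = Thread.currentThread();
                    throw new IllegalStateException("boom");
                }
            });
            try {
                first.get();
            } catch (ExecutionException expected) {
                // the failure is held by the Future, not thrown in the worker
            }
            pool.submit(new Runnable() {
                @Override
                public void run() {
                    seen[1] = Thread.currentThread();
                }
            }).get();
            return seen[0] == seen[1];
        } catch (Exception ex) {
            return false;
        } finally {
            pool.shutdown();
        }
    }
}
```

In the answer's code the practical effect of a failure is therefore that one of the numThreads consumer loops stops early, while the remaining loops keep draining the iterator.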

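【Comments】:

    【Answer 2】:

    【Comments】:

    【Answer 3】:

    Edit: revised, faster version

    Notes: This is actually less succinct, but it should run faster. To run it on an iterator, call the static method BulkCalcRunner.runBulkCalc(Iterator, Calc), or the overload that also takes a thread count. It is clean, fairly succinct, and probably about the fastest solution you can get.

    Why it is faster:

    • Results are collected in a thread-local HashMap, so collecting them requires no synchronization. Otherwise, storing every result would require it. This improves how well each thread scales and gives better locality of reference (each HashMap can live entirely in its processor's L2 cache, with no communication needed).
    • HashMap is used in place of less efficient Map implementations.
    • Errors are bundled into a collection for later handling. With a thread pool, every exception requires a thread to die and be recreated.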

      interface BigThing { int getId(); String getString(); }

      class Calc {
          // somewhat expensive computation
          double calc(BigThing bigThing) {
              Random r = new Random(bigThing.getString().hashCode());
              double d = 0;
              for (int i = 0; i < 100000; i++) {
                  d += r.nextDouble();
              }
              return d;
          }
      }
      
      static class BulkCalcRunner implements Runnable {
          Calc calc;
          CountDownLatch latch;
          Iterator<BigThing> it;
          Collection<Throwable> errors;
          Map<Integer,Double> results;
      
          public BulkCalcRunner (Calc calc, Iterator<BigThing> it, CountDownLatch latch, Map<Integer,Double> results, Collection<Throwable> errors) {
              this.calc = calc;
              this.it = it; // previously never assigned, which left the iterator null in run()
              this.latch = latch;
              this.errors = errors;
              this.results = results;
          }
      
          public void run() {
              ArrayList<Throwable> errorLocal = new ArrayList<Throwable>();
              HashMap<Integer,Double> resultsLocal = new HashMap<Integer,Double>();
              while (true) {
                  BigThing thing = null;
                  try {
                      synchronized (it) {
                          if (it.hasNext()) {
                              thing = it.next();
                          }
                      }
                  } catch (Exception e) { //prevents iterator errors from causing endless loop
                      thing = null;
                  }
                  //finished when first null BigThing encountered
                  if (thing == null) {
                      synchronized (errors) {
                          errors.addAll(errorLocal);
                      }
                      synchronized(results) {
                          results.putAll(resultsLocal);
                      }
                      latch.countDown();
                      break;
                  }
                  try {
                      resultsLocal.put(thing.getId(), calc.calc(thing));
                  } catch (Exception e) {
                      errorLocal.add(e);
                  }
              }
          }
      
          public static Map<Integer,Double> runBulkCalc(Iterator<BigThing> iterator, Calc calculation, int numThreads) {
              final ConcurrentHashMap<Integer, Double> results = new ConcurrentHashMap<Integer, Double>();
              final ArrayList<Throwable> errors = new ArrayList<Throwable>();
              final CountDownLatch latch = new CountDownLatch(numThreads);
      
              //start up the worker threads
              for (int i = 0; i < numThreads; i++) {
                  new Thread(new BulkCalcRunner(calculation,iterator,latch, results, errors)).start();
              }
      
              try {
                  //Latch waits for all the worker threads to check in as "done"
                  latch.await();
              } catch (InterruptedException ex) {
                  // swallowing is better than spitting it out...
              }
      
              //finally, propagate errors!
              for (Throwable th : errors) {
                  // errors holds the original exceptions, so rethrow th itself; th.getCause() may be null
                  throw Throwables.propagate(th);
              }
              return results;
          }
      
          public static Map<Integer,Double> runBulkCalc(Iterator<BigThing> iterator, Calc calculation) {
              return runBulkCalc(iterator,calculation,Runtime.getRuntime().availableProcessors());
          }
      }
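
One fragility of waiting on a latch is worth spelling out: in the runner above, countDown() is only reached on the normal exit path, and catch (Exception e) does not cover an Error, so a worker that dies that way would leave await() blocked forever. The sketch below shows one common way to guard against that, counting down in a finally block. It is a generic illustration under that assumption, not a rewrite of the answer, and the names LatchSketch and runAll are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

// Hypothetical helper, not part of the answer above.
class LatchSketch {

    // Runs each task on its own thread, waits for all of them,
    // and returns whatever the tasks threw.
    static List<Throwable> runAll(List<Runnable> tasks) {
        final CountDownLatch latch = new CountDownLatch(tasks.size());
        final List<Throwable> errors = new CopyOnWriteArrayList<Throwable>();
        for (final Runnable task : tasks) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        task.run();
                    } catch (Throwable t) {
                        errors.add(t); // includes Errors, not only Exceptions
                    } finally {
                        latch.countDown(); // always reached, so await() cannot hang
                    }
                }
            }).start();
        }
        try {
            latch.await();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
        }
        return errors;
    }
}
```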
      

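    【Comments】:

    • BulkCalcAwesome.calc throws this exception when I run it: Exception in thread "main" java.util.concurrent.RejectedExecutionException at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1760) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:767) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:658) at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:78)
    • Oh, darn. I expected it to block on submit until space opened up in the queue. Let me try another approach tomorrow. By the way, can you say something about how the iterator works? If it pulls from a collection of lazily loaded entities or something like that, there may be a better way...
    • BulkCalc knows nothing about the iterator. It may or may not be thread-safe or lazily loading.
    • I like the use of CountDownLatch in this edited approach! As you say, it isn't the most succinct solution, but it does seem to meet the stated goals. I doubt it is as much faster as you claim, but perhaps a microbenchmark could prove it? Thanks for the submission!
    • Thanks. If you appreciate the work, could you upvote? Also, I should clarify "faster". The work in Calc itself can't run any faster (duh), but the threading part here minimizes the time spent in synchronized blocks and minimizes the number of lock acquisitions (only a few times per thread for the errors, and once per element on the iterator). That translates into more efficient parallelization. I'll post a second version that is more succinct but synchronizes less efficiently.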
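
The RejectedExecutionException discussed in these comments comes from ThreadPoolExecutor's default handler, AbortPolicy, which throws instead of blocking when a task cannot be accepted. One common workaround, shown here as an assumption rather than as the approach the answerer ended up taking, is to gate submission with a Semaphore so the producing thread blocks until a worker frees a permit. That also caps how many elements have been pulled from the iterator but not yet processed. The names BoundedSubmitter, Handler and submitAll are hypothetical, and the sketch does not propagate exceptions thrown by the handler.

```java
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

// Hypothetical helper illustrating a blocking submit.
class BoundedSubmitter {

    interface Handler<T> {
        void handle(T element);
    }

    // Feeds elements to a pool while holding at most maxInFlight unprocessed
    // elements at any time. Returns the number of elements submitted.
    static <T> int submitAll(Iterator<T> in, final Handler<T> handler, int maxInFlight) {
        ExecutorService pool = Executors.newFixedThreadPool(maxInFlight);
        final Semaphore permits = new Semaphore(maxInFlight);
        int submitted = 0;
        try {
            // Only this thread touches the iterator, so it needs no locking.
            while (in.hasNext()) {
                permits.acquire(); // blocks while maxInFlight elements are pending
                final T element = in.next();
                pool.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            handler.handle(element);
                        } finally {
                            permits.release(); // frees a slot even if handle() throws
                        }
                    }
                });
                submitted++;
            }
            permits.acquire(maxInFlight); // wait until every in-flight element is done
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return submitted;
    }
}
```

Because a permit is taken before next() is called, the iterator is never advanced further than the workers can keep up with.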
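    【Answer 4】:

    While I can't think of a way to improve the design, we can at least extract the generic parts into a utility class. With the threading code pulled out, BulkCalc3 is succinct enough.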

    class BulkCalc3
    {
        final Calc calc;
    
        public BulkCalc3(Calc calc)
        {
            this.calc = calc;
        }
    
        public TreeMap<Integer, Double> calc(Iterator<BigThing> in)
        {
            final ConcurrentMap<Integer, Double> resultMap = new MapMaker().makeMap();
            ThreadedIteratorProcessor<BigThing> processor = new ThreadedIteratorProcessor<BigThing>();
            processor.processIterator(in, new ThreadedIteratorProcessor.ElementProcessor<BigThing>()
            {
                @Override
                public void processElement(BigThing o)
                {
                    resultMap.put(o.getId(), calc.calc(o));
                }
            });
            return new TreeMap<Integer, Double>(resultMap);
        }
    }
    

    Here is the utility class:

    import com.google.common.base.Throwables;
    import com.google.common.collect.Lists;
    import com.google.common.util.concurrent.MoreExecutors;
    import java.util.Iterator;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    
    /**
     * A utility class to process each element in an iterator in an efficient manner.
     */
    public class ThreadedIteratorProcessor<T>
    {
        public static interface ElementProcessor<T>
        {
            /**
             * Process an element.
             * @param element The element to process.
             */
            public void processElement(T element);
        }
        private final int numThreads;
    
        /**
         * Create an instance which uses a specified number of threads.
         * @param numThreads The number of processing threads.
         */
        public ThreadedIteratorProcessor(int numThreads)
        {
            this.numThreads = numThreads;
        }
    
        /**
         * Create an instance which uses a number of threads equal to the number of system processors.
         */
        public ThreadedIteratorProcessor()
        {
            this(Runtime.getRuntime().availableProcessors());
        }
    
        /**
         * Process each element in an iterator in parallel.  The number of worker threads depends on how this object was
         * constructed.  This method will re-throw any exception thrown in the supplied ElementProcessor.  An element will
         * not be requested from the iterator any earlier than is absolutely necessary.  In other words, the last element in
         * the iterator will not be consumed until all of the other elements are completely processed, excluding elements
         * currently being processed by the worker threads.
         * @param iterator The iterator from which to get elements.  This iterator need not be thread-safe.
         * @param elementProcessor The element processor.
         */
        public void processIterator(Iterator<T> iterator, ElementProcessor<T> elementProcessor)
        {
            // Use an ExecutorService for proper exception handling.
            ExecutorService e = Executors.newFixedThreadPool(numThreads, MoreExecutors.daemonThreadFactory());
            List<Future<?>> futures = Lists.newLinkedList();
    
            // Get a thread-safe iterator
            final SafeIterator<T> safeIterator = new SafeIterator<T>(iterator);
    
            // Submit numThreads new worker threads to pull work from the iterator.
            for (int i = 0; i < numThreads; i++)
            {
                futures.add(e.submit(new Consumer<T>(safeIterator, elementProcessor)));
            }
    
            e.shutdown();
    
            // Calling .get() on the futures accomplishes two things:
            // 1. awaiting completion of the work
            // 2. discovering an exception during calculation, and rethrowing to the client in this thread.
            for (Future<?> future : futures)
            {
                try
                {
                    future.get();
                }
                catch (InterruptedException ex)
                {
                    // swallowing is OK
                }
                catch (ExecutionException ex)
                {
                    // Re-throw the underlying exception to the client.
                    throw Throwables.propagate(ex.getCause());
                }
            }
        }
    
        // A runnable that sits in a loop consuming and processing elements from an iterator.
        private static class Consumer<T> implements Runnable
        {
            private final SafeIterator<T> it;
            private final ElementProcessor<T> elementProcessor;
    
            public Consumer(SafeIterator<T> it, ElementProcessor<T> elementProcessor)
            {
                this.it = it;
                this.elementProcessor = elementProcessor;
            }
    
            @Override
            public void run()
            {
                while (true)
                {
                    T o = it.nextOrNull();
                    if (o == null)
                    {
                        return;
                    }
                    elementProcessor.processElement(o);
                }
            }
        }
    
        // a thread-safe iterator-like object.
        private static class SafeIterator<T>
        {
            private final Iterator<T> in;
    
            SafeIterator(Iterator<T> in)
            {
                this.in = in;
            }
    
            synchronized T nextOrNull()
            {
                if (in.hasNext())
                {
                    return in.next();
                }
                return null;
            }
        }
    }
    

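    【Comments】:

    • I'd appreciate it if you waited for my result (see below) before submitting and accepting your own. I think mine is cleaner, if less elegant, and I can guarantee it's faster.
    • Oops, I hope I didn't discourage you! I believe this site lets me change the accepted answer when new answers come in. Since I get notified of new answers and comments, I'll be able to evaluate each submission as it arrives. Fair enough? Let's continue the discussion of your submission in the comments on your submission.
    • I don't think you can switch your accepted answer unless you do it quickly (it lets you toggle it for maybe an hour or two). You can vote up or down, though...
    • I should be clearer: the producer-consumer pattern is useful, but you don't really need the extra classes unless you are reusing instances or extending the classes. I think the logical, succinct approach is a single function that takes an Iterator<BigThing>, a Calc, and optionally a thread count. If you were pipelining through multiple processing stages it might be worth keeping SafeIterator, but some of the other interfaces and such probably wouldn't get used.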
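
The last comment suggests a single function that takes the iterator, the computation and an optional thread count. A minimal sketch of that idea follows, generalized with java.util.function.Function and using only the JDK, so it assumes Java 8 or later. The names ParallelIterators and mapInParallel are hypothetical. Unchecked failures are rethrown as they are and anything else is wrapped in a RuntimeException, which roughly mirrors what the thread's code does with Throwables.propagate. ConcurrentHashMap rejects null keys and values, so the sketch assumes neither function returns null.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical single-function variant of the approaches in this thread.
class ParallelIterators {

    static <T, K, V> Map<K, V> mapInParallel(final Iterator<T> in,
                                             final Function<T, K> keyOf,
                                             final Function<T, V> compute,
                                             int numThreads) {
        final Map<K, V> results = new ConcurrentHashMap<K, V>();
        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
        List<Future<?>> futures = new ArrayList<Future<?>>();
        for (int i = 0; i < numThreads; i++) {
            futures.add(pool.submit(new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        T element;
                        synchronized (in) { // the iterator need not be thread-safe
                            if (Thread.currentThread().isInterrupted() || !in.hasNext()) {
                                return;
                            }
                            element = in.next();
                        }
                        results.put(keyOf.apply(element), compute.apply(element));
                    }
                }
            }));
        }
        pool.shutdown();
        try {
            for (Future<?> future : futures) {
                future.get(); // waits for the worker and surfaces its failure
            }
        } catch (InterruptedException ex) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", ex);
        } catch (ExecutionException ex) {
            pool.shutdownNow(); // ask the remaining workers to stop pulling elements
            Throwable cause = ex.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            if (cause instanceof Error) {
                throw (Error) cause;
            }
            throw new RuntimeException(cause);
        }
        return results;
    }

    // Overload that defaults the thread count to the number of processors.
    static <T, K, V> Map<K, V> mapInParallel(Iterator<T> in, Function<T, K> keyOf,
                                             Function<T, V> compute) {
        return mapInParallel(in, keyOf, compute, Runtime.getRuntime().availableProcessors());
    }
}
```

With the types from the question, a call could look like ParallelIterators.mapInParallel(in, BigThing::getId, calc::calc, numThreads), with the result copied into a TreeMap if sorted output is needed.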