【问题标题】:How to wait for a number of threads to complete?如何等待多个线程完成?
【发布时间】:2010-11-18 03:22:57
【问题描述】:

有什么方法可以简单地等待所有线程进程完成?例如,假设我有:

public class DoSomethingInAThread implements Runnable{

    public static void main(String[] args) {
        for (int n=0; n<1000; n++) {
            Thread t = new Thread(new DoSomethingInAThread());
            t.start();
        }
        // wait for all threads' run() methods to complete before continuing
    }

    public void run() {
        // do something here
    }


}

如何更改此设置,以便 main() 方法在注释处暂停,直到所有线程的 run() 方法退出?谢谢!

【问题讨论】:

    标签: java multithreading parallel-processing wait


    【解决方案1】:

    问题:

    for(i = 0; i < threads.length; i++)
      threads[i].join();
    

    ...是,threads[i + 1] 永远不能在threads[i] 之前加入。 除了“闩锁”的解决方案之外,所有解决方案都缺乏这个。

    这里(还没有)提到ExecutorCompletionService,它允许根据完成顺序加入线程/任务:

    public class ExecutorCompletionService&lt;V&gt; extends Object implements CompletionService&lt;V&gt;

    使用提供的Executor 执行任务的CompletionService。此类安排提交的任务在完成后放置在使用 take 可访问的队列中。该类足够轻量级,适合在处理任务组时临时使用。

    用法示例。

    假设您有一组求解某个问题的求解器,每个求解器返回某种类型的值 Result,并希望同时运行它们,以某种方法处理每个返回非空值的结果use(Result r)。你可以这样写:

    void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException, ExecutionException {
      CompletionService<Result> cs = new ExecutorCompletionService<>(e);
      solvers.forEach(cs::submit);
      for (int i = solvers.size(); i > 0; i--) {
        Result r = cs.take().get();
        if (r != null)
          use(r);
      }
    }
    

    假设您想使用任务集的第一个非空结果,忽略任何遇到异常的任务,并在第一个任务准备好时取消所有其他任务:

    void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException {
      CompletionService<Result> cs = new ExecutorCompletionService<>(e);
      int n = solvers.size();
      List<Future<Result>> futures = new ArrayList<>(n);
      Result result = null;
      try {
        solvers.forEach(solver -> futures.add(cs.submit(solver)));
        for (int i = n; i > 0; i--) {
          try {
            Result r = cs.take().get();
            if (r != null) {
              result = r;
              break;
            }
          } catch (ExecutionException ignore) {}
        }
      } finally {
        futures.forEach(future -> future.cancel(true));
      }
    
      if (result != null)
        use(result);
    }
    

    自:1.5 (!)

    假设use(r)(示例 1)也是异步的,我们有很大的优势。 #

    【讨论】:

      【解决方案2】:

      join() 对我没有帮助。在 Kotlin 中查看此示例:

          val timeInMillis = System.currentTimeMillis()
          ThreadUtils.startNewThread(Runnable {
              for (i in 1..5) {
                  val t = Thread(Runnable {
                      Thread.sleep(50)
                      var a = i
                      kotlin.io.println(Thread.currentThread().name + "|" + "a=$a")
                      Thread.sleep(200)
                      for (j in 1..5) {
                          a *= j
                          Thread.sleep(100)
                          kotlin.io.println(Thread.currentThread().name + "|" + "$a*$j=$a")
                      }
                      kotlin.io.println(Thread.currentThread().name + "|TaskDurationInMillis = " + (System.currentTimeMillis() - timeInMillis))
                  })
                  t.start()
              }
          })
      

      结果:

      Thread-5|a=5
      Thread-1|a=1
      Thread-3|a=3
      Thread-2|a=2
      Thread-4|a=4
      Thread-2|2*1=2
      Thread-3|3*1=3
      Thread-1|1*1=1
      Thread-5|5*1=5
      Thread-4|4*1=4
      Thread-1|2*2=2
      Thread-5|10*2=10
      Thread-3|6*2=6
      Thread-4|8*2=8
      Thread-2|4*2=4
      Thread-3|18*3=18
      Thread-1|6*3=6
      Thread-5|30*3=30
      Thread-2|12*3=12
      Thread-4|24*3=24
      Thread-4|96*4=96
      Thread-2|48*4=48
      Thread-5|120*4=120
      Thread-1|24*4=24
      Thread-3|72*4=72
      Thread-5|600*5=600
      Thread-4|480*5=480
      Thread-3|360*5=360
      Thread-1|120*5=120
      Thread-2|240*5=240
      Thread-1|TaskDurationInMillis = 765
      Thread-3|TaskDurationInMillis = 765
      Thread-4|TaskDurationInMillis = 765
      Thread-5|TaskDurationInMillis = 765
      Thread-2|TaskDurationInMillis = 765
      

      现在让我将join() 用于线程:

          val timeInMillis = System.currentTimeMillis()
          ThreadUtils.startNewThread(Runnable {
              for (i in 1..5) {
                  val t = Thread(Runnable {
                      Thread.sleep(50)
                      var a = i
                      kotlin.io.println(Thread.currentThread().name + "|" + "a=$a")
                      Thread.sleep(200)
                      for (j in 1..5) {
                          a *= j
                          Thread.sleep(100)
                          kotlin.io.println(Thread.currentThread().name + "|" + "$a*$j=$a")
                      }
                      kotlin.io.println(Thread.currentThread().name + "|TaskDurationInMillis = " + (System.currentTimeMillis() - timeInMillis))
                  })
                  t.start()
                  t.join()
              }
          })
      

      结果:

      Thread-1|a=1
      Thread-1|1*1=1
      Thread-1|2*2=2
      Thread-1|6*3=6
      Thread-1|24*4=24
      Thread-1|120*5=120
      Thread-1|TaskDurationInMillis = 815
      Thread-2|a=2
      Thread-2|2*1=2
      Thread-2|4*2=4
      Thread-2|12*3=12
      Thread-2|48*4=48
      Thread-2|240*5=240
      Thread-2|TaskDurationInMillis = 1568
      Thread-3|a=3
      Thread-3|3*1=3
      Thread-3|6*2=6
      Thread-3|18*3=18
      Thread-3|72*4=72
      Thread-3|360*5=360
      Thread-3|TaskDurationInMillis = 2323
      Thread-4|a=4
      Thread-4|4*1=4
      Thread-4|8*2=8
      Thread-4|24*3=24
      Thread-4|96*4=96
      Thread-4|480*5=480
      Thread-4|TaskDurationInMillis = 3078
      Thread-5|a=5
      Thread-5|5*1=5
      Thread-5|10*2=10
      Thread-5|30*3=30
      Thread-5|120*4=120
      Thread-5|600*5=600
      Thread-5|TaskDurationInMillis = 3833
      

      当我们使用join 时很明显:

      1. 线程按顺序运行。
      2. 第一个样本需要 765 毫秒,而第二个样本需要 3833 毫秒。

      我们防止阻塞其他线程的解决方案是创建一个 ArrayList:

      val threads = ArrayList<Thread>()
      

      现在,当我们想要启动一个新线程时,我们最常将其添加到 ArrayList 中:

      addThreadToArray(
          ThreadUtils.startNewThread(Runnable {
              ...
          })
      )
      

      addThreadToArray 函数:

      @Synchronized
      fun addThreadToArray(th: Thread) {
          threads.add(th)
      }
      

      startNewThread 功能:

      fun startNewThread(runnable: Runnable) : Thread {
          val th = Thread(runnable)
          th.isDaemon = false
          th.priority = Thread.MAX_PRIORITY
          th.start()
          return th
      }
      

      检查线程的完成情况,如下所示:

      val notAliveThreads = ArrayList<Thread>()
      for (t in threads)
          if (!t.isAlive)
              notAliveThreads.add(t)
      threads.removeAll(notAliveThreads)
      if (threads.size == 0){
          // The size is 0 -> there is no alive threads.
      }
      

      【讨论】:

        【解决方案3】:

        作为 CountDownLatch 的替代品,您还可以使用 CyclicBarrier 例如

        public class ThreadWaitEx {
            static CyclicBarrier barrier = new CyclicBarrier(100, new Runnable(){
                public void run(){
                    System.out.println("clean up job after all tasks are done.");
                }
            });
            public static void main(String[] args) {
                for (int i = 0; i < 100; i++) {
                    Thread t = new Thread(new MyCallable(barrier));
                    t.start();
                }       
            }
        
        }    
        
        class MyCallable implements Runnable{
            private CyclicBarrier b = null;
            public MyCallable(CyclicBarrier b){
                this.b = b;
            }
            @Override
            public void run(){
                try {
                    //do something
                    System.out.println(Thread.currentThread().getName()+" is waiting for barrier after completing his job.");
                    b.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }       
        }
        

        在这种情况下使用 CyclicBarrier,barrier.await() 应该是最后一个语句,即当你的线程完成它的工作时。 CyclicBarrier 可以通过其 reset() 方法再次使用。引用 javadocs:

        CyclicBarrier 支持一个可选的 Runnable 命令,该命令在每个屏障点运行一次,在队伍中的最后一个线程到达之后,但在任何线程被释放之前。此屏障操作对于在任何一方继续之前更新共享状态很有用。

        【讨论】:

        • 我认为这不是 CyclicBarrier 的好例子。为什么要使用 Thread.sleep() 调用?
        • @Guenther - 是的,我更改了代码以满足要求。
        • CyclicBarrier 不是 CountDownLatch 的替代品。当线程必须重复倒计时时,您应该创建一个 CyclicBarrier,否则默认为 CountDownLatch(除非需要额外的 Execution 抽象,此时您应该查看更高级别的 Services)。
        【解决方案4】:

        【讨论】:

        • 不确定你到底打算怎么做。如果您建议在循环中轮询 activeCount:这很糟糕,因为它是忙碌等待(即使您在轮询之间睡觉 - 您也会在业务和响应能力之间进行权衡)。
        • @Martin v. Löwis:“加入将只等待一个线程。更好的解决方案可能是 java.util.concurrent.CountDownLatch。只需将锁存器的计数设置为工作线程。每个工作线程在退出之前都应该调用countDown(),而主线程只是调用await(),这将阻塞直到计数器达到零。join()的问题也是你不能开始添加更多线程动态。列表将随着并发修改而爆炸。您的解决方案适用于问题,但不适用于一般目的。
        【解决方案5】:

        正如 Martin K 建议的那样,java.util.concurrent.CountDownLatch 似乎是一个更好的解决方案。只需添加一个相同的示例

             public class CountDownLatchDemo
        {
        
            public static void main (String[] args)
            {
                int noOfThreads = 5;
                // Declare the count down latch based on the number of threads you need
                // to wait on
                final CountDownLatch executionCompleted = new CountDownLatch(noOfThreads);
                for (int i = 0; i < noOfThreads; i++)
                {
                    new Thread()
                    {
        
                        @Override
                        public void run ()
                        {
        
                            System.out.println("I am executed by :" + Thread.currentThread().getName());
                            try
                            {
                                // Dummy sleep
                                Thread.sleep(3000);
                                // One thread has completed its job
                                executionCompleted.countDown();
                            }
                            catch (InterruptedException e)
                            {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                            }
                        }
        
                    }.start();
                }
        
                try
                {
                    // Wait till the count down latch opens.In the given case till five
                    // times countDown method is invoked
                    executionCompleted.await();
                    System.out.println("All over");
                }
                catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
            }
        
        }
        

        【讨论】:

          【解决方案6】:

          如果你创建一个线程列表,你可以遍历它们并针对每个线程使用 .join() ,当所有线程都有时,你的循环就会结束。不过我没试过。

          http://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html#join()

          【讨论】:

          【解决方案7】:

          一种方法是创建Threads 的List,创建并启动每个线程,同时将其添加到列表中。启动所有内容后,循环返回列表并在每个列表上调用join()。线程完成执行的顺序无关紧要,您需要知道的是,当第二个循环完成执行时,每个线程都将完成。

          更好的方法是使用ExecutorService 及其相关方法:

          List<Callable> callables = ... // assemble list of Callables here
                                         // Like Runnable but can return a value
          ExecutorService execSvc = Executors.newCachedThreadPool();
          List<Future<?>> results = execSvc.invokeAll(callables);
          // Note: You may not care about the return values, in which case don't
          //       bother saving them
          

          使用 ExecutorService(以及来自 Java 5 的 concurrency utilities 的所有新东西)非常灵活,上面的示例几乎没有触及表面。

          【讨论】:

          • 线程组是要走的路!使用可变列表你会遇到麻烦(同步)
          • 什么?你怎么会惹上麻烦?它仅由正在启动的线程可变(仅可读),因此只要它不修改列表同时遍历它,就可以了。
          • 这取决于你如何使用它。如果你在线程中使用调用类,你会遇到问题。
          【解决方案8】:

          您可以使用CountDownLatch 而不是join(),这是一个旧的API。我已将您的代码修改如下以满足您的要求。

          import java.util.concurrent.*;
          class DoSomethingInAThread implements Runnable{
              CountDownLatch latch;
              public DoSomethingInAThread(CountDownLatch latch){
                  this.latch = latch;
              } 
              public void run() {
                  try{
                      System.out.println("Do some thing");
                      latch.countDown();
                  }catch(Exception err){
                      err.printStackTrace();
                  }
              }
          }
          
          public class CountDownLatchDemo {
              public static void main(String[] args) {
                  try{
                      CountDownLatch latch = new CountDownLatch(1000);
                      for (int n=0; n<1000; n++) {
                          Thread t = new Thread(new DoSomethingInAThread(latch));
                          t.start();
                      }
                      latch.await();
                      System.out.println("In Main thread after completion of 1000 threads");
                  }catch(Exception err){
                      err.printStackTrace();
                  }
              }
          }
          

          解释

          1. CountDownLatch 已根据您的要求使用给定计数 1000 进行初始化。

          2. 每个工作线程DoSomethingInAThread都会递减CountDownLatch,它已经传入构造函数。

          3. 主线程 CountDownLatchDemo await() 直到计数变为零。一旦计数变为零,您将得到低于线的输出。

            In Main thread after completion of 1000 threads
            

          来自 oracle 文档页面的更多信息

          public void await()
                     throws InterruptedException
          

          导致当前线程等待直到锁存器倒计时到零,除非线程被中断。

          有关其他选项,请参阅相关的 SE 问题:

          wait until all threads finish their work in java

          【讨论】:

            【解决方案9】:

            考虑使用java.util.concurrent.CountDownLatchjavadocs中的示例

            【讨论】:

            • 是线程的锁存器,锁存器锁与倒计时一起工作。在线程的 run() 方法中明确声明等待 CountDownLatch 达到它的倒计时到 0。您可以在多个线程中使用相同的 CountDownLatch 来同时释放它们。不知道是不是你需要的,只想提一下,因为在多线程环境下工作时很有用。
            • 也许你应该把这个解释放在你的答案正文中?
            • Javadoc 中的示例非常具有描述性,因此我没有添加任何示例。 docs.oracle.com/javase/7/docs/api/java/util/concurrent/…。在第一个示例中,所有 Workers 线程同时释放,因为它们等待 CountdownLatch startSignal 达到零,这发生在 startSignal.countDown() 中。然后,mian 线程使用 doneSignal.await() 指令等待所有工作完成。 doneSignal 在每个工作人员中降低其值。
            【解决方案10】:

            在第一个 for 循环中创建线程对象。

            for (int i = 0; i < threads.length; i++) {
                 threads[i] = new Thread(new Runnable() {
                     public void run() {
                         // some code to run in parallel
                     }
                 });
                 threads[i].start();
             }
            

            然后这里的每个人都在说什么。

            for(i = 0; i < threads.length; i++)
              threads[i].join();
            

            【讨论】:

              【解决方案11】:
              import java.util.ArrayList;
              import java.util.List;
              import java.util.concurrent.ExecutionException;
              import java.util.concurrent.ExecutorService;
              import java.util.concurrent.Executors;
              import java.util.concurrent.Future;
              
              public class DoSomethingInAThread implements Runnable
              {
                 public static void main(String[] args) throws ExecutionException, InterruptedException
                 {
                    //limit the number of actual threads
                    int poolSize = 10;
                    ExecutorService service = Executors.newFixedThreadPool(poolSize);
                    List<Future<Runnable>> futures = new ArrayList<Future<Runnable>>();
              
                    for (int n = 0; n < 1000; n++)
                    {
                       Future f = service.submit(new DoSomethingInAThread());
                       futures.add(f);
                    }
              
                    // wait for all tasks to complete before continuing
                    for (Future<Runnable> f : futures)
                    {
                       f.get();
                    }
              
                    //shut down the executor service so that this thread can exit
                    service.shutdownNow();
                 }
              
                 public void run()
                 {
                    // do something here
                 }
              }
              

              【讨论】:

              • 工作就像一个魅力......我有两组线程,由于多个 cookie 的问题,它们不应该同时运行。我使用您的示例一次运行一组线程。感谢您分享您的知识...
              • @Dantalian - 在您的 Runnable 类中(可能在 run 方法中),您可能希望捕获发生的任何异常并将它们存储在本地(或存储错误消息/条件)。在示例中,f.get() 返回您提交给 ExecutorService 的对象。您的对象可能有一个检索任何异常/错误条件的方法。根据您修改提供的示例的方式,您可能需要将 f.get() 转换的对象转换为您的预期类型。
              【解决方案12】:

              根据您的需要,您可能还想查看 java.util.concurrent 包中的 CountDownLatch 和 CyclicBarrier 类。如果您希望您的线程相互等待,或者您希望对线程的执行方式进行更细粒度的控制(例如,在它们的内部执行中等待另一个线程设置某些状态),它们会很有用。您还可以使用 CountDownLatch 来通知所有线程同时启动,而不是在遍历循环时一个接一个地启动它们。标准 API 文档有一个这样的例子,加上使用另一个 CountDownLatch 来等待所有线程完成它们的执行。

              【讨论】:

                【解决方案13】:

                完全避免使用 Thread 类,而是使用 java.util.concurrent 中提供的更高抽象

                ExecutorService 类提供的method invokeAll 似乎可以满足您的需求。

                【讨论】:

                  【解决方案14】:

                  你把所有线程放在一个数组中,全部启动,然后有一个循环

                  for(i = 0; i < threads.length; i++)
                    threads[i].join();
                  

                  每个连接都会阻塞,直到各自的线程完成。线程的完成顺序可能与您加入它们的顺序不同,但这不是问题:当循环退出时,所有线程都已完成。

                  【讨论】:

                  • @Mykola:究竟使用线程组的优势是什么?仅仅因为 API 存在并不意味着您必须使用它...
                  • 参见:“一个线程组代表一组线程。”对于这个用例,这是语义正确的!并且:“一个线程被允许访问关于它自己的线程组的信息”
                  • 《Effective Java》一书建议避免使用线程组(第 73 项)。
                  • Effective Java 中提到的 bug 应该在 Java 6 中已经修复。如果 Java 新版本不是限制,最好使用 Futures 来解决线程问题。 Martin v. Löwis:你是对的。这与该问题无关,但很高兴从一个对象(如 ExecutorService)获得有关正在运行的线程的更多信息。我认为使用给定的特性来解决问题很好;也许您将来需要更多的灵活性(线程信息)。提及旧 JDK 中的旧错误类也是正确的。
                  • ThreadGroup 没有实现组级join,所以人们为什么要推动ThreadGroup 有点莫名其妙。人们真的在使用自旋锁并查询组的 activeCount 吗?与仅在所有线程上调用 join 相比,您很难说服我这样做更好。
                  猜你喜欢
                  • 1970-01-01
                  • 1970-01-01
                  • 2019-07-18
                  • 1970-01-01
                  • 1970-01-01
                  • 2012-04-09
                  • 1970-01-01
                  • 2010-12-07
                  相关资源
                  最近更新 更多