【问题标题】:wait until all threads finish their work in java等到所有线程在java中完成它们的工作
【发布时间】:2011-12-17 20:43:39
【问题描述】:

我正在编写一个具有 5 个线程的应用程序,这些线程同时从 Web 获取一些信息并填充缓冲区类中的 5 个不同字段。
当所有线程完成工作时,我需要验证缓冲区数据并将其存储在数据库中。
我该怎么做(当所有线程完成工作时收到警报)?

【问题讨论】:

  • Thread.join 是一种相当低级的非常 Java 特有的解决问题的方法。此外,问题在于 Thread API 存在缺陷:您无法知道 join 是否成功完成(请参阅Java Concurrency In Practice)。更高级别的抽象,例如使用 CountDownLatch 可能更可取,并且对于没有“卡”在 Java-idiosynchratic 思维模式中的程序员来说看起来更自然。不要和我争论,去和Doug Lea争论; )

标签: java multithreading wait


【解决方案1】:

我有类似的情况,我必须等到所有子线程完成执行然后才能获得每个子线程的状态结果..因此我需要等到所有子线程完成。

下面是我使用多线程处理的代码

 public static void main(String[] args) {
        List<RunnerPojo> testList = ExcelObject.getTestStepsList();//.parallelStream().collect(Collectors.toList());
        int threadCount = ConfigFileReader.getInstance().readConfig().getParallelThreadCount();
        System.out.println("Thread count is : =========  " + threadCount); // 5

        ExecutorService threadExecutor = new DriverScript().threadExecutor(testList, threadCount);


        boolean isProcessCompleted = waitUntilCondition(() -> threadExecutor.isTerminated()); // Here i used waitUntil condition 

        if (isProcessCompleted) {
            testList.forEach(x -> {
                System.out.println("Test Name: " + x.getTestCaseId());
                System.out.println("Test Status : " + x.getStatus());
                System.out.println("======= Test Steps ===== ");

                x.getTestStepsList().forEach(y -> {
                    System.out.println("Step Name: " + y.getDescription());
                    System.out.println("Test caseId : " + y.getTestCaseId());
                    System.out.println("Step Status: " + y.getResult());
                    System.out.println("\n ============ ==========");
                });
            });
        }

下面的方法是用并行处理分配列表

// This method will split my list and run in a parallel process with mutliple threads
    private ExecutorService threadExecutor(List<RunnerPojo> testList, int threadSize) {
        ExecutorService exec = Executors.newFixedThreadPool(threadSize);
        testList.forEach(tests -> {
            exec.submit(() -> {
                driverScript(tests);
            });
        });
        exec.shutdown();
        return exec;
    }

这是我的等待方法:在这里你可以等到你的条件在 do while 循环中满足。就我而言,我等待了一些最大超时。 这将继续检查,直到您的 threadExecutor.isTerminated()true,轮询周期为 5 秒。

    static boolean waitUntilCondition(Supplier<Boolean> function) {
        Double timer = 0.0;
        Double maxTimeOut = 20.0;

        boolean isFound;
        do {
            isFound = function.get();
            if (isFound) {
                break;
            } else {
                try {
                    Thread.sleep(5000); // Sleeping for 5 sec (main thread will sleep for 5 sec)
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                timer++;
                System.out.println("Waiting for condition to be true .. waited .." + timer * 5 + " sec.");
            }
        } while (timer < maxTimeOut + 1.0);

        return isFound;
    }

【讨论】:

    【解决方案2】:

    等待/阻塞线程主线程,直到其他线程完成它们的工作。

    正如@Ravindra babu所说,它可以通过多种方式实现,但用例子来展示。

    • java.lang.Thread.join() Since:1.0

      public static void joiningThreads() throws InterruptedException {
          Thread t1 = new Thread( new LatchTask(1, null), "T1" );
          Thread t2 = new Thread( new LatchTask(7, null), "T2" );
          Thread t3 = new Thread( new LatchTask(5, null), "T3" );
          Thread t4 = new Thread( new LatchTask(2, null), "T4" );
      
          // Start all the threads
          t1.start();
          t2.start();
          t3.start();
          t4.start();
      
          // Wait till all threads completes
          t1.join();
          t2.join();
          t3.join();
          t4.join();
      }
      
    • java.util.concurrent.CountDownLatch 自:1.5

      • .countDown() « 减少锁存器组的计数。
      • .await() « await 方法阻塞,直到当前计数达到零。

      如果您创建了latchGroupCount = 4,那么应该调用countDown() 4 次以使计数为0。因此,await() 将释放阻塞线程。

      public static void latchThreads() throws InterruptedException {
          int latchGroupCount = 4;
          CountDownLatch latch = new CountDownLatch(latchGroupCount);
          Thread t1 = new Thread( new LatchTask(1, latch), "T1" );
          Thread t2 = new Thread( new LatchTask(7, latch), "T2" );
          Thread t3 = new Thread( new LatchTask(5, latch), "T3" );
          Thread t4 = new Thread( new LatchTask(2, latch), "T4" );
      
          t1.start();
          t2.start();
          t3.start();
          t4.start();
      
          //latch.countDown();
      
          latch.await(); // block until latchGroupCount is 0.
      }
      

    线程类LatchTask的示例代码。要测试该方法,请使用joiningThreads();latchThreads(); 来自 main 方法。

    class LatchTask extends Thread {
        CountDownLatch latch;
        int iterations = 10;
        public LatchTask(int iterations, CountDownLatch latch) {
            this.iterations = iterations;
            this.latch = latch;
        }
    
        @Override
        public void run() {
            String threadName = Thread.currentThread().getName();
            System.out.println(threadName + " : Started Task...");
    
            for (int i = 0; i < iterations; i++) {
                System.out.println(threadName + " : " + i);
                MainThread_Wait_TillWorkerThreadsComplete.sleep(1);
            }
            System.out.println(threadName + " : Completed Task");
            // countDown() « Decrements the count of the latch group.
            if(latch != null)
                latch.countDown();
        }
    }
    
    • CyclicBarriers 一种同步辅助工具,它允许一组线程相互等待以到达一个共同的障碍点。CyclicBarriers 在涉及必须偶尔相互等待的固定大小的线程方的程序中很有用。屏障被称为循环的,因为它可以在等待线程被释放后重新使用。
      CyclicBarrier barrier = new CyclicBarrier(3);
      barrier.await();
      
      例如参考这个Concurrent_ParallelNotifyies 类。

    • Executer框架:我们可以使用ExecutorService创建一个线程池,用Future跟踪异步任务的进度。

      • submit(Runnable), submit(Callable) 返回未来对象。通过使用future.get()函数,我们可以阻塞主线程直到工作线程完成它的工作。

      • invokeAll(...) - 返回一个 Future 对象列表,您可以通过这些对象获取每个 Callable 的执行结果。

    Find example 使用 Interfaces Runnable, Callable with Executor 框架。


    @另见

    【讨论】:

      【解决方案3】:

      我创建了一个小助手方法来等待几个线程完成:

      public static void waitForThreadsToFinish(Thread... threads) {
              try {
                  for (Thread thread : threads) {
                      thread.join();
                  }
              }
              catch (InterruptedException e) {
                  e.printStackTrace();
              }
          }
      

      【讨论】:

        【解决方案4】:

        我采用的方法是使用ExecutorService 来管理线程池。

        ExecutorService es = Executors.newCachedThreadPool();
        for(int i=0;i<5;i++)
            es.execute(new Runnable() { /*  your task */ });
        es.shutdown();
        boolean finished = es.awaitTermination(1, TimeUnit.MINUTES);
        // all tasks have finished or the time has been reached.
        

        【讨论】:

        • @Leonid 这正是 shutdown() 所做的。
        • while(!es.awaitTermination(1, TimeUnit.MINUTES));
        • @AquariusPower 你可以告诉它等待更长时间,或者永远等待。
        • 哦..我明白了;所以我在循环中添加了一条消息,说它正在等待所有线程完成;谢谢!
        • @PeterLawrey,有必要打电话给es.shutdown();吗?如果我编写一个代码,其中我在try 块和finally 中使用es.execute(runnableObj_ZipMaking); 执行了一个线程,我调用了boolean finshed = es.awaitTermination(10, TimeUnit.MINUTES);。所以我想这应该等到所有线程完成它们的工作或发生超时(无论是第一个),我的假设是否正确?还是必须打电话给shutdown()
        【解决方案5】:

        现有的答案说可以join()每个线程。

        但是有几种方法可以获取线程数组/列表:

        • 在创建时将线程添加到列表中。
        • 使用ThreadGroup 管理线程。

        以下代码将使用ThreadGruop 方法。它首先创建一个组,然后在创建每个线程时在构造函数中指定组,稍后可以通过ThreadGroup.enumerate()获取线程数组


        代码

        SyncBlockLearn.java

        import org.testng.Assert;
        import org.testng.annotations.Test;
        
        /**
         * synchronized block - learn,
         *
         * @author eric
         * @date Apr 20, 2015 1:37:11 PM
         */
        public class SyncBlockLearn {
            private static final int TD_COUNT = 5; // thread count
            private static final int ROUND_PER_THREAD = 100; // round for each thread,
            private static final long INC_DELAY = 10; // delay of each increase,
        
            // sync block test,
            @Test
            public void syncBlockTest() throws InterruptedException {
                Counter ct = new Counter();
                ThreadGroup tg = new ThreadGroup("runner");
        
                for (int i = 0; i < TD_COUNT; i++) {
                    new Thread(tg, ct, "t-" + i).start();
                }
        
                Thread[] tArr = new Thread[TD_COUNT];
                tg.enumerate(tArr); // get threads,
        
                // wait all runner to finish,
                for (Thread t : tArr) {
                    t.join();
                }
        
                System.out.printf("\nfinal count: %d\n", ct.getCount());
                Assert.assertEquals(ct.getCount(), TD_COUNT * ROUND_PER_THREAD);
            }
        
            static class Counter implements Runnable {
                private final Object lkOn = new Object(); // the object to lock on,
                private int count = 0;
        
                @Override
                public void run() {
                    System.out.printf("[%s] begin\n", Thread.currentThread().getName());
        
                    for (int i = 0; i < ROUND_PER_THREAD; i++) {
                        synchronized (lkOn) {
                            System.out.printf("[%s] [%d] inc to: %d\n", Thread.currentThread().getName(), i, ++count);
                        }
                        try {
                            Thread.sleep(INC_DELAY); // wait a while,
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
        
                    System.out.printf("[%s] end\n", Thread.currentThread().getName());
                }
        
                public int getCount() {
                    return count;
                }
            }
        }
        

        主线程将等待组中的所有线程完成。

        【讨论】:

          【解决方案6】:

          查看各种解决方案。

          1. join() API 已在 Java 的早期版本中引入。自 JDK 1.5 发布以来,此 concurrent 软件包提供了一些不错的替代方案。

          2. ExecutorService#invokeAll()

            执行给定的任务,当一切都完成时,返回一个包含其状态和结果的 Futures 列表。

            代码示例请参考这个相关的 SE 问题:

            How to use invokeAll() to let all thread pool do their task?

          3. CountDownLatch

            一种同步辅助工具,允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。

            CountDownLatch 使用给定的计数进行初始化。由于调用countDown() 方法,await 方法会一直阻塞,直到当前计数达到零,之后所有等待的线程都将被释放,任何后续的 await 调用都会立即返回。这是一次性现象——计数无法重置。如果您需要重置计数的版本,请考虑使用 CyclicBarrier

            CountDownLatch的用法参考这个问题

            How to wait for a thread that spawns it's own thread?

          4. ForkJoinPoolnewWorkStealingPool()Executors

          5. 遍历提交到ExecutorService后创建的所有Future对象

          【讨论】:

            【解决方案7】:

            虽然与 OP 的问题无关,但如果您对仅使用一个线程的同步(更准确地说,集合点)感兴趣,您可以使用 Exchanger

            就我而言,我需要暂停父线程,直到子线程执行某些操作,例如完成了它的初始化。 CountDownLatch 也很好用。

            【讨论】:

              【解决方案8】:

              将线程对象存储到某个集合(如 List 或 Set)中,然后在线程启动后循环遍历该集合并在线程上调用 join()

              【讨论】:

                【解决方案9】:

                您可以为此使用Threadf#join 方法。

                【讨论】:

                  【解决方案10】:

                  另一种可能性是CountDownLatch 对象,它对于简单的情况很有用:因为您事先知道线程的数量,所以您使用相关的计数对其进行初始化,并将对象的引用传递给每个线程。
                  完成其任务后,每个线程都会调用CountDownLatch.countDown(),这会减少内部计数器。主线程在启动所有其他线程后,应该执行CountDownLatch.await() 阻塞调用。一旦内部计数器达到 0,它将被释放。

                  注意,对于这个对象,InterruptedException 也可以抛出。

                  【讨论】:

                    【解决方案11】:

                    您可以join 到线程。连接阻塞,直到线程完成。

                    for (Thread thread : threads) {
                        thread.join();
                    }
                    

                    请注意,join 会抛出 InterruptedException。如果发生这种情况,您必须决定该怎么做(例如,尝试取消其他线程以防止完成不必要的工作)。

                    【讨论】:

                    • 这些线程是并行运行还是顺序运行?
                    • @James Webster:语句t.join(); 表示当前线程 阻塞,直到线程t 终止。它不影响线程t
                    • 谢谢。 =] 在大学学习并行,但这是我唯一努力学习的东西!谢天谢地,我现在不需要太多使用它,或者当我使用它时它并不太复杂,或者没有共享资源并且阻塞并不重要
                    • @4r1y4n 提供的代码是否真正并行取决于您尝试使用它做什么,并且更多地与使用连接线程聚合分布在整个集合中的数据有关。您正在加入线程,这可能意味着“加入”数据。此外,并行性并不一定意味着并发。这取决于 CPU。线程很可能是并行运行的,但计算是按照底层 CPU 确定的任何顺序进行的。
                    • 那么线程是按照列表内的顺序依次结束的吗?
                    【解决方案12】:

                    除了别人建议的Thread.join(),java 5引入了executor框架。在那里你不使用Thread 对象。相反,您将 CallableRunnable 对象提交给执行程序。有一个特殊的 executor 用于执行多个任务并无序返回它们的结果。那是ExecutorCompletionService

                    ExecutorCompletionService executor;
                    for (..) {
                        executor.submit(Executors.callable(yourRunnable));
                    }
                    

                    然后你可以反复调用take(),直到没有更多的Future&lt;?&gt;对象可以返回,这意味着它们都完成了。


                    根据您的情况,可能相关的另一件事是CyclicBarrier

                    一种同步辅助工具,它允许一组线程相互等待以达到一个共同的障碍点。 CyclicBarriers 在涉及固定大小的线程组的程序中很有用,这些线程组必须偶尔相互等待。屏障被称为循环的,因为它可以在等待线程被释放后重新使用。

                    【讨论】:

                    • 这已经很接近了,但我还是会做一些调整。 executor.submit 返回 Future&lt;?&gt;。我会将这些期货添加到一个列表中,然后在列表中循环调用每个未来的get
                    • 另外,您可以使用Executors 实例化构造函数,例如Executors.newCachedThreadPool(或类似名称)
                    【解决方案13】:

                    我遇到了类似的问题,最终使用了 Java 8 parallelStream。

                    requestList.parallelStream().forEach(req -> makeRequest(req));
                    

                    它超级简单易读。 在幕后,它使用默认 JVM 的 fork join 池,这意味着它将等待所有线程完成后再继续。对于我来说,这是一个很好的解决方案,因为它是我的应用程序中唯一的 parallelStream。如果您同时运行多个并行流,请阅读下面的链接。

                    更多关于并行流的信息here

                    【讨论】:

                      【解决方案14】:

                      在你的主线程中使用它:while(!executor.isTerminated()); 从执行器服务启动所有线程后放置这行代码。这只会在执行器启动的所有线程都完成后启动主线程。确保调用 executor.shutdown();在上述循环之前。

                      【讨论】:

                      • 这是主动等待,会导致CPU不断运行一个空循环。非常浪费。
                      【解决方案15】:

                      试试这个,会成功的。

                        Thread[] threads = new Thread[10];
                      
                        List<Thread> allThreads = new ArrayList<Thread>();
                      
                        for(Thread thread : threads){
                      
                              if(null != thread){
                      
                                    if(thread.isAlive()){
                      
                                          allThreads.add(thread);
                      
                                    }
                      
                              }
                      
                        }
                      
                        while(!allThreads.isEmpty()){
                      
                              Iterator<Thread> ite = allThreads.iterator();
                      
                              while(ite.hasNext()){
                      
                                    Thread thread = ite.next();
                      
                                    if(!thread.isAlive()){
                      
                                         ite.remove();
                                    }
                      
                              }
                      
                         }
                      

                      【讨论】:

                        【解决方案16】:

                        执行器服务可用于管理多个线程,包括状态和完成。见http://programmingexamples.wikidot.com/executorservice

                        【讨论】:

                          【解决方案17】:

                          你会的

                          for (Thread t : new Thread[] { th1, th2, th3, th4, th5 })
                              t.join()
                          

                          在这个 for 循环之后,您可以确定所有线程都完成了它们的工作。

                          【讨论】:

                            猜你喜欢
                            • 2011-01-31
                            • 1970-01-01
                            • 1970-01-01
                            • 1970-01-01
                            • 2015-04-17
                            • 2023-03-08
                            • 1970-01-01
                            • 1970-01-01
                            • 2014-10-05
                            相关资源
                            最近更新 更多