【问题标题】:How to lazily create tasks for use by a Java thread-pool如何延迟创建供 Java 线程池使用的任务
【发布时间】:2012-03-24 08:39:25
【问题描述】:

我正在用 Java 编写一个负载测试应用程序,并且有一个线程池可以针对被测服务器执行任务。因此,要制作 1000 个作业并在 5 个线程中运行它们,我会执行以下操作:

    ExecutorService pool = Executors.newFixedThreadPool(5);
    List<Runnable> jobs = makeJobs(1000);
    for(Runnable job : jobs){
        pool.execute(job);
    }

但是我认为这种方法不会很好地扩展,因为我必须提前制作所有“工作”对象并将它们放在内存中,直到需要它们为止。

我正在寻找一种方法,让池中的线程在每次需要新工作时转到某种“JobFactory”类,并让工厂根据请求构建 Runnables,直到所需数量的工作完成被运行。工厂可能会开始返回 'null' 以向线程发出没有更多工作要做的信号。

我可以手动编写这样的代码,但它似乎是一个足够常见的用例,我想知道在美妙但复杂的“java.util.concurrent”包中是否有任何东西可以用来代替?

【问题讨论】:

    标签: java multithreading lazy-loading threadpool java.util.concurrent


    【解决方案1】:

    嗯。您可以创建一个具有固定容量的BlockingQueue&lt;Runnable&gt;,并让每个工作线程出列Runnable 并运行它。然后你可以有一个生产者线程,它将作业放入队列。

    主线程会做类似的事情:

    // 100 is the capacity of the queue before blocking
    BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(100);
    // start the submitter thread
    new Thread(new JobSubmitterThread(queue)).start();
    // make in a loop or something?
    new Thread(new WorkerThread(queue)).start();
    new Thread(new WorkerThread(queue)).start();
    ...
    

    工人看起来像:

    public class WorkerThread implements Runnable {
         private final BlockingQueue<Runnable> queue;
         public WorkerThread(BlockingQueue<Runnable> queue) {
             this.queue = queue;
         }
         public void run() {
             // run until the main thread shuts it down using volatile boolean or ...
             while (!shutdown) {
                 Runnable job = queue.take();
                 job.run();
             }
         }
    }
    

    作业提交者看起来像:

     public class JobSubmitterThread implements Runnable {
         private final BlockingQueue<Runnable> queue;
         public WorkerThread(BlockingQueue<Runnable> queue) {
             this.queue = queue;
         }
         public void run() {
             for (int jobC = 0; jobC < 1000; jobC++) {
                 Runnable job = makeJob();
                 // this would block when the queue reaches capacity
                 queue.put(job);
             }
         }
     }
    

    【讨论】:

      【解决方案2】:

      您可以使用 AtomicInteger 在线程池的执行线程中完成所有工作,以监视执行的可运行对象的数量

       int numberOfParties = 5;
       AtomicInteger numberOfJobsToExecute = new AtomicInteger(1000);
       ExecutorService pool = Executors.newFixedThreadPool(numberOfParties );
       for(int i =0; i < numberOfParties; i++){
           pool.submit(new Runnable(){
              public void run(){
                  while(numberOfJobsToExecute.decrementAndGet() >= 0){
                      makeJobs(1).get(0).run();
                  }
              }
           });
       }
      

      您还可以将返回的 Future 存储在 List 中,并将 get() 存储在它们上以等待完成(以及其他机制)

      【讨论】:

      • 好答案@John。 +1。当然比我的回答简单。我认为你想要 >= 0 否则它将做 999 个工作,因为递减首先发生。 makeJobs 显然必须可重入才能执行此操作。
      猜你喜欢
      • 1970-01-01
      • 2015-03-28
      • 2011-03-22
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-08-06
      • 2019-08-02
      • 2015-05-19
      相关资源
      最近更新 更多