【问题标题】:How to use preexisting runnables, limiting the number of runnables to create.?如何使用预先存在的可运行对象,限制要创建的可运行对象的数量。?
【发布时间】:2014-10-20 16:15:42
【问题描述】:

问题陈述:

  1. 我有 5000 个 id 指向数据库中的行。[可能超过 5000]

  2. 每个 Runnable 检索数据库中给定 id 的行并执行一些耗时的任务

    public class BORunnable implements Callable<Properties>{
    
      public BORunnable(String branchID) {            
        this.branchID=branchID;            
      }
    
      public setBranchId(String branchID){
        this.branchID=branchID;
      }
      public Properties call(){
        //Get the branchID
        //Do some time consuming tasks. Merely takes 1 sec to complete
    
        return propObj;            
      }
    }
    
  3. 我将把这些可运行文件提交给执行器服务。

  4. 为此,我需要创建 5000 个甚至更多的可运行文件并将其提交给执行器服务。在我的环境中创建可运行文件可能会引发内存不足异常。 [鉴于5000只是一个例子]

所以我想出了一个方法,如果您提供任何不同的方法,我将不胜感激:

  1. 创建了一个固定大小为 10 的线程池。

    int corePoolSize = 10;
    ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, 
        corePoolSize + 5, 10, TimeUnit.SECONDS, 
        new LinkedBlockingQueue<Runnable>());
    
    Collection<Future<Properties>> futuresCollection = 
        new LinkedList<Future<Properties>>();
    
  2. 将所有的 branchID 添加到 branchIdQueue

    Queue<String> branchIdQueue = new LinkedList<String>();
    Collections.addAll(branchIdQueue, branchIDs);
    
  3. 我正在尝试重用 runnable。创建了一堆runnable

    现在我希望这个数量的元素被出列并为每个元素创建可运行的

    int noOfElementsToDequeue = Math.min(corePoolSize, branchIdQueue.size());
    
    ArrayList<BORunnable>runnablesList = dequeueAndSubmitRunnable(
        branchIdQueue,noOfElementsToDequeue);
    
    
    ArrayList<BORunnable> dequeueAndSubmitRunnable(branchIdQueue,
        noOFElementsToDequeue){
    ArrayList<BORunnable> runnablesList= new ArrayList<BORunnable>();
        for (int i = 0; i < noOfElementsToDequeue; i++) {
            //Create this number of runnables
            runnablesList.add(new BORunnable(branchIdQueue.remove()));
        }
    return runnablesList;
    }
    
  4. 将检索到的runnables提交给执行器

    for(BORunnable boRunnableObj:runnablesList){
         futuresCollection.add(executor.submit(boRunnableObj));
    }
    
  5. 如果队列为空,我创建了所需的可运行文件。如果不是,我想重用runnable并提交给executor。

  6. 在这里,我得到要重用的可运行的数量 = 总计数 - 当前活动计数 [对我来说大概就足够了]

    int coreSize=executor.getCorePoolSize();
    
    while(!branchIdQueue.isEmpty()){
    
        //Total size - current active count 
        int runnablesToBeReused=coreSize-executor.getActiveCount();
        if(runnablesToBeReused!=0){
            ArrayList<String> branchIDsTobeReset = removeElementsFromQueue(
                branchIdQueue,runnablesToBeReused);
            ArrayList<BORunnable> boRunnableToBeReusedList = 
                getBORunnableToBeReused(boRunnableList,runnablesToBeReused);
            for(BORunnable aRunnable:boRunnableList){
                //aRunnable.set(branchIDSTobeRest.get(0));
            }
        }
    
    }
    

我的问题是

  1. 我无法找出线程池释放了哪个 Runnable,因此我可以使用它来提交

  2. 因此,我随机取了几个 runnables 并尝试设置 branchId,但随后可能会出现线程竞争问题。 [不想使用 volatile]

【问题讨论】:

  • 为什么不直接使用branchidQueue,提交的时候直接创建BORunnable呢?我不明白你为什么需要runnablesList。所以你会保存那段记忆。
  • 试图避免 BORunnable 中的迭代。如果可运行对象中有迭代,那么多线程概念就没有用处了。
  • 为什么不使用 Future.isDone()? docs.oracle.com/javase/7/docs/api/java/util/concurrent/…
  • No 不是 inside 可运行文件。我的意思是 for(String id:branchidQueue){ futuresCollection.add(executor.submit(new BORunnable(id))); } 好吧,只是用 dequeing,但你知道我的意思。只是省略了 ArrayList - 步骤。
  • @Fildor 创建 BoRunnable 可能会导致 JVM 环境内存不足异常。 BoRunnable 对象创建的数量没有指定限制。

标签: java multithreading threadpool runnable executorservice


【解决方案1】:

重用Runnables 没有意义,因为问题不在于创建或释放可运行实例的成本。这些在 Java 中几乎是免费的。

您想要做的是限制很容易实现的待处理作业的数量:只需为您传递给执行程序服务的队列提供一个限制 .这就像将int 值(限制)传递给LinkedBlockingQueue’s constructor 一样简单。请注意,您也可以使用ArrayBlockingQueue,因为LinkedBlockingQueue 不提供有界队列使用的优势。

当您为队列提供限制时,执行程序将拒绝排队新作业。唯一要做的就是为执行者提供一个合适的RejectedExecutionHandler。例如。 CallerRunsPolicy 足以避免调用者在线程都忙且队列已满时创建更多新作业。

执行后,Runnables 会被垃圾回收。

【讨论】:

  • 感谢您的意见。如果您为队列提供限制,它将拒绝我尝试添加的所有其他可运行项,我不希望这种情况发生。这是我的问题,仅当队列足够空以容纳一个或多个可运行对象时才创建和添加可运行对象。
  • @surendhar_s:也许您应该(重新)阅读带有RejectedExecutionHandlerCallerRunsPolicy 的部分。
  • 如果你不喜欢CallerRunsPolicy,你可以看看WaitingRejectionHandlerfrom this answer。两者的共同点是所有可运行对象运行,它只是在创建新的可运行对象时减慢提交线程。这正是你想要的。
猜你喜欢
  • 2012-05-20
  • 2013-08-07
  • 2012-02-08
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多