【问题标题】:ThreadPoolExecutor not assigning tasks to worker threads [closed]ThreadPoolExecutor 没有将任务分配给工作线程[关闭]
【发布时间】:2017-08-03 13:16:44
【问题描述】:

我的 ThreadPoolExecutor 没有将任务分配给工作线程。由于我不知道这个问题是如何产生的,所以将解释当前情况。

CorePoolSize = 15, 
Max PoolSize = 100, 
KeepAliveTime 5 minutes, 
ArrayBlockingQueue of size 1.

目前

executor.getActiveCount() = 0, 
executor.getPoolSize() = 100, 
executor.getQueue().remainingCapacity() = 0

我可以看到我提交的任务在 ArrayBlockingQueue 中,但是为什么这 100 个线程只是坐在线程池中而没有人选择提交的任务。

在 jconsole 中具有相似堆栈跟踪的所有线程

Name: pool-15-thread-182
State: WAITING on java.util.concurrent.locks.ReentrantLock$NonfairSync@16926fe3
Total blocked: 396  Total waited: 66

Stack trace: 
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2033)
java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:347)
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:955)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:917)
java.lang.Thread.run(Thread.java:682)    

代码

初始化:

executor = new ThreadPoolExecutor(CORE_THREADPOOL_SIZE, (int)MAX_THREADPOOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1), new ThreadPoolExecutor.AbortPolicy());

提交任务:

Future<?> futureTask = executor.submit(opmTask); 

任务:

 class OpmTask implements Runnable { 
@Override public void run() {
 .... }
 } 

线程管理杀死阻塞线程

if(future != null && !future.isDone()){
                        future.cancel(true);
                    }
// after 30 minutes of above code if still blocking thread running
if(thread.isAlive()){
                        thread.stop();
                    }

在一些内部方法中运行

while (true) {
                while (InputStream.available() > 0) {
                    int i = in.read(tmp, 0, 1024);
                    if (i < 0)
                        break;
                    outputStr += new String(tmp, 0, i);
                }

                if(Thread.interrupted()){
                    logger.log(Level.SEVERE,"Thread interrupted");
                    throw new BlockedThreadException("Thread interrupted");
                }
}

【问题讨论】:

  • 你能附上你的代码吗?不看就很难调试
  • 张贴。所以我们可以阅读。谢谢!!
  • 提取任务需要一些时间,因此您可能会暂时看到 N 个队列。休眠线程可能需要 10-100 微秒才能唤醒并获取任务。
  • 您能先尝试增加队列的容量吗?
  • 我认为您需要创建一个简短的可重复测试。很可能您做错了什么,但无法从所提供的信息中确定它是什么。

标签: java multithreading java.util.concurrent threadpoolexecutor


【解决方案1】:

从您的示例中,这有点难以理解。我建议首先让你的线程成为守护进程,这样它们就不会阻止你的应用程序关闭。至于你的例子,我看不到任何不当行为。试试这个例子:

public class ExecutorTest {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(10, (int)10, 5 * 60 , TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1), new ThreadFactoryBuilder().setDaemon(true).build());
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());

        while(true) {
            executor.submit(() -> {
                System.out.println("Executing " + UUID.randomUUID().toString());
            });
            Thread.sleep(1000);
        }

    }

}

请注意,这将永远运行并逐行打印下一行。也不是说我传入了一个 ThreadFactory,它将执行程序的线程配置为守护进程。

至于你的堆栈跟踪:

Stack trace: 
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:322)
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:957)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:917)
java.lang.Thread.run(Thread.java:682)

这并不意味着您的线程被卡住了。 (嗯,从技术上讲确实如此,但实际上并非如此)。他们正在等待一项新的工作。如果你看看你的队列实现:ArrayBlockingQueue#take

 public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

这是他们等待新任务的地方。他们检查队列是否有其他事情要做,然后等到它做。这自然会发生在您的所有线程中。

我也觉得配置有点奇怪。您使用最大大小为 1 的队列配置 100 个工作人员。这似乎是丢弃任务的秘诀。

另外,你没有展示你的任务实现,这意味着我只是在猜测你的线程池在异常时的行为方式。

您的设置似乎工作正常,但问题在于任务或提交本身。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2013-12-18
    • 2012-09-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多