【问题标题】:Java concurrency counter not properly clean upJava 并发计数器未正确清理
【发布时间】:2015-07-08 17:54:29
【问题描述】:

这是一个java并发问题。需要完成 10 个工作,每个工作将有 32 个工作线程。工作线程会增加一个计数器。一旦计数器为 32,则表示此工作已完成,然后清理计数器图。从控制台输出,我预计将输出 10 个“完成”,池大小为 0,counterThread 大小为 0。

问题是:

  1. 大多数时候,“池大小:0 和 countThreadMap 大小:3”将是 打印出来。甚至那些所有线程都消失了,但 3 个工作没有 完成了。

  2. 有一段时间,我在第27行看到nullpointerexception。我用过ConcurrentHashMap和AtomicLong,为什么还有并发 例外。

谢谢

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;

public class Test {
    final ConcurrentHashMap<Long, AtomicLong[]> countThreadMap = new ConcurrentHashMap<Long, AtomicLong[]>();
    final ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
    final ThreadPoolExecutor tPoolExecutor = ((ThreadPoolExecutor) cachedThreadPool);

    public void doJob(final Long batchIterationTime) {
        for (int i = 0; i < 32; i++) {
            Thread workerThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    if (countThreadMap.get(batchIterationTime) == null) {
                        AtomicLong[] atomicThreadCountArr = new AtomicLong[2];
                        atomicThreadCountArr[0] = new AtomicLong(1);
                        atomicThreadCountArr[1] = new AtomicLong(System.currentTimeMillis()); //start up time
                        countThreadMap.put(batchIterationTime, atomicThreadCountArr);
                    } else {
                        AtomicLong[] atomicThreadCountArr = countThreadMap.get(batchIterationTime);
                        atomicThreadCountArr[0].getAndAdd(1);
                        countThreadMap.put(batchIterationTime, atomicThreadCountArr);
                    }

                    if (countThreadMap.get(batchIterationTime)[0].get() == 32) {
                        System.out.println("done");
                        countThreadMap.remove(batchIterationTime);
                    }
                }
            });
            tPoolExecutor.execute(workerThread);
        }
    }

    public void report(){
        while(tPoolExecutor.getActiveCount() != 0){
            //
        }
        System.out.println("pool size: "+ tPoolExecutor.getActiveCount() + " and countThreadMap size:"+countThreadMap.size());
    }

    public static void main(String[] args) throws Exception {
        Test test = new Test();
        for (int i = 0; i < 10; i++) {
            Long batchIterationTime = System.currentTimeMillis();
            test.doJob(batchIterationTime);
        }

        test.report();
        System.out.println("All Jobs are done");

    }
}

【问题讨论】:

  • 你知道report() 方法不是原子的,对吧?不同尺寸的检查之间存在竞争。
  • 你的意思是这条线不是线程安全的吗? tPoolExecutor.getActiveCount()
  • report() 中的 while 循环...这真的是一个紧密的循环,还是// 所在的位置有实际代码?如果这是一个紧密的循环 - 这不是您等待线程完成的方式。
  • 另外,仅仅因为您使用ConcurrentHashMap 并不意味着您正在以线程安全的方式使用它。您没有正确同步访问(查找 check-then-act)。这也是一个相当抽象的问题,可能是XY problem
  • @EdwardChen putget 是线程安全的,但调用 get 然后再调用 put 期望地图的状态在此期间保持不变是错误的。只需查看 check-then-act,此代码已严重损坏。如果您能描述(使用文字,而不是代码)您的目标是什么,那将会很有帮助。假设您上面的代码是完美的并且可以正常工作,您将如何处理它?在我看来,那个是重要的问题。

标签: java multithreading concurrency


【解决方案1】:

让我们深入挖掘线程相关编程的所有错误,一个人会犯:

Thread workerThread = new Thread(new Runnable() {
…
tPoolExecutor.execute(workerThread);

您创建了一个Thread,但不启动它,而是将其提交给执行者。无缘无故让Thread 实现Runnable 是Java API 的一个历史错误。现在,每个开发人员都应该意识到,没有理由将Thread 视为Runnable。如果您不想手动start 线程,请不要创建Thread。只需创建Runnable 并将其传递给executesubmit

我想强调后者,因为它返回一个Future,它免费为您提供您尝试实现的内容:任务完成时的信息。使用invokeAll 会更容易,它将提交一堆Callables 并在所有完成后返回。由于您没有告诉我们有关您的实际任务的任何信息,因此尚不清楚您是否可以让您的任务简单地实现Callable(可能返回null)而不是Runnable

如果您不能使用Callables 或者不想在提交时立即等待,则必须记住返回的Futures 并稍后查询:

static final ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

public static List<Future<?>> doJob(final Long batchIterationTime) {
    final Random r=new Random();
    List<Future<?>> list=new ArrayList<>(32);
    for (int i = 0; i < 32; i++) {
        Runnable job=new Runnable() {
            public void run() {
                // pretend to do something
                LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(r.nextInt(10)));
            }
        };
        list.add(cachedThreadPool.submit(job));
    }
    return list;
}

public static void main(String[] args) throws Exception {
    Test test = new Test();
    Map<Long,List<Future<?>>> map=new HashMap<>();
    for (int i = 0; i < 10; i++) {
        Long batchIterationTime = System.currentTimeMillis();
        while(map.containsKey(batchIterationTime))
            batchIterationTime++;
        map.put(batchIterationTime,doJob(batchIterationTime));
    }
    // print some statistics, if you really need
    int overAllDone=0, overallPending=0;
    for(Map.Entry<Long,List<Future<?>>> e: map.entrySet()) {
        int done=0, pending=0;
        for(Future<?> f: e.getValue()) {
            if(f.isDone()) done++;
            else  pending++;
        }
        System.out.println(e.getKey()+"\t"+done+" done, "+pending+" pending");
        overAllDone+=done;
        overallPending+=pending;
    }
    System.out.println("Total\t"+overAllDone+" done, "+overallPending+" pending");
    // wait for the completion of all jobs
    for(List<Future<?>> l: map.values())
        for(Future<?> f: l)
            f.get();
    System.out.println("All Jobs are done");
}

但请注意,如果后续任务不需要ExecutorService,则等待所有作业完成会容易得多:

cachedThreadPool.shutdown();
cachedThreadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
System.out.println("All Jobs are done");

但是不管手动跟踪作业状态有多么不必要,让我们深入研究一下,这样你将来可能会避免错误:

if (countThreadMap.get(batchIterationTime) == null) {

ConcurrentMap 是线程安全的,但这不会将您的并发代码变成顺序代码(这会使多线程变得无用)。上一行可能被多达 32 个线程同时处理,所有线程都发现 key 还不存在,因此可能会有多个线程将初始值放入映射中。

                    AtomicLong[] atomicThreadCountArr = new AtomicLong[2];
                    atomicThreadCountArr[0] = new AtomicLong(1);
                    atomicThreadCountArr[1] = new AtomicLong(System.currentTimeMillis());
                    countThreadMap.put(batchIterationTime, atomicThreadCountArr);

这就是为什么这被称为“check-then-act”反模式。如果不止一个线程要处理该代码,他们都会put他们的新值,确信这是正确的事情,因为他们在行动之前已经检查了初始条件,但是对于除了一个线程之外的所有线程,条件已经改变正在执行,他们正在覆盖之前的 put 操作的值。

                } else {
                    AtomicLong[] atomicThreadCountArr = countThreadMap.get(batchIterationTime);
                    atomicThreadCountArr[0].getAndAdd(1);
                    countThreadMap.put(batchIterationTime, atomicThreadCountArr);

由于您正在修改已存储到映射中的AtomicInteger,因此put 操作是无用的,它会放入它之前检索到的数组。如果没有上面描述的可以有多个初始值的错误,那么put操作是没有效果的。

                }

                if (countThreadMap.get(batchIterationTime)[0].get() == 32) {

同样,ConcurrentMap 的使用不会将多线程代码转换为顺序代码。虽然很明显唯一的最后一个线程会将原子整数更新为32(当初始竞争条件没有实现时),但不能保证所有其他线程都已经通过了这个if 语句。因此不止一个,最多所有线程仍然可以在这个执行点看到32 的值。或者……

                    System.out.println("done");
                    countThreadMap.remove(batchIterationTime);

看到32 值的线程之一可能会执行此remove 操作。此时,可能仍有线程没有执行上述if 语句,现在看不到32 的值,但生成了NullPointerException,因为应该包含AtomicInteger 的数组不再在映射中。偶尔会发生这种情况……

【讨论】:

    【解决方案2】:

    创建 10 个作业后,您的 main 线程仍在运行 - 它不会等待您的作业完成,然后在 test 上调用 report。您尝试使用while 循环来克服这个问题,但是tPoolExecutor.getActiveCount() 在执行workerThread 之前可能会以0 出现,然后在将线程添加到您的HashMap 之后会发生countThreadMap.size() .

    有很多方法可以解决这个问题 - 但我会让另一个回答者这样做,因为我现在必须离开。

    【讨论】:

    • 这是有道理的。第二个问题呢?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-02-03
    • 1970-01-01
    • 2020-10-27
    • 1970-01-01
    相关资源
    最近更新 更多