【问题标题】:Multithread ConcurrentModificationException多线程并发修改异常
【发布时间】:2014-04-11 15:58:03
【问题描述】:

我已经在网上搜索了一段时间试图解决这个问题,但没有成功。

在我的应用程序中,我尝试使用基本交换加密方案对大量消息进行加密。由于这些集合是大量的 BigInteger,我正在尝试对加密进行多线程处理以提高性能。

基本上,我获取大量消息并将其拆分为子集,然后传递给加密线程以执行加密子集。然后我尝试提取每个子集,并在线程全部完成其部分后将它们聚合到原始的大集合中。 当我遍历线程并提取它们的每个加密时,当我尝试将所有加密实际添加到所有加密列表时发生错误,它抛出的错误是 java.util.ConcurrentModificationException 错误。

我尝试使用同步,但没有帮助。

这里是函数调用:

protected Set<BigInteger> multiEncrypt(BigInteger key, HashSet<BigInteger> messageSet) {
    ArrayList<BigInteger> messages = new ArrayList<BigInteger>(messageSet);
    Set<BigInteger> encryptions = Collections.synchronizedSet(new HashSet<BigInteger>());
    int cores = Runtime.getRuntime().availableProcessors();
    int numMessages = messages.size();
    int stride = numMessages/cores;

    //create all the threads and run them
    ArrayList<EncryptThread> threads = new ArrayList<EncryptThread>();
    for (int thread = 0; thread < cores; thread++) {
        int start = thread*stride;
        //don't want to go over the end
        int stop = ((thread+1)*stride >= messages.size()) ? messages.size()-1 : (thread+1)*stride;
        List<BigInteger> subList = messages.subList(start, stop);
        EncryptThread t = new EncryptThread(encryptionScheme.getPrime(), key, subList);
        t.start();
        threads.add(t);
    }
    //pull out the encryptions
    synchronized(encryptions){
        for (int i=0; i < threads.size()-1; i++) {
            EncryptThread thread = threads.get(i);
            ArrayList<BigInteger> these = thread.getEncryptions();
            encryptions.addAll(these); //<-- Erroring Here
            thread.finish();
        }
    }

以下是我为进行加密而编写的 EncryptThread 类的相关部分:

/**
 * Constructor
 */
public EncryptThread(BigInteger prime, BigInteger key, List<BigInteger> messages) {
    //need a new encryption scheme object for each thread
    encryptionScheme = new EncryptionScheme(prime);
    encryptions = new ArrayList<BigInteger>();
    this.key = key;
    this.messages = messages;
    wait = true;
}

@Override
public void run() {
    encryptMessages(key, messages);
    while(wait);
}

/**
 * Used to encrypt a set of messages
 * @param key
 * @param messages
 * @return
 */
public void encryptMessages(BigInteger key, List<BigInteger> messages) {
    System.out.println("Encrypting stuff");
    for (BigInteger m : messages) {
        BigInteger em = encryptionScheme.encrypt(key, m);
        encryptions.add(m);
    }
}

public ArrayList<BigInteger> getEncryptions() {
    return encryptions;
}

    //call this after encryptions have been pulled to let the thread finish
public void finish() {
    wait = false;
}

}

我对 Java 并不陌生,但我对 Java 中的多线程并不陌生,因此我将不胜感激任何建议。提前致谢!

编辑:根据建议,我在 EncryptThread 类中添加了一个简单的锁定机制,这使得线程等待返回加密,直到它们全部完成并且它现在可以工作了。

public void encryptMessages(BigInteger key, List<BigInteger> messages) {
    System.out.println("Encrypting stuff");
    this.lock = true;
    for (BigInteger m : messages) {
        BigInteger em = encryptionScheme.encrypt(key, m);
        //deals with when we have to mark chaff at S2
        if (shift) {
            em.shiftLeft(1);
            if(shiftVal != 0) em.add(BigInteger.ONE);
        }
        encryptions.add(m);
    }
    this.lock = false;
}

public ArrayList<BigInteger> getEncryptions() {
    while(lock);
    return encryptions;
}

EDIT #2 所以我最终使用了我实验室的某个人向我建议的解决方案。我摆脱了锁定和等待布尔值,以及 EncryptThread 类中的 finish() 函数,而是在 start 和 getEncryption 循环之间添加了一个简单的 thread.join() 循环:

    //create all the threads
    ArrayList<EncryptThread> threads = new ArrayList<EncryptThread>();
    for (int thread = 0; thread < cores; thread++) {
        int start = thread*stride;
        //don't want to go over the end
        int stop = ((thread+1)*stride >= messages.size()) ? messages.size()-1 : (thread+1)*stride;
        List<BigInteger> subList = messages.subList(start, stop);
        EncryptThread t = new EncryptThread(encryptionScheme.getPrime(), key, subList, shiftVal);
        t.start();
        threads.add(t);

    }
    //wait for them to finish
    for( EncryptThread thread: threads) {
        try {
            thread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    //pull out the encryptions
    for (int i=0; i < threads.size()-1; i++) {
        EncryptThread thread = threads.get(i);
        encryptions.addAll(thread.getEncryptions());
    }

我认为我的主要困惑是我认为线程类在完成运行后不能调用它的方法。但上述工作正常。

【问题讨论】:

  • 你的 //pull out 加密应该在所有线程完成他们的进程后运行。是这样吗?如果任何一个线程正在将消息添加到它自己的数组列表中,并且您尝试在此处添加相同的列表。这导致了这个问题。所有线程完成后运行聚合部分。
  • 啊,明白了。我确保线程在 getEncryptions 方法返回之前完成加密并修复它,谢谢!
  • @user2666216 您当前的轮询解决方案可能不是最好的。您可能希望使用 ExecutorService 将线程转换为任务并等待它们完成。这段代码可能仍然不是线程安全的。
  • @user2666216 我同意 NESPowerGlove 。使用 ExecutorService。我也确实在我的回答中提到过
  • 虽然 ExecutorService 似乎是一个合理的解决方案,但我最终得到的解决方案对我来说似乎更直观。不过感谢您的建议,我真的很感激。

标签: java multithreading concurrentmodification


【解决方案1】:

ConcurrentModificationException 发生在您在迭代集合时修改集合。它与多线程关系不大,因为您可以轻松创建单线程示例:

ArrayList<String> myStrings = new ArrayList<>();
myStrings.add("foo");
myStrings.add("bar");
for(String s : myStrings) {
   myStrings.add("Hello ConcurrentModificationException!");

【讨论】:

    【解决方案2】:

    如果您查看 List 的 addAll、it says the following 上的文档:

    将指定集合中的所有元素附加到此列表的末尾,按照指定集合的​​迭代器返回的顺序(可选操作)。 如果在操作过程中修改了指定的集合,则该操作的行为是不确定的。(注意,如果指定的集合是这个列表,并且它是非空的,就会发生这种情况。)

    addAll 在您的encryptMessages 方法中使用它的迭代器时,您可以看到您的列表正在被修改,您生成的线程之一当前正在执行。

    for (BigInteger m : messages) {
        BigInteger em = encryptionScheme.encrypt(key, m);
        encryptions.add(m); // <-- here
    }
    

    我没有完整查看您的所有代码,但这里的一些东西不是线程安全的。您可能会使用CopyOnWriteArrayList 而不是常规的ArrayList 来避免ConcurrentModificationException 做得很好,如果您可以在 addAll 调用中不将所有内容添加到列表中,如果您没有,您也可以然后需要等待线程完成。您可能只想使用带有 ExecutorService 的任务。可能还有其他改进。

    此外,每个人都提到的学习如何用 Java 编写线程安全程序的 goto 书是 Concurrency in Practice,如果您不熟悉 Java 并发,我建议您阅读。

    【讨论】:

      【解决方案3】:

      你在这里开始你的话题。

      for (int thread = 0; thread < cores; thread++) {
              int start = thread*stride;
              //don't want to go over the end
              int stop = ((thread+1)*stride >= messages.size()) ? messages.size()-1 : (thread+1)*stride;
              List<BigInteger> subList = messages.subList(start, stop);
              EncryptThread t = new EncryptThread(encryptionScheme.getPrime(), key, subList);
              t.start();
              threads.add(t);
          }
      

      嗯。然后你必须等待所有线程完成,然后在这个块中开始聚合。

      //pull out the encryptions
      synchronized(encryptions){
          for (int i=0; i < threads.size()-1; i++) {
              EncryptThread thread = threads.get(i);
              ArrayList<BigInteger> these = thread.getEncryptions();
              encryptions.addAll(these); //<-- Erroring Here
              thread.finish();
          }
      }
      

      您正在阻塞仅访问encryptions 的线程。但是您创建的线程没有访问 set 。同时它将继续添加到自己的数组 List these 。因此,当您调用 encryptions.addAll(these); 时,它们会被两个线程访问(拥有encryptions 的线程和拥有these 的线程

      其他答案提供了有关 addAll 中并发异常的原因的详细信息。

      您必须等到所有线程完成它们的工作。

      你可以使用ExecutorService来做到这一点

      将你的起始线程更改为

      ExecutorService es = Executors.newFixedThreadPool(cores);
      for(int i=0;i<5;i++)
          es.execute(new Runnable() { /*  your task */ }); //EncryptThread instance
      es.shutdown();
      boolean finshed = es.awaitTermination(1, TimeUnit.MINUTES);
      

      然后处理您的添加回过程。

      ExecutorService es = Executors.newFixedThreadPool(cores);
      for (int thread = 0; thread < cores; thread++) {
              int start = thread*stride;
              //don't want to go over the end
              int stop = ((thread+1)*stride >= messages.size()) ? messages.size()-1 : (thread+1)*stride;
              List<BigInteger> subList = messages.subList(start, stop);
              EncryptThread t = new EncryptThread(encryptionScheme.getPrime(), key, subList);
              es.execute(t);
              threads.add(t);
          }
      es.shutdown();
      boolean finshed = es.awaitTermination(1, TimeUnit.MINUTES);
      
      
      
      //pull out the encryptions
          synchronized(encryptions){
              for (int i=0; i < threads.size()-1; i++) {
                  EncryptThread thread = threads.get(i);
                  ArrayList<BigInteger> these = thread.getEncryptions();
                  encryptions.addAll(these); //<-- Erroring Here
                  thread.finish();
              }
          }
      

      假设,您的 EncryptThread 现在是 Thread。您可能需要更改为实现 Runnable。并且 getEncryptions 没有其他变化

      【讨论】:

      • 好的,我有点遵循这个,但是循环从 i = 0 到 4 的第一块代码的目的是什么?
      • 你不需要那个。我只是展示了一般如何做。后来我改变了相同的以满足您的需求
      猜你喜欢
      • 2016-11-19
      • 2012-09-26
      • 2013-03-12
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多