【问题标题】:Thread safety with ConcurrentHashMapConcurrentHashMap 的线程安全
【发布时间】:2012-08-20 09:23:36
【问题描述】:

我有以下课程。我使用 ConcurrentHashMap。我有许多线程写入地图和一个计时器,每 5 分钟将数据保存在地图中。 当我在映射中写入条目时,我设法通过使用 putIfAbsent() 来实现线程安全。但是,当我从中读取然后通过 clear() 方法删除所有条目时,我希望在读取映射内容然后删除它们的过程中没有其他线程写入映射。显然,即使使用了同步(锁定){},我的代码也不是线程安全的,b/c 在 saveEntries() 中拥有锁的线程不一定是在 log() 方法中写入我的地图的同一线程!除非我用同一个锁对象锁定 log() 中的整个代码!

我想知道是否有任何其他方法可以在不通过外部锁强制同步的情况下实现线程安全?非常感谢任何帮助。

public class Logging {

private static Logging instance;    
private static final String vendor1 = "vendor1";
private static final String vendor2 = "vendor2";    
private static long delay = 5 * 60 * 1000;

private ConcurrentMap<String, Event> vendor1Calls = new ConcurrentHashMap<String, Event>();
private ConcurrentMap<String, Event> vendor2Calls = new ConcurrentHashMap<String, Event>();

private Timer timer;    
private final Object lock = new Object();

private Logging(){
    timer = new Timer();                
    timer.schedule(new TimerTask() {
        public void run() {
            try {
                saveEntries();
            } catch (Throwable t) {
                timer.cancel();
                timer.purge();
            }
        }       
    }, 0, delay);
}

public static synchronized Logging getInstance(){     
    if (instance == null){
        instance = new Logging();
    }
    return instance;
 }

public void log(){      
    ConcurrentMap<String, Event> map;
    String key = "";        

    if (vendor1.equalsIgnoreCase(engine)){
        map = vendor1Calls;
    }else if(vendor2.equalsIgnoreCase(engine)){  
        map = vendor2Calls;
    }else{
        return;
    }       


    key = service + "." + method;
// It would be the code if I use a regular HashMap instead of ConcurrentHashMap
    /*Event event = map.get(key);       

    // Map does not contain this service.method, create an Event for the first     time.
    if(event == null){
        event = new Event();            
        map.put(key, event);

        // Map already contains this key, just adjust the numbers.
    }else{
        // Modify the object fields
    }*/
    //}

    // Make it thread-safe using CHM
    Event newEvent = new Event();
    Event existingEvent= map.putIfAbsent(key, newEvent); 

    if(existingEvent!=null && existingEvent!=newEvent){
        // Modify the object fields
}       

private void saveEntries(){

    Map<String, List<Event>> engineCalls = null;
    try {           

        engineCalls = new HashMap<String, List<Event>>();
        List<Event> events = null;

// How can I achieve therad safety here w/o applying any lock?
        //synchronized(lock){
            if(!vendor1Calls.isEmpty()){
                events = new ArrayList<Event>();
                events.addAll(vendor1Calls.values());
                engineCalls.put(vendor1, events);
                vendor1Calls.clear();
            }
            if(!vendor2Calls.isEmpty()){
                events = new ArrayList<Event>();
                events.addAll(vendor2Calls.values());
                engineCalls.put(vendor2, events);
                vendor2Calls.clear();
            }
        //}

// logICalls() saves the events in the DB.          
        DBHandle.logCalls(engineCalls);
    } catch (Throwable t) {         
    } finally {
        if(engineCalls!=null){
            engineCalls.clear();
        }                       
    }   
}       

}

【问题讨论】:

  • 您通过单击答案左侧的灰色复选标记来接受答案。如果答案不能解决您的问题,请不要接受,尽管有同侪压力。
  • 是的,我建议你去stackoverflow.com/users/1052348/bluesky?tab=questions,点击每个链接,然后点击最能解决你问题的答案旁边的复选标记。 PS您可以接受您自己对自己的问题做出的回答。
  • crossposted here,您可以单独使用 CHM 功能使其具有原子性。
  • 我知道这是一个老问题,但我不明白您为什么首先将地图用于看起来本质上像队列的东西。您可以将日志条目添加到并发队列中,然后在保存条目中查看队列的大小并从队列的前面删除那么多并写入它们。因为您只看一次大小,所以在保存时添加更多也没关系;他们将留给下次。对不起,如果我误解了你的目的。

标签: java multithreading concurrency thread-safety


【解决方案1】:

以下代码使用来自functional java 项目的persistent map。它使用更多内存,但(AFAIK :) 可以安全地由多个线程使用。 AtomicReference 中唯一的可变值,它使用 compare-and-set 进行更新。地图和事件是不可变的,因此是线程安全的。另外,我没有清除地图,而是替换了对它的引用。

import fj.F;
import fj.Ord;
import fj.data.TreeMap;

import java.util.*;
import java.util.concurrent.atomic.AtomicReference;

public class Logging
{
    // Event is immutable
    private static class Event
    {
        // updates are done by creating new values
        Event update(String key)
        {
            return new Event();
        }
    }

    // Refactored code pertaining to one vendor into a separate class.
    private static class EngineLogger
    {
        public final String vendor;
        private final AtomicReference<TreeMap<String, Event>> mapRef =
                new AtomicReference<TreeMap<String, Event>>(TreeMap.<String, Event>empty(Ord.stringOrd));

        private EngineLogger(String vendor)
        {
            this.vendor = vendor;
        }

        public void log(String service, String method)
        {
            final String key = service + "." + method;
            boolean updated = false;
            while (! updated)
            {
                // get the current value of the map
                TreeMap<String, Event> currMap = mapRef.get();

                // create an updated value of the map, which is the current map plus info about the new key
                TreeMap<String, Event> newMap = currMap.update(key, new F<Event, Event>()
                {
                    @Override
                    public Event f(Event event)
                    {
                        // Modify the object fields of event, if the map contains the key
                        return event.update(key);
                    }
                    // create a new event if the map does not contain the key
                }, new Event());

                // compare-and-set the new value in .. repeat until this succeeds
                updated = mapRef.compareAndSet(currMap, newMap);
            }
        }

        public List<Event> reset()
        {
            /* replace the reference with a new map */
            TreeMap<String, Event> oldMap = mapRef.getAndSet(TreeMap.<String, Event>empty(Ord.stringOrd));

            /* use the old map to generate the list */
            return new ArrayList<Event>(oldMap.toMutableMap().values());
        }
    }

    private static Logging instance;
    private static long delay = 5 * 60 * 1000;
    private final Timer timer;

    private final EngineLogger vendor1 = new EngineLogger("vendor1");
    private final EngineLogger vendor2 = new EngineLogger("vendor2");

    private Logging()
    {
        timer = new Timer();
        timer.schedule(new TimerTask()
        {
            public void run()
            {
                try
                {
                    saveEntries();
                }
                catch (Throwable t)
                {
                    timer.cancel();
                    timer.purge();
                }
            }
        }, 0, delay);
    }

    public static synchronized Logging getInstance()
    {
        if (instance == null)
        {
            instance = new Logging();
        }
        return instance;
    }

    public void log(String engine, String service, String method)
    {
        if (vendor1.vendor.equals(engine))
        {
            vendor1.log(service, method);
        }
        else if (vendor2.vendor.equals(engine))
        {
            vendor2.log(service, method);
        }
    }

    private void saveEntries()
    {
        Map<String, List<Event>> engineCalls = new HashMap<String, List<Event>>();
        engineCalls.put(vendor1.vendor, vendor1.reset());
        engineCalls.put(vendor2.vendor, vendor2.reset());
        DBHandle.logCalls(engineCalls);
    }
}

【讨论】:

  • 虽然可能不适合我的内存使用目的 b/c,但非常有帮助。非常感谢分享。
  • @blueSky 想多了,我意识到这个解决方案的扩展性很差。 log 方法中的 CAS 将失败,即使不同的线程为不同的键修改 Events。
  • 感谢您的更新。由于内存问题,我无法使用它。知道那里有什么总是很棒:)
【解决方案2】:

我最好的建议是使用ReadWriteLock,但正如您特别声明您不想使用任何锁(顺便说一句ConcurrentHashMap 可能会在内部使用它们),您可以尝试以下操作。

为您的每张地图使用AtomicReference,并在需要记录其内容时使用getAndSet 将旧地图替换为全新的空白地图。

您现在拥有一个专属使用的地图,您可以随意迭代和清除。不幸的是,有一个小问题(使用锁将摆脱),那就是如果另一个线程正在添加到映射中,而您将它换成一个空的。您也许可以在这一点上添加一个延迟,希望等待足够长的时间让另一个线程完成它正在做的事情。也许ConcurrentHashMap 有一些内部功能,您可以使用它来等到每个人都完成它。

【讨论】:

    【解决方案3】:

    此示例的原子版本显示为in this thread(仅使用 ConcurrentMap 中的功能)。

    【讨论】:

      【解决方案4】:

      如果没有任何外部同步,您无法使用 CHM 实现此目的。返回的迭代器视图是弱一致的,这意味着 Map 的内容可以在您实际迭代它时发生变化。

      看来您需要使用Collections.synchronizedMap 才能获得您正在寻找的功能。

      编辑以使我的观点更清楚:

      要使用synchronizedMap 实现此目的,您首先必须在地图上使用synchronize,然后您可以将内容迭代或复制到另一个地图中,然后再清除。

      Map map = Collections.synchronizedMap(new HashMap());
      
      public void work(){
        Map local = new HashMap();
        synchronized(map){
           local.putAll(map);
           map.clear();
        }
        //do work on local instance 
      }
      

      而不是local 实例,正如我提到的,您可以iterate + remove 类似于@Kevin Jin 的答案。

      【讨论】:

      • syncronizedMap 的迭代器对这项工作毫无用处,因为它们在面对地图的并发更改时会分崩离析。另一方面,CHM 迭代器捕获任何后来的条目并没有什么问题,它可能会更好,因为更多的条目将被保存并从地图中删除。
      • @Marko Topolnik 我说的是原子性。我应该更清楚地表明他必须在地图本身上synchronize。这显然具有性能下降的副作用,但是如果他不需要在另一个线程迭代时更新线程,那么清除将没有其他方法可以做到这一点。然而,从他的需要来看,情况可能并非如此。
      • @JohnVint:既然 Iterator 没有给我快照,那么我是否有可能最终得到一个永无止境的循环! B/c 作为线程实际上可能会比我删除它们更快地将条目添加到地图中!那么无论如何,我是否可以在不使用锁的情况下获得快照或以某种方式使其成为原子?
      • 理论上可以,但实际上不太可能。您可以在迭代之前限制获取大小,如果计数超过您可以提前中断。话虽这么说,我不会担心无限循环
      • 感谢大家的时间和回复。非常有帮助。我会考虑的......看起来让它成为原子不是一个选项。我会选择赌。 synchronizedMap 或只是迭代。再次感谢。
      【解决方案5】:

      但是,当我从中读取然后通过 clear() 删除所有条目时 方法,我不希望其他线程写入映射,而我在这个过程中 读取地图内容然后删除它们。

      我认为您想说的是您并不真正关心严格锁定地图。相反,您只真正关心 vender1Calls.values() 和 vendor1Calls.clear() 之间的任何日志条目的丢失,对吗?

      既然如此,我可以想象你可以替换

      events.addAll(vendor1Calls.values());
      vendor1Calls.clear();
      

      在 saveEntries 中有这个:

      for (Iterator<Event> iter = vendor1Calls.values().iterator(); iter.hasNext(); ) {
          Event e = iter.next();
          events.add(e);
          iter.remove();
      }
      

      这样,您只需删除已添加到事件列表中的事件。您仍然可以在 saveEntries() 仍在执行时写入 vendor1Calls 映射,但迭代器会跳过添加的值。

      【讨论】:

      • 它实际上可能会迭代同时添加的值,但不能保证。
      • 感谢您的回复。使用 Iterator 如何使其线程安全?我如何确保不是 2 个线程同时通过映射的 values() 进行迭代,同时一个执行:事件 e = iter.next();另一个线程没有执行:iter.remove(); ?此外,我的目标是锁定整个地图,而我正在从地图读取过程中没有其他线程写入它,b/c 当我在迭代过程中添加到地图的条目会发生什么通过他们?
      • 我假设由于您使用计时器调用 saveEntries(),saveEntries() 是私有的,并且您没有其他代码,因此您不会有多个线程迭代vendor1Calls 或 vendor2Calls 并可能对它们调用 remove。
      • 在迭代时写入的那些可能会或可能不会被保存(“迭代器和枚举返回的元素反映了在迭代器/枚举创建时或之后的某个时刻哈希表的状态。 "),但有​​一点是肯定的:绝对不会丢失任何条目,因为在 saveEntries() 中调用 iter.remove() 删除的条目已经添加到事件 ArrayList 中。
      • 请记住,iter.remove() 会删除由 iter.next() 返回的元素。不要指望在迭代器中当前位置所占据的位置添加某些内容,并且 iter.remove() 会错误地删除它。
      猜你喜欢
      • 2014-03-04
      • 2019-09-22
      • 2011-04-15
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-07-16
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多