【Question title】: Locking on map value
【Posted】: 2021-06-12 21:52:48
【Question】:

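I want to synchronize access to a scoped resource (for example, a user-specific resource) by locking on a value in a map (user ID to lock object). I also don't want to keep user-specific locks in memory when they are no longer needed. My "main" method creates 1000 threads and calls this method with the same key to simulate concurrent access, but it fails on the first assertion. The code passes only if I uncomment the synchronized (SERVICE_CLASS_LOCK) line, which I don't want to do for reasons outside the scope of this question. Any ideas?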

import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import org.testng.Assert;

public class Issue
{
    private static final int THREAD_COUNT = 1000;
    private static final ConcurrentMap<String, Object> LOCKS = new ConcurrentHashMap<>();
    // This is just for example's sake. In reality this is a resource
    // related to the provided key (ex. user ID). This could be a database row,
    // File on disk, REST API call, etc.
    private static final AtomicBoolean SOME_SCOPED_RESOURCE = new AtomicBoolean(false);
    private static final Object SERVICE_CLASS_LOCK = new Object();

    public static void main(String[] args) throws Throwable
    {
        ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
        CompletionService<Boolean> completionService = new ExecutorCompletionService<>(threadPool);
        AtomicReference<Throwable> taskException = new AtomicReference<>();
        for (int currentThreadNumber = 0; currentThreadNumber < THREAD_COUNT; currentThreadNumber++)
        {
            final int taskNumber = currentThreadNumber;
            completionService.submit(() -> {
                try
                {
                    someTask("someKey", taskNumber);
                }
                catch (Throwable t)
                {
                    t.printStackTrace();
                    taskException.set(t);
                }
                return true;
            });
        }
        for (int currentThreadNumber = 0; currentThreadNumber < THREAD_COUNT; currentThreadNumber++)
        {
            completionService.take();
        }
        threadPool.shutdownNow();
        if (taskException.get() != null)
        {
            throw taskException.get();
        }
    }

    public static void someTask(String key, int taskNumber)
    {
        AtomicBoolean shouldRemoveLock = new AtomicBoolean(false);
        Object newLock = new Object();
        Object previousLock = LOCKS.putIfAbsent(key, newLock);
        Object resourceSpecificLock = previousLock == null ? newLock : previousLock;
        shouldRemoveLock.set(previousLock == null);

        // No exceptions if I uncomment the following line
        // synchronized (SERVICE_CLASS_LOCK)
        {
            synchronized (resourceSpecificLock)
            {
                System.err.println(taskNumber + ": lock=" + resourceSpecificLock);
                Assert.assertFalse(SOME_SCOPED_RESOURCE.getAndSet(true), "failed for task " + taskNumber);
                System.err.println("did some work for task " + taskNumber);
                Assert.assertTrue(SOME_SCOPED_RESOURCE.getAndSet(false), "failed for task " + taskNumber);
            }
            // It also does not work if this block is moved to the end inside the synchronized
            // (resourceSpecificLock) block above, with no other changes.
            // So it seems the issue is related to not being able to remove the
            // lock at the right time.
            if (shouldRemoveLock.get())
            {
                System.err.println("removing resourceSpecificLock for task " + taskNumber);
                LOCKS.remove(key);
            }
        }
    }
}

【Discussion】:

    Tags: java multithreading parallel-processing synchronization locking


    【Solution 1】:

    As a side note, you can simplify this:

    Object newLock = new Object();
    Object previousLock = LOCKS.putIfAbsent(key, newLock);
    Object resourceSpecificLock = previousLock == null ? newLock : previousLock;
    

    to just:

    Object resourceSpecificLock = LOCKS.computeIfAbsent(key, v -> new Object());
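
As a quick check that the two forms resolve to the same lock object, here is a minimal sketch. The class `LockLookupDemo` and its two helper methods are hypothetical names introduced for illustration; only `putIfAbsent` and `computeIfAbsent` come from the snippets above.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical demo class; not part of the original code.
class LockLookupDemo
{
    // Three-step lookup, as written in the question.
    static Object viaPutIfAbsent(ConcurrentMap<String, Object> locks, String key)
    {
        Object newLock = new Object();
        Object previousLock = locks.putIfAbsent(key, newLock);
        return previousLock == null ? newLock : previousLock;
    }

    // One-step form: the mapping function only runs when the key is absent.
    static Object viaComputeIfAbsent(ConcurrentMap<String, Object> locks, String key)
    {
        return locks.computeIfAbsent(key, k -> new Object());
    }

    public static void main(String[] args)
    {
        ConcurrentMap<String, Object> locks = new ConcurrentHashMap<>();
        Object first = viaPutIfAbsent(locks, "someKey");
        Object second = viaComputeIfAbsent(locks, "someKey");
        // Both calls resolve to the single lock stored for "someKey".
        System.out.println(first == second); // prints "true"
    }
}
```

The one-step form also avoids allocating a throwaway `Object` on every call when the key is already present.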
    

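    Even with that change you will still have a race condition. The problem is not the map of locks, but the fact that multiple threads access the same resource (SOME_SCOPED_RESOURCE in the question's code, called SOME_SHARED_RESOURCE here):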

    SOME_SHARED_RESOURCE
    

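    Even if threads synchronize on key-based locks, multiple threads can still modify SOME_SHARED_RESOURCE at the same time (threads with different keys use different locks).

    The race condition goes like this:

    1. Thread 1, with key = ID1, calls SOME_SHARED_RESOURCE.getAndSet(true).
    2. Before thread 1 has time to call SOME_SHARED_RESOURCE.getAndSet(false), a second thread with a different ID (and therefore a different lock) calls SOME_SHARED_RESOURCE.getAndSet(true), and you get the exception.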
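
The two steps above can be forced to happen in exactly that order with latches. This is a small sketch, not code from the original post; the class `DifferentKeysRaceDemo`, the keys "ID1" and "ID2", and the helper names are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; names are illustrative only.
class DifferentKeysRaceDemo
{
    private static final ConcurrentMap<String, Object> LOCKS = new ConcurrentHashMap<>();
    // One resource shared by every key, as in the steps above.
    private static final AtomicBoolean SOME_SHARED_RESOURCE = new AtomicBoolean(false);

    // Returns true if thread 2 found the resource already marked in use by thread 1.
    static boolean runScenario()
    {
        SOME_SHARED_RESOURCE.set(false);
        CountDownLatch thread1InsideLock = new CountDownLatch(1);
        CountDownLatch thread2Done = new CountDownLatch(1);
        AtomicBoolean overlapSeen = new AtomicBoolean(false);

        Thread thread1 = new Thread(() -> {
            synchronized (LOCKS.computeIfAbsent("ID1", k -> new Object()))
            {
                SOME_SHARED_RESOURCE.getAndSet(true);   // step 1
                thread1InsideLock.countDown();
                awaitQuietly(thread2Done);              // keep holding the ID1 lock
                SOME_SHARED_RESOURCE.getAndSet(false);
            }
        });
        Thread thread2 = new Thread(() -> {
            awaitQuietly(thread1InsideLock);
            synchronized (LOCKS.computeIfAbsent("ID2", k -> new Object()))
            {
                // step 2: ID2 maps to a different lock, so nothing blocks this call
                overlapSeen.set(SOME_SHARED_RESOURCE.getAndSet(true));
            }
            thread2Done.countDown();
        });
        thread1.start();
        thread2.start();
        try
        {
            thread1.join();
            thread2.join();
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return overlapSeen.get();
    }

    private static void awaitQuietly(CountDownLatch latch)
    {
        try
        {
            latch.await();
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args)
    {
        System.out.println("overlap seen: " + runScenario()); // prints "overlap seen: true"
    }
}
```

Because each key has its own lock, thread 2 enters its synchronized block while thread 1 is still inside its own, which is the overlap the assertion in the question would report.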

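    That is why it works when you add the outer synchronized block (synchronized (SERVICE_CLASS_LOCK) in the question): no two threads can then touch SOME_SHARED_RESOURCE at the same time.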

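    If you want to test the map of locks, I suggest adapting your example so that the shared resource is keyed as well. That way you can test that threads with the same key do not concurrently change the data associated with that key.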

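    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.atomic.AtomicReference;

    public class Issue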
    {
        private static final int THREAD_COUNT = 1000;
        private static final ConcurrentMap<String, Object> LOCKS = new ConcurrentHashMap<>();
        // One flag per key, so each key-based lock guards only its own data.
        private static final ConcurrentMap<String, Boolean> SOME_SHARED_RESOURCE = new ConcurrentHashMap<>();
    
        public static void main(String[] args) throws Throwable
        {
            ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
            CompletionService<Boolean> completionService = new ExecutorCompletionService<>(threadPool);
            AtomicReference<Throwable> taskException = new AtomicReference<>();
            for (int currentThreadNumber = 0; currentThreadNumber < THREAD_COUNT; currentThreadNumber++)
            {
                final int taskNumber = currentThreadNumber;
                completionService.submit(() -> {
                    try
                    {
                        String key = "someKey";
                        SOME_SHARED_RESOURCE.putIfAbsent(key, Boolean.TRUE);
                        someTask(key, taskNumber);
                    }
                    catch (Throwable t)
                    {
                        t.printStackTrace();
                        taskException.set(t);
                    }
                    return true;
                });
            }
            for (int currentThreadNumber = 0; currentThreadNumber < THREAD_COUNT; currentThreadNumber++)
            {
                completionService.take();
            }
            threadPool.shutdownNow();
            if (taskException.get() != null)
            {
                throw taskException.get();
            }
        }
    
        public static void someTask(String key, int taskNumber)
        {
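            Object resourceSpecificLock = LOCKS.computeIfAbsent(key, v -> new Object());

            synchronized (resourceSpecificLock)
            {
                System.err.println(taskNumber + ": lock=" + resourceSpecificLock);
                // replace() returns the previous value, which must still be TRUE here.
                if (!SOME_SHARED_RESOURCE.replace(key, Boolean.FALSE))
                {
                    throw new RuntimeException("failed for task " + taskNumber);
                }
                System.err.println("did some work for task " + taskNumber);
                // The previous value must be the FALSE written above.
                if (SOME_SHARED_RESOURCE.replace(key, Boolean.TRUE))
                {
                    throw new RuntimeException("failed for task " + taskNumber);
                }
                // Caveat: threads that already fetched this lock object keep using it,
                // while callers arriving after this removal create a new lock for the key.
                LOCKS.remove(key);
            }
        }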
    }
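
One caveat with removing the lock from the map: a thread that fetched the lock object before the removal may still be waiting on it, while a later caller finds the key absent and creates a second lock, so two threads with the same key can end up in the critical section together. A possible way around this, offered here as an assumption and not taken from the original answer, is to count the users of each lock inside `ConcurrentHashMap.compute` and drop the entry only when the count reaches zero. The names `KeyedLocks`, `LockEntry`, `withLock`, and `size` are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper; a sketch of reference-counted per-key locks.
class KeyedLocks
{
    // Lock object plus the number of threads currently using or waiting for it.
    private static final class LockEntry
    {
        int users; // only read and written inside compute(), which is atomic per key
    }

    private final ConcurrentMap<String, LockEntry> locks = new ConcurrentHashMap<>();

    void withLock(String key, Runnable criticalSection)
    {
        // Register atomically: create the entry if absent, then bump the user count.
        LockEntry entry = locks.compute(key, (k, existing) -> {
            LockEntry e = existing == null ? new LockEntry() : existing;
            e.users++;
            return e;
        });
        try
        {
            synchronized (entry)
            {
                criticalSection.run();
            }
        }
        finally
        {
            // Deregister atomically; returning null removes the mapping.
            locks.compute(key, (k, e) -> --e.users == 0 ? null : e);
        }
    }

    int size()
    {
        return locks.size();
    }

    public static void main(String[] args) throws InterruptedException
    {
        KeyedLocks keyedLocks = new KeyedLocks();
        AtomicBoolean inUse = new AtomicBoolean(false);
        AtomicInteger violations = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(16);
        for (int i = 0; i < 1000; i++)
        {
            pool.execute(() -> keyedLocks.withLock("someKey", () -> {
                if (inUse.getAndSet(true)) violations.incrementAndGet();
                if (!inUse.getAndSet(false)) violations.incrementAndGet();
            }));
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        System.out.println("violations=" + violations.get() + ", entries left=" + keyedLocks.size());
    }
}
```

In this sketch an entry is only removed when no thread is using or waiting on it, so every concurrent caller for a key sees the same lock object, and the map is empty again once all callers have finished.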
    
