【问题标题】:How to handle synchronized access to List within Map<String, List>?如何在 Map<String, List> 中处理对 List 的同步访问?
【发布时间】:2010-11-08 22:17:37
【问题描述】:

更新:请注意。
我提出的问题得到了回答。对我来说不幸的是,这个问题比标题中的问题要大得多。除了向地图添加新条目外,我还必须同时处理更新和删除。如果没有其中一个,我想到的场景似乎无法实现:
一种。死锁 湾。复杂且耗时的检查和锁定
查看问题底部的最终想法。

原帖:

嗨,

我有一个带有地图的 spring bean。

这是我想用它做的:

很少有并发 JMS 侦听器会接收带有操作的消息。每个动作由两个用户组成:long userA 和 long userB。消息将有它自己的字符串回复队列,该队列将用于识别操作。 因为当其中一个用户参与另一个执行的操作时,我不能允许执行一个操作,所以我将使用此地图作为正在发生的事情的注册表并控制操作的执行。 所以假设我收到三个动作:
1.用户A,用户B
2.用户B,用户C
3.用户C,用户A

收到第一个操作后,地图为空,因此我将在其中记录有关操作的信息并开始执行操作。
当收到第二个操作时,我可以看到用户 B 正忙于第一个操作,所以我只是记录有关操作的信息。
第三次行动也是如此。

地图将如下所示:
[用户A:[动作1,动作3],
用户B:[action1,action2],
用户C:[action2, action3]]

一旦第一个操作完成,我将从注册表中删除有关它的信息,并获取有关 userA 和 userB [action3,action2] 的下一步操作的信息。然后我会尝试重新启动它们。

我想现在你已经知道我想用这张地图做什么了。

因为要同时从多个线程访问地图,所以我必须以某种方式处理同步。

我将有方法向地图添加新信息并在操作完成后从地图中删除信息。 remove 方法将为刚刚完成操作的两个用户返回下一个操作 [如果有的话]。

因为可能同时执行数百个操作,而且繁忙用户的操作百分比应该很低,所以我不想阻止每个添加/删除操作对地图的访问。

我考虑过仅对 Map 中的每个 List 进行同步访问,以允许同时访问多个用户条目。但是......因为当用户没有任何操作时,我想从地图中删除该用户的条目。另外...当用户在地图中没有条目时,我将不得不创建一个。我有点担心那里可能会发生冲突。

处理这种情况的最佳方法是什么? 使这两种方法 - 添加和删除 - 同步(我认为这是最坏的情况)是唯一正确的 [安全] 方法吗?

另外,我将有另一个地图,其中包含作为键的操作 ID 和作为值的用户 ID,因此更容易识别/删除用户对。我相信我可以跳过这个同步,因为没有一个动作会同时执行两次的情况。

虽然代码是用 Groovy 编写的,但我相信没有 Java 程序员会觉得它难以阅读。它背后是Java。 请考虑以下伪代码,因为我只是在做原型。

class UserRegistry {

    // ['actionA':[userA, userB]]
    // ['actionB':[userC, userA]]
    // ['actionC':[userB, userC]]
    private Map<String, List<Long>> messages = [:]
    /**
     * ['userA':['actionA', 'actionB'],
     * ['userB':['actionA', 'actionC'],
     * ['userC':['actionB', 'actionC']
     */
    private Map<long, List<String>> users = [:].asSynchronized()

    /**
     * Function will add entries for users and action to the registry.
     * @param userA
     * @param userB
     * @param action
     * @return true if a new entry was added, false if entries for at least one user already existed
     */
    public boolean add(long userA, long userB, String action) {
        boolean userABusy = users.containsKey(userA)
        boolean userBBusy = users.containsKey(userB)
        boolean retValue
        if (userABusy || userBBusy)  {
            if (userABusy) {
                users.get(userA).add(action)
            } else {
                users.put(userA, [action].asSynchronized())
            }
            if (userBBusy) {
                users.get(userB).add(action)
            } else {
                users.put(userB, [action].asSynchronized())
            }
            messages.put(action, [userA, userB])
            retValue = false
        } else {
            users.put(userA, [action].asSynchronized())
            users.put(userB, [action].asSynchronized())
            messages.put(action, [userA, userB])
            retValue = true
        }
        return retValue
    }

    public List remove(String action) {
        if(!messages.containsKey(action)) throw new Exception("we're screwed, I'll figure this out later")
        List nextActions = []
        long userA = messages.get(action).get(0)
        long userB = messages.get(action).get(1)
        if (users.get(userA).size() > 1) {
            users.get(userA).remove(0)
            nextActions.add(users.get(userA).get(0))
        } else {
            users.remove(userA)
        }
        if (users.get(userB).size() > 1) {
            users.get(userB).remove(0)
            nextActions.add(users.get(userB).get(0))
        } else {
            users.remove(userB)
        }
        messages.remove(action)
        return nextActions
    }
}

编辑 昨晚我想到了这个解决方案,似乎消息映射可能会消失,而用户映射会是:

Map users<String, List<UserRegistryEntry>>

在哪里
UserRegistryEntry
字符串 actionId
布尔等待

现在假设我得到了这些操作:

动作1:用户A,用户C
动作2:用户A,用户D
动作3:用户B,用户C
动作4:用户B,用户D

这意味着action1和action4可以同时执行,而action2和action3被阻塞。地图看起来像这样:

[
[userAId: [actionId: action1, waiting: false],[actionId: action2, waiting: true]],  
[userBId: [actionId: action3, waiting: true], [actionId: action4, waiting: false]],  
[userCId: [actionId: action1, waiting: false],[actionId: action3, waiting: true]],
[userDId: [actionId: action2, waiting: true], [actionId: action4, waiting: false]]
]

这样,当动作执行完成时,我使用以下方法从地图中删除条目:
userAId, userBId, actionId
并获取有关 userA 和 userB [如果有的话] 的第一个非阻塞等待操作的详细信息,并将它们传递给执行。

所以现在我需要两个方法,将数据写入地图并将其从地图中删除。

public boolean add(long userA, long userB, String action) {
    boolean userAEntryExists = users.containsKey(userA)
    boolean userBEntryExists = users.containsKey(userB)
    boolean actionWaiting = true
    UserRegistryEntry userAEntry = new UserRegistryEntry(actionId: action, waiting: false)
    UserRegistryEntry userBEntry = new UserRegistryEntry(actionId: action, waiting: false)
    if (userAEntryExists || userBEntryExists) {
        if (userAEntryExists) {
            for (entry in users.get(userA)) {
                if (!entry.waiting) {
                    userAEntry.waiting = true
                    userBEntry.waiting = true
                    actionWaiting = true
                    break;
                }
            }
        }

        if (!actionWaiting && userBEntryExists) {
            for (entry in users.get(userB)) {
                if (!entry.waiting) {
                    userAEntry.waiting = true
                    userBEntry.waiting = true
                    actionWaiting = true
                    break;
                }
            }
        }
    }

    if (userBEntryExists) {
        users.get(userA).add(userAEntry)
    } else {
        users.put(userA, [userAEntry])
    }

    if (userAEntryExists) {
        users.get(userB).add(userBEntry)
    } else {
        users.put(userB, [userBEntry])
    }

    return actionWaiting
}

对于删除:

public List remove(long userA, long userB, String action) {
        List<String> nextActions = []
        finishActionAndReturnNew(userA, action, nextActions)
        finishActionAndReturnNew(userB, action, nextActions)

        return nextActions;
    }

    private def finishActionAndReturnNew(long userA, String action, List<String> nextActions) {
        boolean userRemoved = false
        boolean actionFound = false
        Iterator itA = users.get(userA).iterator()
        while (itA.hasNext()) {
            UserRegistryEntry entry = itA.next()
            if (!userRemoved && entry.actionId == action) {
                itA.remove()
            } else {
                if (!actionFound && isUserFree(entry.otherUser)) {
                    nextActions.add(entry.actionId)
                }
            }
            if (userRemoved && actionFound) break
        }
    }

    public boolean isUserFree(long userId) {
        boolean userFree = true
        if (!users.containsKey(userId)) return true
        for (entry in users.get(userId)) {
            if (!entry.waiting) userFree = false
        }
        return userFree
    }

最后的想法
这种情况是一个杀手:
[操作ID,用户A,用户B]
[a, 1,2]
[b, 1,3]
[c, 3,4]
[d, 3,1]
动作 a 和 c 同时执行,b 和 d 正在等待。
当 a 和 c 完成后,必须删除用户 1、2、3、4 的条目,因此一个线程将锁定 1 和 2,另一个线程将锁定 3 和 4。当这些用户被锁定时,必须检查他们每个人的下一步操作。当代码确定用户 1 的下一个操作是用户 3 并且用户 3 的下一个操作是用户 1 时,乳清会尝试锁定它们。这是发生死锁的时候。我知道我可以围绕它编写代码,但执行起来似乎需要很长时间,并且会阻塞两个工作人员。
现在我会问另一个关于 SO 的问题,更多关于我的问题的主题,同时尝试使用 JMS 对解决方案进行原型设计。

【问题讨论】:

  • 基元不能是泛型类型。使用 Long 而不是 long。
  • 感谢 Leo,正如我在问题中所写,认为这是一个伪代码。只是想或多或少地可视化我想用地图做什么。

标签: java collections concurrency groovy synchronization


【解决方案1】:

您可能需要再次查看同步(集合)的工作方式:

这(作为非排他性示例)不是线程安全的:

if (users.get(userA).size() > 1) {
    users.get(userA).remove(0)

请记住,只有单个“同步”方法可以保证是原子的,没有更大的锁定范围。

编码愉快。

编辑 - 每用户同步锁(已更新以供评论):

只需使用标准数据结构,您就可以通过使用ConcurrentHashMap 来实现每键锁定——尤其是通过使用“putIfAbsent”方法。 (这与仅使用“同步 HashMap”的 get/put 相比,明显不同,见上文。)

下面是一些伪代码和注释:

public boolean add(long userA, long userB, String action) {
    // The put-if-absent ensures the *the same* object but may be violated when:
    //   -users is re-assigned
    //   -following approach is violated
    // A new list is created if needed and the current list is returned if
    // it already exists (as per the method name).
    // Since we have synchronized manually here, these lists
    // themselves do not need to be synchronized, provided:
    // Access should consistently be protected across the "higher"
    // structure (per user-entry in the map) when using this approach.
    List listA = users.putIfAbsent(userA, new List)
    List listB = users.putIfAbsent(userB, new List)
    // The locks must be ordered consistently so that
    // a A B/B A deadlock does not occur.
    Object lock1, lock2
    if (userA < userB) {
        lock1 = listA, lock2 = listB
    } else {
        lock1 = listB, lock2 = listA
    }
    synchronized (lock1) { synchronized (lock2) {{ // start locks

    // The rest of the code can be simplified, since the
    // list items are already *guaranteed* to exist there is no
    // need to alternate between add and creating a new list.
    bool eitherUserBusy = listA.length > 0 || listB.length > 0
    listA.add(action)
    listB.add(action)
    // make sure messages allows thread-safe access as well
    messages.put(action, [userA, userB])
    return !eitherUserBusy

    }} // end locks
}

我不知道这在您的使用场景与单个通用锁对象之间如何公平。除非有明显的优势,否则通常建议使用“更简单”。

HTH 和快乐编码。

【讨论】:

  • 谢谢 pst。我要做的是执行一些同步的动作,而另一些则不同步。我想知道是否可以同步插入到 Map 中,以便在创建新密钥时没有其他线程可以创建新密钥。然后,如果线程修改了键 A 下的 Map 中的列表,我不希望其他线程访问同一个列表,但允许访问其他键下的列表。例如,如果我要检查 map 中的键是否存在于一个方法调用中,并且如果它没有单独的同步方法来重新检查并在需要时添加条目,它会起作用吗?这样可以吗?
  • @Krystian 更新后的答案可能会回答您的一些问题。我认为您会发现 putIfAbsent 方法是一种合适的方法。
  • 我试图在两个级别上实现 ReentrantReadWriteLock:用户地图和地图条目中的每个列表。这似乎是一个更好的方法。我将尝试使用适当的删除命令来实现它,看看它是如何进行的。在删除命令中,我必须同时检查并锁定地图中的多个列表[如果它们存在],因为我必须检查用户的可用性,以便与刚刚完成执行的用户进行下一步操作。 [场景在我上次编辑的问题中描述]
  • 我阅读了有关 ConcurrentHashMap 的信息,似乎即使它非常适合插入新条目,但在删除期间的操作方面可能没有那么好。在删除期间,我必须更新两个条目,锁定条目并通过[我的意思是锁定]其他条目,如果需要更新它们。在这种情况下,我认为这意味着一个内部带有同步块的循环。听起来怪怪的。我将在今天晚些时候尝试制作原型。再次感谢!
  • 你好。由于其他线程在我同步之前删除了映射条目,因此我在 putIfAbsent 和同步块之间得到了 NPE。此外,我在不同的删除调用之间遇到了死锁。我开始认为不锁定地图是不可能的。
【解决方案2】:

【讨论】:

  • [:].asSynchronized() 表示 Collections.synchronizedMap(new HashMap());但我不确定我在这里所做的是否明智。从性能和数据完整性的角度来看。
  • 但是这些(单独)都没有解决上述问题。特别是,如果没有包含同步映射的显式锁(这是完全可行的),就无法获得原子测试和设置语义或类似的语义。
【解决方案3】:

您在类中有两个全局状态持有者,并且在修改它们的两个方法中的每一个中都有复合动作。因此,即使我们将 Map 更改为 ConcurrentHashMap 并将 List 更改为 CopyOnWriteArrayList 之类的东西,它仍然不能保证一致的状态。

我看到你会经常写入 List,所以 CopyOnWriteArrayList 无论如何可能太昂贵了。 ConcurrentHashMap 只有 16 路条带化。如果您有更好的硬件,另一种选择是 Cliff Click 的 highscalelib(在方法中适当锁定之后)。

回到一致性问题,如何使用 ReentrantLock 而不是同步,看看是否可以从 lock()-to-unlock() 序列中排除一些语句。如果您使用 ConcurrentMap,则 add() 中执行 containsKey() 的前两个语句可以是乐观的,您可以将它们从锁定块中排除。

您真的需要消息映射吗?它有点像用户的反向索引。另一种选择是使用另一种 watch() 方法,该方法在用户更改后根据来自 add() 的信号定期更新消息映射。刷新也可以是完全异步的。在这样做时,您可以在更新消息时对用户使用带有 readLock() 的 ReadWriteLock。在这种情况下,add() 可以安全地获取用户的 writeLock()。要使这个合理地正确,还需要做一些工作。

【讨论】:

  • 感谢 gshx。我昨晚想了想,是的,不需要消息映射,但这意味着另一个映射的外观会发生变化。看看编辑。仍然不确定我必须进行什么同步才能使其在不阻塞其他线程的情况下工作。
猜你喜欢
  • 2022-07-31
  • 1970-01-01
  • 1970-01-01
  • 2018-09-22
  • 2020-04-01
  • 2016-01-12
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多