【问题标题】:Lock handler for arbitrary keys任意键的锁定处理程序
【发布时间】:2017-06-13 09:26:58
【问题描述】:

我有为任意键实现“锁定处理程序”的代码。给定一个key,它确保一次只有一个线程可以process那个(或等于)键(这里意味着调用externalSystem.process(key)调用)。

到目前为止,我有这样的代码:

public class MyHandler {
    private final SomeWorkExecutor someWorkExecutor;
    private final ConcurrentHashMap<Key, Lock> lockMap = new ConcurrentHashMap<>();

    public void handle(Key key) {
        // This can lead to OOM as it creates locks without removing them
        Lock keyLock = lockMap.computeIfAbsent( 
            key, (k) -> new ReentrantLock()
        );
        keyLock.lock();
        try {
            someWorkExecutor.process(key);
        } finally {
            keyLock.unlock();
        }
    }
}

我知道这段代码可以导致OutOfMemoryError,因为没有人清楚地图。

我在考虑如何制作能够累积有限元素数量的地图。当超出限制时,我们应该用新的替换最旧的访问元素(此代码应与最旧的元素同步作为监视器)。但我不知道如何让回调告诉我超出限制。

请分享你的想法。

附言

我重读了任务,现在我发现我有限制,handle 方法不能被调用超过 8 个线程。我不知道它对我有什么帮助,但我只是提到了它。

附言 2

@Boris the Spider 提出了不错且简单的解决方案:

} finally {
      lockMap.remove(key);
      keyLock.unlock();
}

但是在 Boris 注意到我们的代码不是线程安全的,因为它破坏了行为之后:
让我们研究 3 个使用相同键调用的线程:

  1. Thread#1 获取锁,现在在 map.remove(key); 之前
  2. Thread#2 使用 equals 键调用,因此它在 thread#1 释放锁定时等待。
  3. 然后线程#1 执行map.remove(key);。在这个线程#3 调用方法handle 之后。它检查该键的锁是否在映射中不存在,因此它创建新锁并获取它。
  4. 线程#1 释放锁,因此线程#2 获取它。
    因此,线程#2 和线程#3 可以并行调用等于键。但这不应该被允许。

为了避免这种情况,在map清除之前,我们应该阻塞任何线程来获取锁,而waitset中的所有线程都没有获取并释放锁。看起来它需要足够复杂的同步,它会导致算法工作缓慢。当地图大小超过某个限制值时,也许我们应该不时清除地图。

我浪费了很多时间,但不幸的是我不知道如何实现这一点。

【问题讨论】:

  • 蜘蛛鲍里斯是这样的吗? } 最后 { lockMap.remove(keyLock); keyLock.unlock(); }
  • @Boris the Spider 但它可以防止 OOM 错误
  • 嗯,再想想,这行不通。场景:1 出现,创建一个Lock 并锁定它。 2 出现并找到锁,等待。 1 完成、解锁和移除。 3 出现并找不到 Lock 因此 23 将具有并发访问权限。为那个脑子放屁道歉。
  • @Boris the Spider 嗯...你是对的,但它看起来不错
  • @Boris 这是一个非常好的案例。现在我真的不明白如何从地图中删除价值并确保没有人想获得这个

标签: java concurrency java-8 out-of-memory concurrenthashmap


【解决方案1】:

您无需尝试将大小限制为某个任意值 - 事实证明,您可以完成这种“锁定处理程序”习语,同时只存储准确键的数量目前锁定在地图中。

这个想法是使用一个简单的约定:成功地添加映射到映射算作“锁定”操作,删除它算作“解锁”操作。这巧妙地避免了在某些线程仍处于锁定状态和其他竞争条件时删除映射的问题。

此时,映射中的value只用于阻塞其他以相同key到达的线程,需要等到映射被移除。

这是一个示例1,地图值使用CountDownLatch 而不是Lock

public void handle(Key key) throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(1);

    // try to acquire the lock by inserting our latch as a
    // mapping for key        
    while(true) {
        CountDownLatch existing = lockMap.putIfAbsent(key, latch);
        if (existing != null) {
            // there is an existing key, wait on it
            existing.await();
        } else {
            break;
        }
    }

    try {
        externalSystem.process(key);
    } finally {
        lockMap.remove(key);
        latch.countDown();
    }
}

在这里,映射的生命周期仅与持有锁的时间一样长。映射的条目永远不会超过对不同键的并发请求。

您的方法的不同之处在于映射不会“重复使用” - 每个handle 调用都会创建一个新的锁存器和映射。由于您已经在执行昂贵的原子操作,因此在实践中这不太可能会放缓。另一个缺点是,由于有许多等待线程,当锁存器倒计时时,所有都会被唤醒,但只有一个会成功放入新的映射并因此获得锁——其余的会在新锁。

可以构建另一个版本,当线程出现并等待现有映射时,它会重新使用映射。基本上,解锁线程只是对其中一个等待线程进行“切换”。只有一个映射将用于等待同一键的整个线程集 - 它按顺序传递给每个线程。大小仍然是有界的,因为没有更多线程在等待给定的映射,它仍然被删除。

为了实现这一点,您将CountDownLatch 替换为可以计算等待线程数的映射值。当一个线程进行解锁时,它首先检查是否有线程在等待,如果是,则唤醒一个进行切换。如果没有线程在等待,它会“销毁”对象(即,设置一个标志,表明该对象不再在映射中)并将其从映射中删除。

您需要在适当的锁下进行上述操作,并且有一些棘手的细节。在实践中,我发现上面这个简短而甜蜜的例子效果很好。


1 即时编写,未编译,未测试,但想法可行。

【讨论】:

  • 它声明了两个锁存器引用latchexisting,并将latch 引用初始化为值为1 的新CDL。existing 引用稍后在循环中分配。无论如何,我简化了那条线以使其现在更清晰。
  • existing 最初没有初始化,因此我相信代码会挂在 latch.await();
  • 我不确定如果 latch.countDown();latch.await(); 之前调用会发生什么
  • @gstackoverflow - 如果countDown() 出现在await() 之前,那么await() 会立即返回。这是锁存器的一个主要用例——它们通常在初始化后倒计时一次,所以之后所有的 await() 调用都会立即返回。
  • @gstackoverflow - 我不确定“最初存在的未初始化”是什么意思 - 它在声明时立即初始化。无论如何,在 Java 中使用未初始化的变量是不可能的:编译器检测到它并抱怨。
【解决方案2】:

您可以依靠方法 compute(K key, BiFunction&lt;? super K,? super V,? extends V&gt; remappingFunction) 来同步对给定键的方法 process 的调用,您甚至不再需要使用 Lock 作为地图值的类型,因为您不需要不要再依赖它了。

这个想法是依靠 ConcurrentHashMap 的内部锁定机制来执行你的方法,这将允许线程并行执行 process 方法,用于对应哈希值不属于同一 bin 的键。这相当于基于条带锁的方法,只是不需要额外的第三方库。

条带锁的方法很有趣,因为它在内存占用方面非常轻,因为您只需要有限数量的锁来执行此操作,因此您的锁所需的内存占用是已知的并且永远不会改变,这不是对于每个键使用一个锁的方法(如您的问题),因此通常更好/建议使用基于条带锁的方法来满足这种需求。

所以你的代码可能是这样的:

// This will create a ConcurrentHashMap with an initial table size of 16   
// bins by default, you may provide an initialCapacity and loadFactor
// if too much or not enough to get the expected table size in order
// increase or reduce the concurrency level of your map
// NB: We don't care much of the type of the value so I arbitrarily
// used Void but it could be any type like simply Object
private final ConcurrentMap<Key, Void> lockMap = new ConcurrentHashMap<>();

public void handle(Key lockKey) {
    // Execute the method process through the remapping Function
    lockMap.compute(
        lockKey,
        (key, value) -> {
            // Execute the process method under the protection of the
            // lock of the bin of hashes corresponding to the key
            someWorkExecutor.process(key);
            // Returns null to keep the Map empty
            return null;
        }
    );
}

NB 1:因为我们总是返回null,所以映射总是空的,这样你就永远不会因为这个映射而耗尽内存。

注意 2:由于我们从不影响给定键的值,请注意,也可以使用方法 computeIfAbsent(K key, Function&lt;? super K,? extends V&gt; mappingFunction)

public void handle(Key lockKey) {
    // Execute the method process through the remapping Function
    lockMap.computeIfAbsent(
        lockKey,
        key -> {
            // Execute the process method under the protection of the
            // lock of the segment of hashes corresponding to the key
            someWorkExecutor.process(key);
            // Returns null to keep the Map empty
            return null;
        }
    );
}

注意 3: 确保您的方法 process 永远不会为任何键调用方法 handle,因为您最终会出现无限循环(相同的键)或死锁(其他非有序键) ,例如:如果一个线程调用handle(key1),然后process内部调用handle(key2),另一个线程并行调用handle(key2),然后process内部调用handle(key1),无论使用哪种方法,都会出现死锁)。此行为并非特定于此方法,它会发生在任何方法中。

【讨论】:

  • action if (!hasQueuedThreads()) {lockMap.remove(key, this); 之间可以添加thread_1新值到地图并通过thread_2获取锁。因此 thread_1 从 map 中删除了锁。在此操作之后 thread_3 将向地图添加新的 vlue 并添加新的锁。因此 thread_2 和 thread_3 将并行工作
  • 这是一种有趣的方法 - 使用 CHM 的机制来进行锁定,而无需在地图中插入任何内容 :)。但是,您受制于地图的内部实现,它并不是真正设计为从计算函数内部调用process 之类的任意函数。文档说:“在计算过程中,其他线程对该地图的一些尝试更新操作可能会被阻止,因此计算应该简短而简单,并且不得尝试更新该地图的任何其他映射。”
  • @Ben Manes:默认容量指定为16,因此您只需指定显式容量,如果这还不够。当然,即使是最大容量也不能完全排除哈希冲突。
  • @gstackoverflow:正如 Nicolas Filotto 已经说过的,返回 null 意味着没有存储空间,这很重要,因为 ConcurrentHashMap 根本不支持存储 null。但是Map.computeIfAbsent 的一般约定是将存储到null(如果支持)的映射视为 absent 值。
  • @Holger 你是对的。出于某种原因,我认为表的延迟初始化导致第一次插入导致在调整大小之前产生单个锁。
【解决方案3】:

一种方法是完全放弃并发哈希映射,而只需使用带有锁定功能的常规 HashMap 来执行所需的映射操作并以原子方式锁定状态。

乍一看,这似乎降低了系统的并发性,但如果我们假设process(key) 调用相对于非常快速的锁操作来说是冗长的,那么它工作得很好,因为process() 调用仍然并发运行。独占临界区只发生少量且固定的工作。

这是一个草图:

public class MyHandler {

    private static class LockHolder {
        ReentrantLock lock = new ReentrantLock();
        int refcount = 0;
        void lock(){
            lock.lock();
        }
    } 

    private final SomeWorkExecutor someWorkExecutor;
    private final Lock mapLock = new ReentrantLock();
    private final HashMap<Key, LockHolder> lockMap = new HashMap<>();

    public void handle(Key key) {

        // lock the map
        mapLock.lock();
        LockHolder holder = lockMap.computeIfAbsent(key, k -> new LockHolder());
        // the lock in holder is either unlocked (newly created by us), or an existing lock, let's increment refcount
        holder.refcount++;
        mapLock.unlock();

        holder.lock();

        try {
            someWorkExecutor.process(key);
        } finally {
            mapLock.lock()
            keyLock.unlock();
            if (--holder.refcount == 0) {
              // no more users, remove lock holder
              map.remove(key);
            }
            mapLock.unlock();
        }
    }
}

我们使用refcount,它只在共享的mapLock 下进行操作,以跟踪有多少个锁的用户。每当引用计数为零时,我们可以在退出处理程序时摆脱条目。这种方法很好,因为它很容易推理并且如果process() 调用与锁定开销相比相对昂贵,它会很好地执行。由于地图操作发生在共享锁下,因此添加其他逻辑也很简单,例如,在地图中保留一些 Holder 对象、跟踪统计信息等。

【讨论】:

  • 不,它不能,因为所有refcount 操作都发生在mapLock 下(回复已删除的评论...)。
  • 好的,看起来不错。但锁释放应该在 finally 部分
  • 也许,如果您认为lock 之后的任何代码都可以引发异常(在我看来不是这样)。无论如何,由您来准备代码生产。这是一个草图,免费提供给你:)
  • 我需要时间来检查你的想法,但我真的要谢谢你
  • 这个解决方案可以正常工作,但性能很差。每个锁操作需要两个比较和交换操作,其中一个是竞争的。使用 refcount 和映射锁,您实际上所做的与锁本身所做的相同。顺便说一句:使用缓存解决方案,您还可以防止在繁忙的键上创建新的锁定对象。
【解决方案4】:

谢谢Ben Mane
我找到了这个变种。

public class MyHandler {
    private final int THREAD_COUNT = 8;
    private final int K = 100;
    private final Striped<Lock> striped = Striped.lazyWeakLock(THREAD_COUNT * K);
    private final SomeWorkExecutor someWorkExecutor = new SomeWorkExecutor();

    public void handle(Key key) throws InterruptedException {
        Lock keyLock = striped.get(key);

        keyLock.lock();
        try {
            someWorkExecutor.process(key);
        } finally {
            keyLock.unlock();
        }       
    }
}

【讨论】:

  • 请记住,这并不能提供与原始代码相同的并发保证。特别是,任何两个不同的键都可能在同一个锁上发生碰撞,一个会阻止另一个,即使它们用于不同的键。如果 8 个线程尝试同时处理,则几乎可以保证发生一次或多次冲突(极不可能有 8 个不同的键完美地散列到 8 个条带)。我在下面添加了一个答案,它保留了原始解决方案的完全并发性,没有无限增长的地图。
  • @BeeOnRope 感谢您的评论。稍微更正了代码。
  • 我没有看到 K 在任何地方使用过?
  • 当元素的大小相对于锁而言较小,且元素数量较多和/或在in中执行的操作时,使用Striped相对较好锁(在您的情况下,process() 调用)相对较小。例如,实现并发哈希映射的内部。 Striped 的好处是你有一个 固定数量 的锁,在施工时创建并且永远存在。因此,对于具有 100 万个元素的 CHM,您仍然只需要 16 个锁或其他任何东西。
  • ... 所以我认为它不太适合解决这个问题 - 元素的数量与线程数成比例:所以你自然会想要以相同的方式缩放锁的数量。如果只有少数线程同时调用process(),则您不需要 800 个锁,相反,如果您确实有很多线程进入,您需要更多锁。这就是为什么我更喜欢在process 中每个线程只有一个活锁的解决方案(并在同一key 上的所有等待线程之间共享),它可以很好地扩展并且不会导致冲突。
【解决方案5】:

这是一个简短而甜蜜的版本,它利用 Guava 的 Interner 类的 weak 版本来完成为每个键提供“规范”对象以用作锁的繁重工作,并实现弱引用语义以便清理未使用的条目。

public class InternerHandler {
    private final Interner = Interners.newWeakInterner();

    public void handle(Key key) throws InterruptedException {
        Key canonKey = Interner.intern(key);
        synchronized (canonKey) {
            someWorkExecutor.process(key);
        }       
    }
}

基本上我们要求一个规范canonKey,即equal()key,然后锁定这个canonKey。每个人都会就规范密钥达成一致,因此所有传递相同密钥的调用者都会就锁定对象达成一致。

Interner 的弱特性意味着只要不使用规范密钥,就可以删除条目,因此您可以避免在 interner 中积累条目。稍后,如果再次出现相等键,则选择新的规范条目。

上面的简单代码依赖于 synchronize 的内置监视器 - 但如果这对您不起作用(例如,它已经用于其他目的),您可以在 Key 类中包含一个锁定对象或者创建一个持有者对象。

【讨论】:

  • 请注意,您不应该在某些键上进行同步,例如字符串。使用代理锁对象将我们带回到 Guava 的弱条纹锁,它的作用几乎相同。 ?
  • @BenManes - 实际上,我在上面提到过您可能会也可能不会使用内置锁。在无限大的Striped 锁的限制下,我认为它们看起来很相似,是的。
  • @Ben Manes,请解释为什么 您不应该同步某些键,例如字符串
  • @gstackoverflow 一些类型是实习的,实例是全局共享的,例如-127 到 128 整数。然后不同的同步调用可能会发生冲突,从而导致更粗的锁定。理想情况下,您应该使用弱值,而不是键,以避免由于收集相同的键但不同的实例导致同时使用新锁而导致的竞争。
【解决方案6】:
class MyHandler {
    private final Map<Key, Lock> lockMap = Collections.synchronizedMap(new WeakHashMap<>());
    private final SomeWorkExecutor someWorkExecutor = new SomeWorkExecutor();

    public void handle(Key key) throws InterruptedException {
        Lock keyLock = lockMap.computeIfAbsent(key, (k) -> new ReentrantLock()); 
        keyLock.lock();
        try {
            someWorkExecutor.process(key);
        } finally {
            keyLock.unlock();
        }
    }
}

【讨论】:

    【解决方案7】:

    就性能而言,每次为key 创建和删除锁定对象是一项代价高昂的操作。当您从并发映射(例如缓存)中添加/删除锁时,必须确保从缓存中放置/删除对象本身是线程安全的。所以这似乎不是一个好主意,但可以通过ConcurrentHashMap 实现

    条带锁定方法(也被并发散列映射内部使用)是更好的方法。从Google Guava docs 解释为

    当你想将一个锁与一个对象关联时,key 保证 你需要的是,如果 key1.equals(key2),那么与关联的锁 key1 与 key2 关联的锁相同。

    最粗略的方法是将每个键与相同的键关联 锁,这会导致最粗略的同步。在 另一方面,您可以将每个不同的键与不同的键相关联 锁,但这需要线性内存消耗和并发 随着新钥匙的发现,对锁系统本身进行管理。

    条带化允许程序员选择多个锁,它们是 基于它们的哈希码在键之间分布。这允许 程序员动态选择并发和之间的权衡 内存消耗,同时保留如果 key1.equals(key2),然后 striped.get(key1) == striped.get(key2)

    代码:

    //declare globally; e.g. class field level
    Striped<Lock> rwLockStripes = Striped.lock(16);
    
        Lock lock = rwLockStripes.get("key");
        lock.lock();
        try {
            // do you work here
        } finally {
            lock.unlock();
        }
    

    以下代码片段有助于实现锁定/解除锁定。

    private ConcurrentHashMap<String, ReentrantLock> caches = new ConcurrentHashMap<>();
    
    public void processWithLock(String key) {
        ReentrantLock lock = findAndGetLock(key);
        lock.lock();
        try {
            // do you work here
    
        } finally {
            unlockAndClear(key, lock);
        }
    }
    
    private void unlockAndClear(String key, ReentrantLock lock) {
        // *** Step 1: Release the lock.
        lock.unlock();
        // *** Step 2: Attempt to remove the lock
        // This is done by calling compute method, if given lock is present in
        // cache. if current lock object in cache is same instance as 'lock'
        // then remove it from cache. If not, some other thread is succeeded in
        // putting new lock object and hence we can leave the removal of lock object to that
        // thread.
        caches.computeIfPresent(key, (k, current) -> lock == current ? null : current);
    
    }
    
    private ReentrantLock findAndGetLock(String key) {
        // Merge method given us the access to the previously( if available) and
        // newer lock object together.
        return caches.merge(key, new ReentrantLock(), (older, newer) -> nonNull(older) ? older : newer);
    }
    

    【讨论】:

    • 我没有听懂你的答案,因为你更改了初始主题的名称。为什么在 Striped 参数中使用 10。为什么要使用 ReadWriteLock?
    • 我只是举例说明而已。我刚刚编辑了代码示例以使其清楚。在创建 Stripped 实例时,我们需要指定数字来指定急切初始化的实际 Lock 实例的数量(也就是说,这个数字限制了可以同时锁定的最大键数)。理想情况下,这个数字应该是 2 的幂(对于快速模式)(例如 2、4、8、16..)。如果我们给出 10,则内部使用 16(下一个可用数字,即 2 的幂)。希望这会有所帮助。
    【解决方案8】:

    您可以尝试JKeyLockManager 之类的东西,而不是写你自己的。来自项目描述:

    JKeyLockManager 为应用程序提供细粒度锁定 特定的键。

    现场给出的示例代码:

    public class WeatherServiceProxy {
      private final KeyLockManager lockManager = KeyLockManagers.newManager();
    
      public void updateWeatherData(String cityName, float temperature) {
        lockManager.executeLocked(cityName, () -> delegate.updateWeatherData(cityName, temperature)); 
      }
    

    【讨论】:

      【解决方案9】:

      调用时会添加新值

      lockMap.computeIfAbsent()
      

      所以你可以只检查 lockMap.size() 的项目数。

      但是您将如何找到第一个添加的项目?最好在使用后删除项目。

      【讨论】:

      • 作为noted above,删除将不起作用,会导致并发访问。
      • 那么我建议将密钥和锁添加到新对象中并将其存储在有序集合中。
      • 删除旧的Lock 将不起作用,除非您可以保证Lock 未在使用中。
      • 然后像这样删除 )) if(keyLock.tryLock()) { lockMap.remove(key); keyLock.unlock(); }
      • 问题是“但我不知道如何让回调告诉我超出限制”。所以写完全回答了这个问题。最初的问题不包含有关删除逻辑的任何内容。
      【解决方案10】:

      您可以使用存储对象引用的进程内缓存,例如 Caffeine、Guava、EHCache 或 cache2k。下面是一个如何使用cache2k 构建缓存的示例:

      final Cache<Key, Lock> locks =
        new Cache2kBuilder<Key, Lock>(){}
          .loader(
            new CacheLoader<Key, Lock>() {
              @Override
              public Lock load(Key o) {
                return new ReentrantLock();
              }
            }
          )
          .storeByReference(true)
          .entryCapacity(1000)
          .build();
      

      使用模式与您在问题中的情况一样:

          Lock keyLock = locks.get(key);
          keyLock.lock();
          try {
              externalSystem.process(key);
          } finally {
              keyLock.unlock();
          }
      

      由于缓存限制为 1000 个条目,因此会自动清除不再使用的锁。

      如果应用程序中的容量和线程数不匹配,缓存可能会清除正在使用的锁。该解决方案在我们的应用中可以完美运行多年。当有足够长的运行任务并且超出容量时,缓存将驱逐正在使用的锁。在实际应用程序中,您始终控制生命线程的数量,例如在 Web 容器中,您会将处理线程的数量限制为(示例)100。因此您知道使用的锁永远不会超过 100 个。如果考虑到这一点,则此解决方案的开销最小。

      请记住,只有当您的应用程序在单个 VM 上运行时,锁定才有效。您可能想看看分布式锁管理器 (DLM)。提供分布式锁的产品示例:hazelcast、infinispan、teracotta、redis/redisson。

      【讨论】:

      • 嗯,再想想,这行不通。场景: 1 出现,创建一个 Lock 并锁定它。 2 过来找锁,等待。 1 完成,解锁和移除。 3 出现并没有找到 Lock,因此 2 和 3 将具有并发访问权限。为那个大脑放屁道歉。(c)蜘蛛鲍里斯
      • 我相信这种情况是可能的
      • 完全正确 - 行不通。缓存要么按年龄逐出,要么按最后一次访问。如果一个线程签出一个锁,它会执行一次,之后就不会访问Cache。因此,Cache 可以通过足够长时间运行的任务驱逐“正在使用”的密钥。
      • @gstackoverflow 如果您使用weakValues() 而不是大小或到期时间,它将在CaffeineGuava 中工作。不过,这就是 Striped.lazyWeakLock(n) 的基础。
      • 这个解决方案在我们的应用中可以完美运行多年。当有足够长的运行任务并且超出容量时,缓存将驱逐正在使用的锁。在实际应用程序中,您始终控制生命线程的数量,例如在 Web 容器中,您会将处理线程的数量限制为(示例)100。因此您知道使用的锁永远不会超过 100 个。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-03-02
      • 1970-01-01
      • 2017-01-23
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多