【问题标题】:User level Locking Approach With synchronizedMap & ReentrantLock使用 synchronizedMap 和 ReentrantLock 的用户级锁定方法
【发布时间】:2020-09-09 14:37:56
【问题描述】:

我想实现基于用户的锁定,如下代码所示。

如果在用户“1234”(java.lang.String.class 标识符)上发生了某个进程,或者某个进程正在使用用户,则其他进程应该等待该进程完成。听起来很简单,我尝试使用ReenterrentLock 使其工作,但我陷入了永久等待状态和死锁,虽然我正准备使用synchronised 块使其工作,但我想用ReenterrentLock 实现这一目标。

以下是我的代码和日志。

让我知道我做错了什么。

//Research.java file
  public static final int PERIOD = 10;
  public static final int END_INCLUSIVE = 23;
  public static final int N_THREADS = 8;

  public static void main(String[] args) {
    final ExecutorService executorService = Executors.newFixedThreadPool(N_THREADS);
    IntStream.rangeClosed(1, END_INCLUSIVE).forEach(value -> executorService.submit(() -> {
      final String user = value % 2 == 0 ? "1234" : "5678";
      process(value + "process", user);
    }));
    executorService.shutdown();
  }

  private static void process(String tag, String user) {
    System.out.println("waiting tag=" + tag + ",user=" + user + ",TH=" + Thread.currentThread().getName());
    AccountLock.getInstance().lock(user);
    System.out.println("in tag=" + tag + ",user=" + user + ",TH=" + Thread.currentThread().getName());
    sleep(tag, PERIOD);
    AccountLock.getInstance().unlock(user);
    System.out.println("out tag=" + tag + ",user=" + user + ",TH=" + Thread.currentThread().getName());
  }

  private static void sleep(String tag, long s) {
    boolean interrupt = false;
    try {
      TimeUnit.SECONDS.sleep(s);
    } catch (InterruptedException e) {
      interrupt = true;
      e.printStackTrace();
    } finally {
      if (interrupt) {
        Thread.currentThread().interrupt();
      }
    }
  }
/**
 * AccountLock
 */
final class AccountLock {
  private static final Map<String, Lock> LOCK_MAP = Collections.synchronizedMap(new HashMap<>());
  private static volatile AccountLock INSTANCE;

  private AccountLock() {
  }

  public static AccountLock getInstance() {
    if (INSTANCE == null) {
      synchronized (AccountLock.class) {
        if (INSTANCE == null) {
          INSTANCE = new AccountLock();
        }
      }
    }
    return INSTANCE;
  }

  public void lock(String user) {
    LOCK_MAP.computeIfPresent(user, (s, lock) -> {
      lock.lock();
      return lock;
    });
    LOCK_MAP.computeIfAbsent(user, s -> {
      final ReentrantLock lock = new ReentrantLock(true);
      lock.lock();
      return lock;
    });
  }

  public void unlock(String user) {
    LOCK_MAP.computeIfPresent(user, (s, lock) -> {
      lock.unlock();
      return null;
    });
  }
}
//logs
//waiting tag=2process,user=1234,TH=pool-1-thread-2
//waiting tag=3process,user=5678,TH=pool-1-thread-3
//waiting tag=1process,user=5678,TH=pool-1-thread-1
//waiting tag=4process,user=1234,TH=pool-1-thread-4
//waiting tag=5process,user=5678,TH=pool-1-thread-5
//in tag=3process,user=5678,TH=pool-1-thread-3
//in tag=4process,user=1234,TH=pool-1-thread-4
//in tag=5process,user=5678,TH=pool-1-thread-5
//waiting tag=6process,user=1234,TH=pool-1-thread-6
//waiting tag=7process,user=5678,TH=pool-1-thread-7
//waiting tag=8process,user=1234,TH=pool-1-thread-8

如果您需要相同的线程转储,请告诉我。

我在 mac 上使用 java8

【问题讨论】:

  • 看起来当你第一次尝试锁定一个用户时,你只是实例化了一个锁,你没有锁定它。 (第一次:空地图。computeIfPresent 因此什么都不做,computeIfAbsent 创建锁,但不接受它)。否则,所罗门可能是对的(尽管我猜sleep 是用于演示目的)。而且你也有一些无限的内存使用。
  • 关于这个模式的一些讨论:stackoverflow.com/questions/5639870/…
  • @GPI 谢谢!链接非常有帮助,一些答案也提出了同样的方法,你能帮我找出这里的死锁吗?我已经更新了一些代码

标签: java concurrency reentrantlock


【解决方案1】:

死锁是由地图和锁的交互产生的。

更准确地说:您使用Collections.syncrhonized 映射,它实际上对原始映射实例的每个(或左右)方法都设置了互斥。

这意味着,例如,只要有人在computeIfAbsent(或computeIfPresent)调用中,就不能在地图实例上调用其他方法。

到目前为止没问题。

除了,computeIfxx调用中,您通过获取锁实例来执行另一个锁定操作。

拥有两个不同的锁并且不保持严格的锁获取顺序是导致死锁的完美秘诀,您在此处进行了演示。

年表样本:

  1. 线程T1进入Map(获取锁M),创建锁L1并获取。 T1 包含 M 和 L1。
  2. 线程 T1 存在 map.computeXX,释放 M。T1 持有 L1(仅)。
  3. 线程 T2 进入地图(获取 M)。 T1 持有 L1,T2 持有 M。
  4. 线程 T2 尝试获取 L1(与 T1 相同),被 T1 阻塞。 T1 持有 L1,T2 持有 M 并等待 L1。你看到了吗?
  5. 线程 T1 已完成。它想释放L1,所以试图进入地图,这意味着试图获得M,它由T2持有。 T1 持有 L1,等待 M。T2 持有 M,等待 L1。游戏结束。

解决方案:不尝试创建锁并同时获取它是很诱人的,例如在锁定的方法中不获取锁。让我们尝试一下(只是为了说明一点,你会看到它有多难)。

如果您打破这种嵌入式锁模式,当您的线程已经拥有另一个锁时,您将永远不会进行锁定/解锁调用,从而防止任何死锁模式(单个重入锁不会发生死锁)。

这意味着:将您的 AccountLock 类从锁定实例更改为锁定工厂实例。或者,如果你想整合一些东西,那就换个方式吧。

lock(String user) {
    LOCKS.computeIfAbsent(user, u -> new ReentrantLock()).lock();
}

通过将锁获取移到地图的方法调用之外,我已将其从地图的互斥锁中删除,并消除了死锁的可能性。

修正后,我创建了另一个错误。

在解锁方面,您使用返回 null 的computeIfPresent。这意味着一旦完成锁定,您打算将其从地图中删除。这一切都很好,但它会在解锁情况下产生竞争条件。

  1. T1 为用户 1 创建锁 L1,将其放入地图并锁定。
  2. T2 想要相同的 L1,从地图中获取它,然后等待它的可用性
  3. T1 完成,释放 L1(T2 还不知道),并将其从地图中移除
  4. T3 进来并想为用户 1 锁定,但 L1 从地图上消失了,所以它创建 L2 并获取它。
  5. T2 看到 L1 是免费的,获取它并工作
  6. 此时,T2 和 T3 都可以同时访问用户 1,这不好
  7. T2 首先完成,转到地图实例并释放用户 1 的锁,即 L2,它一开始就从未获得过锁。 IllegalMonitorStateException。

没有简单的方法可以解决这个问题。您必须确保想要访问用户的并发线程具有一致的锁实例(这里的问题是 T1 释放锁而 T2 持有它,但尚未锁定它)。这在您的原始代码中不会发生 - 因为地图的互斥锁,但该互斥锁会造成死锁。

那该怎么办? 您永远无法从地图中移除键。

public void unlock(String user) {
    LOCK_MAP.computeIfPresent(user, (s, lock) -> {
        lock.unlock();
        return lock;
});

但是没有人从映射中释放键,它会无限增长 - 可以添加一个线程安全计数器来检查锁在释放之前是否“被获取”。这会反过来产生另一个错误吗?

话虽如此,这里还有几件事需要加强:

  • 锁释放应该在 finally 块中执行
  • 您创建的锁实例的数量是无限的。您可能希望避免这种情况(这是真正困难的部分)
  • 在地图级别使用互斥锁有点浪费。 ConcurrentHashMap 将提供更细粒度的锁定机制
  • 谁知道这里留下了什么错误。

结论

尝试看看this other question 中提供的任何解决方案是否可以帮助您实现目标。使用受人尊敬的库的实现可能更安全。

并发编程很难。别人已经为你做了。

【讨论】:

  • 非常感谢您的详细回答,我能够得到这篇工作帖子,准确地阅读您所说的答案。工作至今,但我已将代码迁移到番石榴的条纹锁
  • 使用条纹锁可能是一个正确的选择。严格来说,您获得的不是每个用户的锁,但您可以通过选择条带数来决定争用级别,这可能就足够了。下一级:如何在各个实例之间做分布式锁!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2020-07-16
  • 1970-01-01
  • 2016-07-24
  • 2023-03-23
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多