【问题标题】:How can I block ConcurrentHashMap get() operations during a put()如何在 put() 期间阻止 ConcurrentHashMap get() 操作
【发布时间】:2021-10-28 06:32:43
【问题描述】:
ConcurrentHashMap<String, Config> configStore = new ConcurrentHashMap<>();
...
void updateStore() {
  Config newConfig = generateNewConfig();
  Config oldConfig = configStore.get(configName);
  if (newConfig.replaces(oldConfig)) {
   configStore.put(configName, newConfig);
  }
}

ConcurrentHashMap 可以被多个线程读取,但只能由单个线程更新。当put() 操作正在进行时,我想阻止get() 操作。这里的基本原理是,如果put() 操作正在进行,这意味着映射中的当前条目是陈旧的,并且所有get() 操作都应该阻塞,直到put() 完成。在不同步整个地图的情况下,如何在 Java 中实现这一点?

【问题讨论】:

  • 您可以使用 java.util.concurrent.locks.ReadWriteLock。但是想想 Map put 操作有多快。你认为这会有什么不同吗?您要准确地避免什么情况?
  • 您可能应该使用compute 或其他可以为您解决此问题的 CHM 操作之一
  • “ConcurrentHashMap 可以由多个线程读取,但只能由单个线程更新”您的声明来源在哪里?或者你的意思是 same 条目?
  • "这里的基本原理是,如果 put() 操作正在进行,这意味着 map 中的当前条目是陈旧的,并且所有 get() 操作都应该阻塞,直到 put()是完整的。”这个理由是完全错误的。如果它是正确的,它同样适用于 put() 操作正在进行之前。毕竟,如果某个线程即将调用 put(),那么信息就是陈旧的。因此,如果该论点是合乎逻辑的,则意味着该操作甚至在您调用它之前就应该阻塞,但这显然是不可能的。所以这个理由是不合理的。

标签: java multithreading concurrency hashmap synchronization


【解决方案1】:

看来您可以将此推迟到 compute,它会为您处理好:

Config newConfig = generateNewConfig();
configStore.compute(
    newConfig,
    (oldConfig, value) -> {
       if (newConfig.replaces(oldConfig)) {
            return key;
       }
       return oldConfig;
    }
);

使用此方法可以得到两个保证:

在计算过程中,其他线程对该地图的一些尝试更新操作可能会被阻塞,因此计算应该简短而简单

整个方法调用都是原子执行的

根据其文档。

【讨论】:

    【解决方案2】:

    这根本行不通。想一想:当代码意识到信息已过时,经过一段时间后,.put 调用就完成了。即使.put 调用以某种方式阻塞,时间线如下:

    • Cosmos 中发生了一些事件,导致您的配置过时。
    • 一段时间过去了。 [一]
    • 您运行一些代码后发现情况确实如此。
    • 一段时间过去了。 [乙]
    • 您的代码开始调用.put
    • 过去的时间非常短。 [C]
    • 您的代码完成了.put 调用。

    您所要求的是一种策略,即消除[C],同时完全不采取任何措施来防止读取[A][B] 处的陈旧数据,这两者似乎相当问题更大。

    随便,给我答案

    ConcurrentHashMap 如果你想要这个就错了,它是为多个并发(因此得名)访问而设计的。你想要的是一个普通的旧HashMap,其中每个对其的访问都要经过一个锁。或者,您可以扭转逻辑:做您想做的事情的唯一方法是为所有内容(读取和写入)锁定;此时ConcurrentHashMap 的“并发”部分变得毫无意义:

    private final Object lock = new Object[0];
    
    public void updateConfig() {
        synchronized (lock) {
           // do the stuff
        }
    }
    
    public Config getConfig(String key) {
        synchronized (lock) {
            return configStore.get(key);
        }
    }
    

    注意:使用私有锁;公共锁就像公共领域。如果有一个对象,您无法控制的代码可以获得引用,并且您锁定它,您需要描述您的代码关于该锁定的行为,然后注册以永久保持该行为,或指示很明显,当您更改行为时,您的 API 刚刚经历了重大更改,因此您也应该增加主要版本号。

    出于同样的原因,鉴于您想要 API 控制这一事实,公共字段几乎总是一个坏主意,您希望除了您直接控制的代码之外的任何东西都无法访问您锁定的 refs。因此,为什么上面的代码没有在方法本身上使用synchronized 关键字(因为this 通常是一个到处泄漏的引用)。

    好吧,也许我想要不同的答案

    答案要么是“没关系”,要么是“使用锁”。如果[C] 真的是你所关心的,那时间太短了,与[A][B] 的时间相比相形见绌,如果A/B 是可以接受的,那么C 肯定也是如此。在这种情况下:接受现状吧。

    或者,您可以使用锁,但甚至在数据变得陈旧之前进行锁定。此时间线保证不会发生过时的数据读取:

    • 宇宙永远不会让您的数据过时。
    • 您的代码本身是过期日期的唯一原因。
    • 无论何时运行的代码都会或可能最终导致数据过时:
    • 在开始之前获取锁。
    • 做一些(可能)使某些配置过时的事情。
    • 继续抓住锁;修复配置。
    • 释放锁。

    【讨论】:

    • 我意识到对我来说答案是“没关系”。感谢您澄清这一点。
    【解决方案3】:

    The accepted answer 建议使用compute(...) 而不是put()

    如果你愿意

    在 put() 操作正在进行时阻止 get() 操作

    那么你也应该使用compute(...) 而不是get()

    这是因为ConcurrentHashMap get()compute() 正在进行时不会阻塞。


    这里有一个单元测试来证明它:

      @Test
      public void myTest() throws Exception {
        var map = new ConcurrentHashMap<>(Map.of("key", "v1"));
        var insideComputeLatch = new CountDownLatch(1);
    
        var threadGet = new Thread(() -> {
          try {
            insideComputeLatch.await();
            System.out.println("threadGet: before get()");
            var v = map.get("key");
            System.out.println("threadGet: after get() (v='" + v + "')");
          } catch (InterruptedException e) {
            throw new Error(e);
          }
        });
    
        var threadCompute = new Thread(() -> {
          System.out.println("threadCompute: before compute()");
          map.compute("key", (k, v) -> {
            try {
              System.out.println("threadCompute: inside compute(): start");
              insideComputeLatch.countDown();
              threadGet.join();
              System.out.println("threadCompute: inside compute(): end");
              return "v2";
            } catch (InterruptedException e) {
              throw new Error(e);
            }
          });
          System.out.println("threadCompute: after compute()");
        });
    
        threadGet.start();
        threadCompute.start();
    
        threadGet.join();
        threadCompute.join();
      }
    

    输出:

    threadCompute: before compute()
    threadCompute: inside compute(): start
    threadGet: before get()
    threadGet: after get() (v='v1')
    threadCompute: inside compute(): end
    threadCompute: after compute()
    

    【讨论】:

      【解决方案4】:

      如何在不同步整个地图的情况下在 Java 中实现这一点?

      这里有一些很好的答案,但使用ConcurrentMap.replace(key, oldValue, newValue) method which is atomic 有一个更简单的答案。

      while (true) {
          Config newConfig = generateNewConfig();
          Config oldConfig = configStore.get(configName);
          if (!newConfig.replaces(oldConfig)) {
              // nothing to do
              break;
          }
          // this is atomic and will only replace the config if the old hasn't changed
          if (configStore.replace(configName, oldConfig, newConfig)) {
              // if we replaced it then we are done
              break;
          }
          // otherwise, loop around and create a new config
      }
      

      【讨论】:

        猜你喜欢
        • 2010-09-06
        • 1970-01-01
        • 1970-01-01
        • 2021-11-19
        • 2020-08-23
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多