【问题标题】:Apache Curator double Locking issue with multiple services多个服务的 Apache Curator 双重锁定问题
【发布时间】:2023-04-04 21:12:01
【问题描述】:

我目前正在使用 Apache Curator 将共享资源(数据库中的一行)的锁定外部化。 总结一下这个问题, 我正在运行一个服务的 2 个实例(使用 Spring Boot),让我们调用这个服务 A,然后调用部署在不同区域的实例 A1 和 A2。 我锁定了代表文件的共享数据库上的表的 id(主键)。

在服务 A 的代码中,我创建了一个单例 (BaseLockService) 来处理项目中的所有锁定。这也意味着对于 2 个正在运行的实例,它们每个都包含一个用于处理锁定的单例。我正在使用的配方是Shared Reentrant Lock,它使用的是 InterProcessMutex 类,但是从来没有一个可重入锁的情况。它的描述最接近我的需求的类。

运行的主进程是一个@Scheduled,执行时间之间有30秒的延迟。 此外,我为 ThreadPoolTask​​Scheduler 创建了一个 bean,它将 UUID 附加到线程名称并且池大小为 1。 这个 UUID 的原因是因为没有它,当 A1 和 A2 同时运行时,它们都包含一个名为“task-scheduler-1”的线程。这最初导致了我的问题 使用锁定,因为 A1 可能拥有锁定,然后在处理文件的同时,A2 请求锁定,并且由于它们共享相同的名称,Curator 在 lock.acquire() 上返回 true,因此两个实例拥有相同的锁定。

运行一个实例时,这不是问题。我在 ZooKeeper 中看到正在创建 ZNode,并且我看到了 Curator 为临时锁生成的 UUID。 当运行两个或更多实例时,进程有时会进入 A1 拥有锁的竞争状态,然后运行一个冗长的进程。然后 A2 以某种方式获得了锁,快速完成该过程并释放锁。然后当 A1 完成并尝试解锁时,我得到以下异常:

[2019-07-09 21:53:54,485] ERROR [08c598b9-7254-408c-8ed2-0e5849ca2b19_task-scheduler-1] c.m.c.myApp.lock.BaseLockService.unlock - Can't unlock lock #com.myApp.lock.BaseLockService$LockableHandle@4ca8ddab
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /myapp/lock/files/1376112
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:111)
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
    at org.apache.zookeeper.ZooKeeper.delete(ZooKeeper.java:873)
    at org.apache.curator.framework.imps.DeleteBuilderImpl$5.call(DeleteBuilderImpl.java:274)
    at org.apache.curator.framework.imps.DeleteBuilderImpl$5.call(DeleteBuilderImpl.java:268)
    at org.apache.curator.connection.StandardConnectionHandlingPolicy.callWithRetry(StandardConnectionHandlingPolicy.java:64)
    at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:100)
    at org.apache.curator.framework.imps.DeleteBuilderImpl.pathInForeground(DeleteBuilderImpl.java:265)
    at org.apache.curator.framework.imps.DeleteBuilderImpl.forPath(DeleteBuilderImpl.java:249)
    at org.apache.curator.framework.imps.DeleteBuilderImpl.forPath(DeleteBuilderImpl.java:34)
    at com.myApp.lock.BaseLockService.unlock(BaseLockService.java:174)
    at com.myApp.lock.BaseLockService.lambda$unlockAllIDs$0(BaseLockService.java:143)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at com.myApp.lock.BaseLockService.unlockAllIDs(BaseLockService.java:139)

这是我复制情况的单元测试:

@Test
public void baseLockTest() {
    List<Lockable> filesToProcess = new ArrayList<>();

    //For now only 1 to limit complexity
    Lockable fileToLock = FileSource.builder()
            .id(1)
            .build();

    filesToProcess.add(fileToLock);

    Runnable task = () -> {
        log.info("ATTEMPT LOCK");
        Set<BaseLockService.LockableHandle> lockedBatch = lockService.lockBatch(filesToProcess, 1);

        if (!lockedBatch.isEmpty()) {

            try {
                log.info("ATTEMPT FAKE PROCESS TIME SLEEP 100 MS");
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            log.info("ATTEMPT UNLOCK");
                lockService.unlockAll(lockedBatch);
        }
    };

    System.out.println("**********************************************************");

    //Simulate two Service instances of 1 thread
    int totalThreads = 2;
    ExecutorService executorService = Executors.newFixedThreadPool(totalThreads);

    List<Future> locksProcessed = new ArrayList<>(totalThreads);
    for (int i = 0; i < 1000; i++) {
        locksProcessed.add(executorService.submit(task));
    }

    Future f;
    while(!locksProcessed.isEmpty()){
        Iterator<Future> iterator = locksProcessed.iterator();
        while(iterator.hasNext()){
            f = iterator.next();
            if(f.isDone()){
                iterator.remove();
            }
        }

    }

    System.out.println("ALL DONE!!!");
}

这是 BaseLockService 中的锁定和解锁方法:

    public Set<LockableHandle> lockBatch(final List<Lockable> desiredLock, final int batchSize) {
    Set<LockableHandle> effectivelyLocked = new HashSet<>();
    Iterator<Lockable> desiredLockIterator = desiredLock.iterator();

    while ((desiredLockIterator.hasNext()) && (effectivelyLocked.size() <= batchSize)) {
        Lockable toLock = desiredLockIterator.next();
        String lockPath = ZKPaths.makePath(getLockPath(), String.valueOf(toLock.getId()));
        InterProcessMutex lock = createMutex(lockPath);

        try {
            if (lock.acquire(0, TimeUnit.SECONDS)) {
                LockableHandle handle = new LockableHandle(toLock, lock);
                effectivelyLocked.add(handle);
                locks.put(handle.getId(), handle);
            } else {
                log.warn(String.format("Object was not locked. Object id is %d, lock path is %s.",
                        toLock.getId(),
                        lockPath));
            }
        } catch (Exception e) {
            log.error("Cannot lock path " + lockPath, e);
        }
    }

    log.info(String.format("%d object(s) were requested to lock. %d were effectively locked.",
            desiredLock.size(),
            effectivelyLocked.size()));

    return effectivelyLocked;
}

    public void unlock(final LockableHandle lockHandle) {
    boolean success = false;

    try {
        InterProcessMutex lock = lockHandle.getMutex();
        if (lock != null) {
            lock.release();
            client.delete()
                    .deletingChildrenIfNeeded()
                    .forPath(ZKPaths.makePath(getLockPath(), String.valueOf(lockHandle.getId())));
            success = true;
        }
    } catch (Exception e) {
        log.error("Can't unlock lock #" + lockHandle, e);
    } finally {
        locks.remove(lockHandle.getId());
    }

    log.info(String.format("The lock #%d was requested to be unlocked. Success = %b",
            lockHandle.getId(),
            success));
}

这是服务实例化后调用的init()方法:

    public void init() {
    log.info("Stating initialization of the Lock Service");
    locks = new HashMap<>();
    client = createClient();
    client.start();

    try {
        client.blockUntilConnected();
        if (client.isZk34CompatibilityMode()) {
            log.info("The Curator Framework is running in ZooKeeper 3.4 compatibility mode.");
        }
    } catch (InterruptedException ie) {
        log.error("Cannot connect to ZooKeeper.", ie);
    }

    log.info("Completed initialization of the Lock Service");
}
  • 我检查了连接问题,这不是问题。
  • 在日志中找不到 RECONNECTED、LOST、SUSPENDED 消息。
  • 锁超时不是问题,因为 ZooKeeper 不会使任何锁过期,除非会话/连接终止。
  • 我尝试了其他 Curator 食谱,但它们不能满足我的需求。无论如何,它们也会抛出类似的异常。
  • Apache Curator 的版本是 4.2.0,ZooKeeper 是 3.4.X

我不确定缺少什么,但没有任何选择。 感谢任何 cmets/建议

【问题讨论】:

  • - 你能发布 BaseLockService 的代码吗? - 您使用的会话超时时间是多少?
  • @Randgalt 我在原始问题中添加了附加代码。至于会话超时,我没有明确设置。有一种方法可以查找过时的锁(超过 10 分钟)并释放这些锁,但是锁定和解锁之间的时间通常只有几秒钟。我在 Curator 文档的错误处理部分看到了有关 ConnectionStateListener 的消息,但不知道如何实现。还认为在我的情况下可能没有必要,因为我没有看到任何有关 LOST、SUSPENDED 等状态的消息。
  • @Randgalt 在调试时,我检查了 CuratorZookeeperClient.state 对象和 sessionTimeoutMs = 60,000。 ConnectionTimeoutMs = 15,000。此外 CuratorZookeeperClient.state.zooKeeper 对象也有 sessionTimeout = 60,000。
  • @Randgalt 但是,如果我运行这个命令:(((CuratorFrameworkImpl) lockService.client).getZooKeeper().getSessionTimeout()) 我得到 40,000,所以我不完全确定哪个是正确的值。对轰炸感到抱歉。
  • 仍然缺少很多可能出错的代码。但是,也许锁定路径不一样?请注意,Curator 有“InterProcessMultiLock”,可能会对您有所帮助。但是,如果没有看到更多的代码,我就帮不上什么忙了。即“getLockPath()”的实现是什么?我们确定每个进程都生成相同的路径吗?

标签: java concurrency locking apache-zookeeper apache-curator


【解决方案1】:

我在您发送的Locking Issue Example 中发现了一些问题。可能是这些特定于示例,但如果这些也在您的代码中,它将解释您所看到的问题。

  1. Maven POM 指定不正确。 Curator 需要知道它处于 ZK 3.4.x 兼容模式 - 这样做的方法是 described here。 TL;DR 是从 Curator 依赖项中排除 Zookeeper,并将直接依赖项添加到 Zookeeper 3.4.x。
  2. BaseLockService 中的locks 字段应该是ConcurrentHashMap
  3. BaseLockService#unlock 正在尝试通过调用 client.delete()... 来清理锁定路径。这行不通。这种代码存在固有的竞争,这就是为什么 Curator 拥有“Reaper”类的原因,也是我将容器节点推入 Zookeeper 3.5.x 的原因。请注意,产生NoNode 异常的是这行代码,而不是Curator 锁定代码。我建议您删除该代码,不要担心它或迁移到 Zookeeper 3.5.x。
  4. 我不认为BaseLockService 应该继续重新创建InterProcessMutex。它应该保留他们的地图或其他东西。

当我应用上面的1-3时测试成功(我尝试了多次)。我已经打开了 PR on your test project 进行了 3 处更改。

【讨论】:

  • 感谢您竭尽全力帮助解决这个问题!非常感激!至于第 4 点,我不明白保留收藏(地图)的原因。会是 Map,其中 String 是 ZK 路径并重用 InterProcessMutex?我担心的是,当运行我的服务的两个实例时,我会有 2 个系统争夺锁并同时管理它。这是一种粗略的方式,但我并排运行了两个测试实例,并检查了日志中的时间戳,发现它们共享锁。任何时候都没有锁重叠,所以我希望这足以信任它
  • > 2 个系统争夺锁 是的,也许 - 我必须查看用例才能进一步评论。但是,如果新的分配工作正常,那可能没问题。
  • 我将在一个并行环境中通宵测试它,看看它是如何工作的。本质上,该服务会查找所有已配置的文件源和包含新文件的视图。它一次处理 1 个源。找到新文件后,我们将其添加到要下载的文件列表中,并将其发送到单独的组件以下载文件。由于有两个实例都在寻找新文件,我想锁定当前正在搜索的源,这样它们就不会处理同一个源,从而防止要下载的文件列表中出现重复条目​​。
猜你喜欢
  • 2014-05-02
  • 1970-01-01
  • 1970-01-01
  • 2011-05-02
  • 2018-09-09
  • 2019-04-04
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多