【发布时间】:2023-04-04 21:12:01
【问题描述】:
我目前正在使用 Apache Curator 将共享资源(数据库中的一行)的锁定外部化。 总结一下这个问题, 我正在运行一个服务的 2 个实例(使用 Spring Boot),让我们调用这个服务 A,然后调用部署在不同区域的实例 A1 和 A2。 我锁定了代表文件的共享数据库上的表的 id(主键)。
在服务 A 的代码中,我创建了一个单例 (BaseLockService) 来处理项目中的所有锁定。这也意味着对于 2 个正在运行的实例,它们每个都包含一个用于处理锁定的单例。我正在使用的配方是Shared Reentrant Lock,它使用的是 InterProcessMutex 类,但是从来没有一个可重入锁的情况。它的描述最接近我的需求的类。
运行的主进程是一个@Scheduled,执行时间之间有30秒的延迟。 此外,我为 ThreadPoolTaskScheduler 创建了一个 bean,它将 UUID 附加到线程名称并且池大小为 1。 这个 UUID 的原因是因为没有它,当 A1 和 A2 同时运行时,它们都包含一个名为“task-scheduler-1”的线程。这最初导致了我的问题 使用锁定,因为 A1 可能拥有锁定,然后在处理文件的同时,A2 请求锁定,并且由于它们共享相同的名称,Curator 在 lock.acquire() 上返回 true,因此两个实例拥有相同的锁定。
运行一个实例时,这不是问题。我在 ZooKeeper 中看到正在创建 ZNode,并且我看到了 Curator 为临时锁生成的 UUID。 当运行两个或更多实例时,进程有时会进入 A1 拥有锁的竞争状态,然后运行一个冗长的进程。然后 A2 以某种方式获得了锁,快速完成该过程并释放锁。然后当 A1 完成并尝试解锁时,我得到以下异常:
[2019-07-09 21:53:54,485] ERROR [08c598b9-7254-408c-8ed2-0e5849ca2b19_task-scheduler-1] c.m.c.myApp.lock.BaseLockService.unlock - Can't unlock lock #com.myApp.lock.BaseLockService$LockableHandle@4ca8ddab
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /myapp/lock/files/1376112
at org.apache.zookeeper.KeeperException.create(KeeperException.java:111)
at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
at org.apache.zookeeper.ZooKeeper.delete(ZooKeeper.java:873)
at org.apache.curator.framework.imps.DeleteBuilderImpl$5.call(DeleteBuilderImpl.java:274)
at org.apache.curator.framework.imps.DeleteBuilderImpl$5.call(DeleteBuilderImpl.java:268)
at org.apache.curator.connection.StandardConnectionHandlingPolicy.callWithRetry(StandardConnectionHandlingPolicy.java:64)
at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:100)
at org.apache.curator.framework.imps.DeleteBuilderImpl.pathInForeground(DeleteBuilderImpl.java:265)
at org.apache.curator.framework.imps.DeleteBuilderImpl.forPath(DeleteBuilderImpl.java:249)
at org.apache.curator.framework.imps.DeleteBuilderImpl.forPath(DeleteBuilderImpl.java:34)
at com.myApp.lock.BaseLockService.unlock(BaseLockService.java:174)
at com.myApp.lock.BaseLockService.lambda$unlockAllIDs$0(BaseLockService.java:143)
at java.util.ArrayList.forEach(ArrayList.java:1257)
at com.myApp.lock.BaseLockService.unlockAllIDs(BaseLockService.java:139)
这是我复制情况的单元测试:
@Test
public void baseLockTest() {
List<Lockable> filesToProcess = new ArrayList<>();
//For now only 1 to limit complexity
Lockable fileToLock = FileSource.builder()
.id(1)
.build();
filesToProcess.add(fileToLock);
Runnable task = () -> {
log.info("ATTEMPT LOCK");
Set<BaseLockService.LockableHandle> lockedBatch = lockService.lockBatch(filesToProcess, 1);
if (!lockedBatch.isEmpty()) {
try {
log.info("ATTEMPT FAKE PROCESS TIME SLEEP 100 MS");
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("ATTEMPT UNLOCK");
lockService.unlockAll(lockedBatch);
}
};
System.out.println("**********************************************************");
//Simulate two Service instances of 1 thread
int totalThreads = 2;
ExecutorService executorService = Executors.newFixedThreadPool(totalThreads);
List<Future> locksProcessed = new ArrayList<>(totalThreads);
for (int i = 0; i < 1000; i++) {
locksProcessed.add(executorService.submit(task));
}
Future f;
while(!locksProcessed.isEmpty()){
Iterator<Future> iterator = locksProcessed.iterator();
while(iterator.hasNext()){
f = iterator.next();
if(f.isDone()){
iterator.remove();
}
}
}
System.out.println("ALL DONE!!!");
}
这是 BaseLockService 中的锁定和解锁方法:
public Set<LockableHandle> lockBatch(final List<Lockable> desiredLock, final int batchSize) {
Set<LockableHandle> effectivelyLocked = new HashSet<>();
Iterator<Lockable> desiredLockIterator = desiredLock.iterator();
while ((desiredLockIterator.hasNext()) && (effectivelyLocked.size() <= batchSize)) {
Lockable toLock = desiredLockIterator.next();
String lockPath = ZKPaths.makePath(getLockPath(), String.valueOf(toLock.getId()));
InterProcessMutex lock = createMutex(lockPath);
try {
if (lock.acquire(0, TimeUnit.SECONDS)) {
LockableHandle handle = new LockableHandle(toLock, lock);
effectivelyLocked.add(handle);
locks.put(handle.getId(), handle);
} else {
log.warn(String.format("Object was not locked. Object id is %d, lock path is %s.",
toLock.getId(),
lockPath));
}
} catch (Exception e) {
log.error("Cannot lock path " + lockPath, e);
}
}
log.info(String.format("%d object(s) were requested to lock. %d were effectively locked.",
desiredLock.size(),
effectivelyLocked.size()));
return effectivelyLocked;
}
public void unlock(final LockableHandle lockHandle) {
boolean success = false;
try {
InterProcessMutex lock = lockHandle.getMutex();
if (lock != null) {
lock.release();
client.delete()
.deletingChildrenIfNeeded()
.forPath(ZKPaths.makePath(getLockPath(), String.valueOf(lockHandle.getId())));
success = true;
}
} catch (Exception e) {
log.error("Can't unlock lock #" + lockHandle, e);
} finally {
locks.remove(lockHandle.getId());
}
log.info(String.format("The lock #%d was requested to be unlocked. Success = %b",
lockHandle.getId(),
success));
}
这是服务实例化后调用的init()方法:
public void init() {
log.info("Stating initialization of the Lock Service");
locks = new HashMap<>();
client = createClient();
client.start();
try {
client.blockUntilConnected();
if (client.isZk34CompatibilityMode()) {
log.info("The Curator Framework is running in ZooKeeper 3.4 compatibility mode.");
}
} catch (InterruptedException ie) {
log.error("Cannot connect to ZooKeeper.", ie);
}
log.info("Completed initialization of the Lock Service");
}
- 我检查了连接问题,这不是问题。
- 在日志中找不到 RECONNECTED、LOST、SUSPENDED 消息。
- 锁超时不是问题,因为 ZooKeeper 不会使任何锁过期,除非会话/连接终止。
- 我尝试了其他 Curator 食谱,但它们不能满足我的需求。无论如何,它们也会抛出类似的异常。
- Apache Curator 的版本是 4.2.0,ZooKeeper 是 3.4.X
我不确定缺少什么,但没有任何选择。 感谢任何 cmets/建议
【问题讨论】:
-
- 你能发布 BaseLockService 的代码吗? - 您使用的会话超时时间是多少?
-
@Randgalt 我在原始问题中添加了附加代码。至于会话超时,我没有明确设置。有一种方法可以查找过时的锁(超过 10 分钟)并释放这些锁,但是锁定和解锁之间的时间通常只有几秒钟。我在 Curator 文档的错误处理部分看到了有关 ConnectionStateListener 的消息,但不知道如何实现。还认为在我的情况下可能没有必要,因为我没有看到任何有关 LOST、SUSPENDED 等状态的消息。
-
@Randgalt 在调试时,我检查了 CuratorZookeeperClient.state 对象和 sessionTimeoutMs = 60,000。 ConnectionTimeoutMs = 15,000。此外 CuratorZookeeperClient.state.zooKeeper 对象也有 sessionTimeout = 60,000。
-
@Randgalt 但是,如果我运行这个命令:(
((CuratorFrameworkImpl) lockService.client).getZooKeeper().getSessionTimeout()) 我得到 40,000,所以我不完全确定哪个是正确的值。对轰炸感到抱歉。 -
仍然缺少很多可能出错的代码。但是,也许锁定路径不一样?请注意,Curator 有“InterProcessMultiLock”,可能会对您有所帮助。但是,如果没有看到更多的代码,我就帮不上什么忙了。即“getLockPath()”的实现是什么?我们确定每个进程都生成相同的路径吗?
标签: java concurrency locking apache-zookeeper apache-curator