【发布时间】:2021-09-14 23:26:15
【问题描述】:
我正在使用以下代码在 blocking 块内获取 JDBC 连接,并将该连接传递给 fn: Connection => Future[_]。 fn 完成后,我想提交/回滚事务并关闭连接。
def withTransactionAsync[T](fn: Connection => Future[T]): Future[T] =
Future {
blocking {
ds.getConnection
}
}.flatMap { conn =>
fn(conn)
.map { r => conn.commit(); conn.close(); r }
.recoverWith {
case e: Throwable =>
conn.rollback()
conn.close()
throw e
}
}
我正在使用基于 ForkJoinPool 的单独执行上下文。
调用次数过多,这段代码就会陷入死锁。直觉上,这是有道理的。第一个未来,使用getConnection 调用,在等待可用连接时被阻塞,而可用连接正在等待 ExecutionContext 中的可用线程运行commit(); close() 块以释放连接并释放执行上下文中的线程getConnection 运行。我用线程转储验证了这种情况。
我发现解决此问题的唯一方法是在同一 Future {} 上运行所有内容,因此避免切换上下文:
def withTransactionAsync[T](fn: Connection => Future[T]): Future[T] =
Future {
blocking {
val conn = ds.getConnection
try {
conn.setAutoCommit(false)
val r = Await.result(fn(conn), Duration.Inf)
conn.commit()
r
} catch {
case e: Throwable =>
conn.rollback()
throw e
} finally
conn.close()
}
}
但是这样我就屏蔽了Await.result。我想这不是一个大问题,因为我在 blocking 块内阻塞,但我担心这会产生无法预料的后果,并且不一定是这个 API 的调用者所期望的。
有没有办法在不使用Await 而只依赖Future 组合的情况下解决这个死锁?
我想可以证明这个函数不接受Connection => Future[T],而只接受Connection => T,但我想保留那个API。
如果我将ForkJoinPool 的大小增加到足够大,它会起作用,但是对于所有工作负载,该大小很难计算/预测,我不希望ForkJoinPool 的大小是我的数据库池大小的许多倍.
【问题讨论】:
-
对你的阻塞和非阻塞代码使用不同的线程池。
-
真的有必要每次都打开和关闭一个新连接吗?看起来效率很低。感觉
Task模特更适合这里。 -
我正在使用不同的线程池,它并不能阻止这个问题。我实际上并没有关闭连接,我的数据源是一个池(hikari cp)。
close只是将连接返回到池中。 -
其实说不通。
blocking操作的全部意义在于 EC 为该调用生成了一个新线程,将现有线程留给非阻塞调用。所以这可能会创建大量线程,但它不应该死锁,因为应该总是有一个线程来执行非阻塞代码。当然,除非非阻塞代码实际上会阻塞,例如因为fn阻塞或返回阻塞Future或conn.操作块之一。 -
@Tim getConnection 在 maxConnections 等待池中的空闲连接后阻塞。
fn也有阻塞代码,因为它正在访问数据库并且我的 jdbc 调用都是同步的,驱动程序不是异步的。
标签: scala concurrency future