【问题标题】:Avoid deadlock with Future { blocking {} }避免与 Future { 阻塞 {} } 发生死锁
【发布时间】:2021-09-14 23:26:15
【问题描述】:

我正在使用以下代码在 blocking 块内获取 JDBC 连接,并将该连接传递给 fn: Connection => Future[_]。 fn 完成后,我想提交/回滚事务并关闭连接。

  def withTransactionAsync[T](fn: Connection => Future[T]): Future[T] =
    Future {
      blocking {
        ds.getConnection
      }
    }.flatMap { conn =>
      fn(conn)
        .map { r => conn.commit(); conn.close(); r }
        .recoverWith {
          case e: Throwable =>
            conn.rollback()
            conn.close()
            throw e
        }
    }

我正在使用基于 ForkJoinPool 的单独执行上下文。

调用次数过多,这段代码就会陷入死锁。直觉上,这是有道理的。第一个未来,使用getConnection 调用,在等待可用连接时被阻塞,而可用连接正在等待 ExecutionContext 中的可用线程运行commit(); close() 块以释放连接并释放执行上下文中的线程getConnection 运行。我用线程转储验证了这种情况。

我发现解决此问题的唯一方法是在同一 Future {} 上运行所有内容,因此避免切换上下文:

  def withTransactionAsync[T](fn: Connection => Future[T]): Future[T] =
    Future {
      blocking {
        val conn = ds.getConnection

        try {
          conn.setAutoCommit(false)
          val r = Await.result(fn(conn), Duration.Inf)
          conn.commit()
          r
        } catch {
          case e: Throwable =>
            conn.rollback()
            throw e
        } finally
          conn.close()
      }
    }

但是这样我就屏蔽了Await.result。我想这不是一个大问题,因为我在 blocking 块内阻塞,但我担心这会产生无法预料的后果,并且不一定是这个 API 的调用者所期望的。

有没有办法在不使用Await 而只依赖Future 组合的情况下解决这个死锁?

我想可以证明这个函数不接受Connection => Future[T],而只接受Connection => T,但我想保留那个API。

如果我将ForkJoinPool 的大小增加到足够大,它会起作用,但是对于所有工作负载,该大小很难计算/预测,我不希望ForkJoinPool 的大小是我的数据库池大小的许多倍.

【问题讨论】:

  • 对你的阻塞和非阻塞代码使用不同的线程池。
  • 真的有必要每次都打开和关闭一个新连接吗?看起来效率很低。感觉Task模特更适合这里。
  • 我正在使用不同的线程池,它并不能阻止这个问题。我实际上并没有关闭连接,我的数据源是一个池(hikari cp)。 close 只是将连接返回到池中。
  • 其实说不通。 blocking 操作的全部意义在于 EC 为该调用生成了一个新线程,将现有线程留给非阻塞调用。所以这可能会创建大量线程,但它不应该死锁,因为应该总是有一个线程来执行非阻塞代码。当然,除非非阻塞代码实际上会阻塞,例如因为fn 阻塞或返回阻塞Futureconn. 操作块之一。
  • @Tim getConnection 在 maxConnections 等待池中的空闲连接后阻塞。 fn 也有阻塞代码,因为它正在访问数据库并且我的 jdbc 调用都是同步的,驱动程序不是异步的。

标签: scala concurrency future


【解决方案1】:

如 cmets 中所述,fn 是阻塞代码。但它不在blocking 子句中,因此它会占用池中的一个主线程。如果这种情况发生的次数足够多,线程池将耗尽线程,系统将死锁。

因此,对fn 的调用和随后的代码需要位于blocking 子句中,以便为其创建单独的线程,并且主线程仍可用于非阻塞代码。

考虑到阻塞代码的数量,可能值得研究一个Task 模型,每个连接一个线程而不是每个待处理操作一个线程,这样线程数就会受到限制。这基本上是解决getConnection 是同步的事实,这是 HikariCP 的问题。

【讨论】:

  • 事实上,由于 JDBC API,这都是阻塞代码,但不是 hikaricp。将fn 包装在blocking 块中不会改变这一点,因为它只会创建更多线程,但在某些时候EC 不会创建更多线程调用死锁。我收到了一个fn,我不能强迫这个api的调用者将他们的代码包装在blocking中。有效的方法是在不同的 EC 中运行 getConnectioncommit/close,然后它们不会死锁等待对方。我也不能使用Task 模型,fn: Connection => Future[_] API 被强制用于我的代码。
  • 我的建议是您将fn 包装在您的代码中的blocking 中;您无法阻止fn 在非阻塞上下文中调用阻塞代码。可以将Task 模型与该接口一起使用:当withTransactionAsync 被调用时,您创建一个Promise[T] 并从该promise 中返回Future[T],并将promise/fn 放在要执行的队列中。当有空闲连接时,使用该连接调用fn 并使用结果完成Promise。这消除了等待getConnection 的线程的需要,从而消除了导致死锁的一个可能原因。
猜你喜欢
  • 1970-01-01
  • 2017-12-17
  • 1970-01-01
  • 1970-01-01
  • 2014-12-29
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-10-25
相关资源
最近更新 更多