【问题标题】:Flow has failed with error Shutting down because of violation of the Reactive Streams specification由于违反 Reactive Streams 规范,流因错误关闭而失败
【发布时间】:2017-09-26 16:32:37
【问题描述】:

在使用 Akka Streams 时,我似乎永远无法正确处理错误。

这是我的代码

var db = Database.forConfig("oracle")
var mysqlDb = Database.forConfig("mysql_read")
var mysqlDbWrite = Database.forConfig("mysql_write")

implicit val actorSystem = ActorSystem()
val decider : Supervision.Decider = {
  case _: Exception =>
      println("got an exception restarting connections")
     // let us restart our connections
     db.close()
     mysqlDb.close()
     mysqlDbWrite.close()
     db = Database.forConfig("oracle")
     mysqlDb = Database.forConfig("mysql_read")
     mysqlDbWrite = Database.forConfig("mysql_write")
     Supervision.Restart
}
implicit val materializer = ActorMaterializer(ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider))

我有这样的流程

val alreadyExistsFilter : Flow[Foo, Foo, NotUsed] = Flow[Foo].mapAsync(10){ foo =>
  try {
     val existsQuery = sql"""SELECT id FROM foo WHERE id = ${foo.id}""".as[Long]
     mysqlDbWrite.run(existsQuery).map(v => (foo, v))
  } catch {
     case e: Throwable =>
        println(s"Lookup failed for ${foo}")
        throw e // will restart the stream
  }
}.collect {case (f, v) if v.isEmpty => f}

所以基本上如果 foo 已经存在于 MySQL 中,那么记录不应该被流进一步处理。

我对这段代码的希望是,如果 mysql 查找出现任何故障(mysql 机器非常糟糕并且超时很常见),记录将被打印并丢弃,并且流将继续使用监督提供的剩余记录.

当我运行这段代码时。我看到类似的错误

[error] (mysql_write network timeout executor) java.lang.RuntimeException: java.sql.SQLException: Invalid socket timeout value or state
java.lang.RuntimeException: java.sql.SQLException: Invalid socket timeout value or state
    at com.mysql.jdbc.ConnectionImpl$12.run(ConnectionImpl.java:5576)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.sql.SQLException: Invalid socket timeout value or state
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:998)
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:937)
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:926)
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:872)
    at com.mysql.jdbc.MysqlIO.setSocketTimeout(MysqlIO.java:4852)
    at com.mysql.jdbc.ConnectionImpl$12.run(ConnectionImpl.java:5574)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.SocketException: Socket is closed
    at java.net.Socket.setSoTimeout(Socket.java:1137)
    at com.mysql.jdbc.MysqlIO.setSocketTimeout(MysqlIO.java:4850)
    at com.mysql.jdbc.ConnectionImpl$12.run(ConnectionImpl.java:5574)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

[error] (mysql_write network timeout executor) java.lang.NullPointerException
java.lang.NullPointerException
    at com.mysql.jdbc.MysqlIO.setSocketTimeout(MysqlIO.java:4850)
    at com.mysql.jdbc.ConnectionImpl$12.run(ConnectionImpl.java:5574)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

这里让我感到惊讶的是,这些异常并非来自我的 catch 块。因为我没有看到我的 catch 块的 println 语句。堆栈跟踪没有向我显示它的来源......但是因为它说mysql_write 我可以假设它是上面的流,因为只有这个流使用mysql_write

最后整个流因错误而崩溃

[trace] Stack trace suppressed: run last compile:runMain for the full output.
flow has failed with error Shutting down because of violation of the Reactive Streams specification.
14:51:06,973 |-INFO in ch.qos.logback.classic.AsyncAppender[asyncKafkaAppender] - Worker thread will flush remaining events before exiting.
[success] Total time: 3480 s, completed Sep 26, 2017 2:51:07 PM
14:51:07,603 |-INFO in ch.qos.logback.core.hook.DelayingShutdownHook@2320545b - Sleeping for 1 seconds

我不知道我做了什么违反了反应流规范!!

【问题讨论】:

    标签: akka-stream


    【解决方案1】:

    获得更可预测的解决方案的第一步是删除阻塞行为 (Await.result) 并使用 mapAsyncalreadyExistsFilter 流程的重写可能是:

      val alreadyExistsFilter : Flow[Foo, Foo, NotUsed] = Flow[Foo].mapAsync(3) { foo ⇒
        val existsQuery = sql"""SELECT id FROM foo WHERE id = ${foo.id}""".as[Long]
        foo → Await.result(mysqlDbWrite.run(existsQuery), Duration.Inf)
      }.collect{
        case (foo, res) if res.isDefined ⇒ foo
      }
    

    更多关于 Akka 阻塞的信息可以在docs找到。

    【讨论】:

    • OK 问题又来了。我正在更新我上面的帖子。
    【解决方案2】:

    Stefano 给出的答案是正确的。由于流程中的代码阻塞,确实出现了错误。

    尽管我的初始程序是针对 scala 2.11 运行的,即使在切换到 mapAsync 之后,问题仍然存在。

    由于这是一个命令行工具,我很容易切换到 scala 2.12 并重试。

    当我尝试使用 Scala 2.12 时,它运行良好。

    对我有很大帮助的一件事是在依赖项中有"ch.qos.logback" % "logback-classic" % "1.2.3",。这将向您显示正在执行的每条 SQL 语句,并轻松查看是否出现问题。

    【讨论】:

      猜你喜欢
      • 2022-07-22
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2023-03-31
      • 1970-01-01
      • 1970-01-01
      • 2020-07-08
      • 2018-05-20
      相关资源
      最近更新 更多