【问题标题】:akka stream reading endless http stream backpressured when readingakka 流读取无穷无尽的 http 流读取时背压
【发布时间】:2017-10-21 21:56:12
【问题描述】:

基本上这是我使用的代码。

当我与 curl 建立连接时,我看到 curl 命令中的所有实体都非常快。当我尝试用 akka 模拟相同的行为时,在打印出我得到的元素之间会有很大的停顿。

在前 4 条消息之后,下面的流不知何故得到了压力 其余 1 条消息在打印行的显着时间之后出现。

前 4 条消息大约是 2k JSON,最后一条没有。 5 是 80k JSON。

最后一个实体(编号 5)也是最大的块,我得到的印象是它是在流完成之前打印的。而且我很肯定它在运行 2-3 秒后就可以使用。

知道为什么这个流在读取前 4 个元素后就挂起

val awesomeHttpReq = Http().singleRequest(
  HttpRequest(
    method = GET,
    uri = Uri("http://some-service-providing-endless-http.stream")
  )
)

val a = Source.fromFuture(awesomeHttpReq).flatMapConcat {
  case HttpResponse(status, _, entity, _) =>
    // I saw some comments the back pressure might kick in
    // because I might not be consuming the bytes here properly
    // but this is totally in line with all the examples etc.

    entity.withoutSizeLimit.getDataBytes.via(Framing delimiter (ByteString("\n"), Int.MaxValue))
} map { bytes =>
  parse(bytes decodeString StandardCharsets.UTF_8).fold(pf => throw new IllegalStateException(s"unable to parse: $pf"), identity[Json])
} mapConcat { items =>
  // every line that comes in from previous stage contains
  // key elements - this I'm interested in, it's an array
  items.asObject flatMap (_.toMap get "events") flatMap (_ asArray) getOrElse Nil
}

val b: Future[Vector[Json]] = a
  .takeWithin(50 second)
  .runWith(Sink.fold(Vector.empty[Json])((a, b) => {

    // I'm using this to see what's going on in the stream
    // there are significant pauses between the entities
    // in reality the elements are available in the stream (all 5)
    // within 2-3 seconds
    // and this printing just has very big pause after first 4 elements

    println(s"adding\n\n\n ${b.noSpaces}")
    a :+ b
  }))

Await.result(b, 1 minute)

我查看了这个问题,它似乎与我所拥有的 https://github.com/akka/akka-http/issues/57 非常接近,但不知何故未能找到对我的案例有帮助的东西。

我还尝试更改 akka http 的块大小,但没有真正帮助。

这里是传入消息的时间: 从流初始化:

1.  881 ms
2.  889 ms
3.  894 ms
4.  898 ms
// I don't understand why this wait time of 30 seconds in betweeen
5. 30871 ms

最后一条消息显然在某处挂了 30 秒

任何想法都将不胜感激。

更新:

由于前 4 个元素始终在 4 处出现并且第 5 个元素等待 30 秒真的很奇怪,我决定将 initial-input-buffer-size = 4 从默认的 4 增加到 16,现在它可以按预期工作。我只是无法理解上面代码中背压的作用。

更新 2:

缓冲区大小有助于我的简单示例。但在我真正的问题中,我遇到了一些非常奇怪的事情:

entity.withoutSizeLimit.dataBytes
    .alsoTo(Sink.foreach(a => println("stage 1 " + a.decodeString(StandardCharsets.UTF_8))))
    .via(Framing delimiter (ByteString("\n"), Int.MaxValue))
    .buffer(1000, OverflowStrategy.backpressure)
    .alsoTo(Sink.foreach(a => println("stage 2 " + a.decodeString(StandardCharsets.UTF_8))))

我可以在框架(第 1 阶段)之前看到我需要的消息,但在日志中(第 2 阶段)看不到它之后的消息。而且我通过在其后放置缓冲区来确保有足够的空间来推动。

现在我发现换行符并没有真正进入前面的舞台(第一阶段),每行通常都是这样结束的:

"7da".sliding(2, 2).toArray.map(Integer.parseInt(_, 16).toChar).mkString
res12: String =
"}
"

在我的最后一项中,我缺少最后一个字节a,基本上新行没有进入框架。所以整个东西都不会发出来。

【问题讨论】:

  • 有趣,我想知道你是否可以在没有 akka-http 的情况下重现这个,即将一些源 JSON 转储到文件中,并使用 Source.fromFile 而不是 http 请求。
  • 当我从 curl 中转储时,它可以工作。我现在也尝试了initial-input-buffer-size = 16,它按预期工作......这真的很奇怪,看起来背压在某个地方。但不知道在哪里。
  • 尝试将文件作为流,使用与此处相同的代码。我没有遇到这个问题:( - 现在让我有点发疯:D

标签: scala http curl akka


【解决方案1】:

经过相当多的调查后,我决定绕过这个问题,因为看起来那里有多种因素的组合。整个问题的输入源实际上是我公司使用的以kafka为背景的专有企业服务总线:https://github.com/zalando/nakadi

根据上述症状,我在想系统可能没有按照文档播放,并且他们可能没有通过附加发送\n,但他们将其准备到每一行,但事实并非如此,因为我签入代码:https://github.com/zalando/nakadi/blob/0859645b032d19f7baa919877f72cb076f1da867/src/main/java/org/zalando/nakadi/service/EventStreamWriterString.java#L36

看到这个后,我尝试使用这个例子来模拟整个事情:

build.sbt

name := "test-framing"

version := "0.1"

scalaVersion := "2.12.4"    

lazy val akkaVersion = "2.5.6"
lazy val akkaHttpVersion = "10.0.10"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-stream" % akkaVersion,
  "com.typesafe.akka" %% "akka-http" % akkaHttpVersion,
  "com.typesafe.akka" %% "akka-http-spray-json" % akkaHttpVersion
)

scalacOptions in Compile ++= (scalacOptions in Compile).value :+ "-Yrangepos"

*TestApp.scala - 我的代码中出现问题的地方 *

import java.nio.charset.StandardCharsets

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Framing, Sink, Source}
import akka.util.ByteString

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object TestApp extends App {

  implicit val system = ActorSystem("MyAkkaSystem")
  implicit val materializer = ActorMaterializer()

  val awesomeHttpReq = Http().singleRequest(
    HttpRequest(
      method = HttpMethods.GET,
      uri = Uri("http://localhost:9000/streaming-json")
    )
  )

  val a = Source.fromFuture(awesomeHttpReq).flatMapConcat {
    case HttpResponse(status, _, entity, _) =>
      entity.withoutSizeLimit.getDataBytes
        .via(Framing delimiter (ByteString("\n"), Int.MaxValue))
  } map { bytes =>
    bytes decodeString StandardCharsets.UTF_8
  }

  val b: Future[Vector[String]] = a
    .takeWithin(50 second)
    .runWith(Sink.fold(Vector.empty[String])((a, b) => {
      println(s"adding $b")
      a :+ b
    }))

  Await.result(b, 1 minute)

}

* 模拟端点 *

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.common.EntityStreamingSupport
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import akka.http.scaladsl.server.Directives
import akka.stream.scaladsl.{Flow, Source}
import akka.stream.{ActorMaterializer, ThrottleMode}
import akka.util.ByteString
import spray.json._

import scala.concurrent.duration._
import scala.io.StdIn

object TestApp2 extends App {

  implicit val system = ActorSystem("MyAkkaSystem")
  implicit val materializer = ActorMaterializer()

  implicit val executionContext = system.dispatcher

  case class SomeData(name: String)

  trait JsonSupport extends SprayJsonSupport with DefaultJsonProtocol {
    implicit val someFormat = jsonFormat1(SomeData)
  }

  val start = ByteString.empty
  val sep = ByteString("\n")
  val end = ByteString.empty

  implicit val jsonStreamingSupport = EntityStreamingSupport
    .json()
    .withFramingRenderer(Flow[ByteString].intersperse(sep))

  object MyJsonService extends Directives with JsonSupport {

    def streamingJsonRoute =
      path("streaming-json") {
        get {
          val sourceOfNumbers = Source(1 to 1000000)

          val sourceOfDetailedMessages =
            sourceOfNumbers
              .map(num => SomeData(s"Hello $num"))
              .throttle(elements = 5,
                        per = 30 second,
                        maximumBurst = 6,
                        mode = ThrottleMode.Shaping)

          complete(sourceOfDetailedMessages)
        }
      }
  }

  val bindingFuture =
    Http().bindAndHandle(MyJsonService.streamingJsonRoute, "localhost", 9000)

  println(s"Server online at http://localhost:9000/\nPress RETURN to stop...")
  StdIn.readLine() // let it run until user presses return
  bindingFuture
    .flatMap(_.unbind()) // trigger unbinding from the port
    .onComplete(_ => system.terminate()) // and shutdown when done

}

在模拟端点中,我得到了预期的行为,所以 akka 并没有什么问题。

当多个库 + nakadi 组合在一起时可能仍然会出现一些问题,但这只是打猎。最后,如果我将batch_flush_timeout 降低到某个较低的值,服务器实际上会将下一个事件发送到管道中,因此管道中最后的消息将被推送到我的应用程序层,这样我就可以做到处理它。

基本上所有这些文本都是因为一个字节不知何故没有进入框架,但是在过去的几天里我又学到了很多关于 akka 流的知识。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2017-12-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-12-07
    • 1970-01-01
    • 2016-10-14
    相关资源
    最近更新 更多