【发布时间】:2017-10-21 21:56:12
【问题描述】:
基本上这是我使用的代码。
当我与 curl 建立连接时,我看到 curl 命令中的所有实体都非常快。当我尝试用 akka 模拟相同的行为时,在打印出我得到的元素之间会有很大的停顿。
在前 4 条消息之后,下面的流不知何故得到了压力 其余 1 条消息在打印行的显着时间之后出现。
前 4 条消息大约是 2k JSON,最后一条没有。 5 是 80k JSON。
最后一个实体(编号 5)也是最大的块,我得到的印象是它是在流完成之前打印的。而且我很肯定它在运行 2-3 秒后就可以使用。
知道为什么这个流在读取前 4 个元素后就挂起
val awesomeHttpReq = Http().singleRequest(
HttpRequest(
method = GET,
uri = Uri("http://some-service-providing-endless-http.stream")
)
)
val a = Source.fromFuture(awesomeHttpReq).flatMapConcat {
case HttpResponse(status, _, entity, _) =>
// I saw some comments the back pressure might kick in
// because I might not be consuming the bytes here properly
// but this is totally in line with all the examples etc.
entity.withoutSizeLimit.getDataBytes.via(Framing delimiter (ByteString("\n"), Int.MaxValue))
} map { bytes =>
parse(bytes decodeString StandardCharsets.UTF_8).fold(pf => throw new IllegalStateException(s"unable to parse: $pf"), identity[Json])
} mapConcat { items =>
// every line that comes in from previous stage contains
// key elements - this I'm interested in, it's an array
items.asObject flatMap (_.toMap get "events") flatMap (_ asArray) getOrElse Nil
}
val b: Future[Vector[Json]] = a
.takeWithin(50 second)
.runWith(Sink.fold(Vector.empty[Json])((a, b) => {
// I'm using this to see what's going on in the stream
// there are significant pauses between the entities
// in reality the elements are available in the stream (all 5)
// within 2-3 seconds
// and this printing just has very big pause after first 4 elements
println(s"adding\n\n\n ${b.noSpaces}")
a :+ b
}))
Await.result(b, 1 minute)
我查看了这个问题,它似乎与我所拥有的 https://github.com/akka/akka-http/issues/57 非常接近,但不知何故未能找到对我的案例有帮助的东西。
我还尝试更改 akka http 的块大小,但没有真正帮助。
这里是传入消息的时间: 从流初始化:
1. 881 ms
2. 889 ms
3. 894 ms
4. 898 ms
// I don't understand why this wait time of 30 seconds in betweeen
5. 30871 ms
最后一条消息显然在某处挂了 30 秒
任何想法都将不胜感激。
更新:
由于前 4 个元素始终在 4 处出现并且第 5 个元素等待 30 秒真的很奇怪,我决定将 initial-input-buffer-size = 4 从默认的 4 增加到 16,现在它可以按预期工作。我只是无法理解上面代码中背压的作用。
更新 2:
缓冲区大小有助于我的简单示例。但在我真正的问题中,我遇到了一些非常奇怪的事情:
entity.withoutSizeLimit.dataBytes
.alsoTo(Sink.foreach(a => println("stage 1 " + a.decodeString(StandardCharsets.UTF_8))))
.via(Framing delimiter (ByteString("\n"), Int.MaxValue))
.buffer(1000, OverflowStrategy.backpressure)
.alsoTo(Sink.foreach(a => println("stage 2 " + a.decodeString(StandardCharsets.UTF_8))))
我可以在框架(第 1 阶段)之前看到我需要的消息,但在日志中(第 2 阶段)看不到它之后的消息。而且我通过在其后放置缓冲区来确保有足够的空间来推动。
现在我发现换行符并没有真正进入前面的舞台(第一阶段),每行通常都是这样结束的:
"7da".sliding(2, 2).toArray.map(Integer.parseInt(_, 16).toChar).mkString
res12: String =
"}
"
在我的最后一项中,我缺少最后一个字节a,基本上新行没有进入框架。所以整个东西都不会发出来。
【问题讨论】:
-
有趣,我想知道你是否可以在没有 akka-http 的情况下重现这个,即将一些源 JSON 转储到文件中,并使用
Source.fromFile而不是 http 请求。 -
当我从 curl 中转储时,它可以工作。我现在也尝试了
initial-input-buffer-size = 16,它按预期工作......这真的很奇怪,看起来背压在某个地方。但不知道在哪里。 -
尝试将文件作为流,使用与此处相同的代码。我没有遇到这个问题:( - 现在让我有点发疯:D