【发布时间】:2017-10-27 01:30:02
【问题描述】:
我有一个来自输入文件的Source[ByteString, _],有 3 行这样的(实际上输入是一个带有连续流的 TCP 套接字):
{"a":[2
33]
}
现在的问题是我想将其解析为Source[ChangeMessage,_],但是我发现的唯一示例处理的是每一行都有一个完整的 JSON 消息,而不是每个 JSON 消息可以分割成多行。
我发现的一个例子是 this 库,但是它期望 } 或 , 作为最后一个字符,即每行一个 JSON。下面的示例显示了此设置。
"My decoder" should "decode chunked json" in {
implicit val sys = ActorSystem("test")
implicit val mat = ActorMaterializer()
val file = Paths.get("chunked_json_stream.json")
val data = FileIO.fromPath(file)
.via(CirceStreamSupport.decode[ChangeMessage])
.runWith(TestSink.probe[ChangeMessage])
.request(1)
.expectComplete()
}
另一种选择是使用折叠和平衡},并且仅在整个 JSON 完成时才发出。这样做的问题是折叠运算符仅在流完成时发出,因为这是一个连续的流,我不能在这里使用它。
我的问题是:解析分块 JSON 流的最快方法是什么 在 AKKA Stream 中,是否有任何可用的软件已经可以 这?如果可能的话,我想使用circe
【问题讨论】:
标签: json scala akka akka-stream