【问题标题】:Decode chunked JSON with AKKA Stream使用 AKKA Stream 解码分块的 JSON
【发布时间】:2017-10-27 01:30:02
【问题描述】:

我有一个来自输入文件的Source[ByteString, _],有 3 行这样的(实际上输入是一个带有连续流的 TCP 套接字):

{"a":[2
33]
}

现在的问题是我想将其解析为Source[ChangeMessage,_],但是我发现的唯一示例处理的是每一行都有一个完整的 JSON 消息,而不是每个 JSON 消息可以分割成多行。

我发现的一个例子是 this 库,但是它期望 }, 作为最后一个字符,即每行一个 JSON。下面的示例显示了此设置。

"My decoder" should "decode chunked json" in {
    implicit val sys = ActorSystem("test")
    implicit val mat = ActorMaterializer()
    val file = Paths.get("chunked_json_stream.json")
    val data = FileIO.fromPath(file)
    .via(CirceStreamSupport.decode[ChangeMessage])
    .runWith(TestSink.probe[ChangeMessage])
    .request(1)
    .expectComplete()
  }

另一种选择是使用折叠和平衡},并且仅在整个 JSON 完成时才发出。这样做的问题是折叠运算符仅在流完成时发出,因为这是一个连续的流,我不能在这里使用它。

我的问题是:解析分块 JSON 流的最快方法是什么 在 AKKA Stream 中,是否有任何可用的软件已经可以 这?如果可能的话,我想使用circe

【问题讨论】:

    标签: json scala akka akka-stream


    【解决方案1】:

    正如knutwalker/akka-stream-json 的文档所说:

    此流程甚至支持解析多个 json 文档,无论它们可能到达何种碎片,这对于使用基于流/sse 的 API 非常有用。

    在您的情况下,您需要做的只是分隔传入的字节字符串:

    "My decoder" should "decode chunked json" in {
        implicit val sys = ActorSystem("test")
        implicit val mat = ActorMaterializer()
        val file = Paths.get("chunked_json_stream.json")
    
        val sourceUnderTest =
          FileIO.fromPath(file)
            .via(Framing.delimiter(ByteString("\n"), 8192, allowTruncation = true))
            .via(CirceStreamSupport.decode[ChangeMessage])
    
        sourceUnderTest
          .runWith(TestSink.probe[ChangeMessage])
          .request(1)
          .expectNext(ChangeMessage(List(233)))
          .expectComplete()
    }
    

    这是因为从文件中读取时,ByteString 元素包含多行,因此 Circe 无法解析格式错误的 json。当你用新行分隔时,流中的每个元素都是一个单独的行,因此 Circe 能够使用上述功能对其进行解析。

    【讨论】:

    • 其实我是通过使用没有任何 JsonFraming 的 CirceStreamSupport 让它工作的?
    • @user3139545 感谢您的评论。我澄清了我的答案。
    【解决方案2】:

    不幸的是,我不知道有任何 Scala 库支持基于流的 JSON 解析。 似乎在我看来,Google Gson 对此提供了一些支持,但我不完全确定它能否正确处理“损坏”输入。

    但是,您可以做的是以流方式收集 JSON 文档,类似于 Framing.delimiter 所做的。这与您提到的替代方案非常相似,但它没有使用fold();如果你这样做,你可能需要模仿 Framing.delimiter 所做的事情,但不是寻找单个分隔符,而是需要平衡花括号(如果可以使用顶级数组,还可以选择括号),缓冲中间数据,直到整个文档通过,您可以将其作为适合解析的单个块发出。

    顺便说一句,适合在 Akka Streams 中使用的流式 JSON 解析器的适当接口可能如下所示:

    trait Parser {
      def update(data: Array[Byte])  // or String
      def pull(): Option[Either[Error, JsonEvent]]
    }
    

    如果pull() 无法再读取但传入文档中没有实际的语法错误,则返回NoneJsonEvent 是用于描述流解析器事件的一些标准结构(即带有BeginObjectBeginArrayEndObjectEndArrayString 等子类)。如果您找到或创建了这样的库,您可以使用它来解析来自 ByteStrings 的 Akka 流中的数据。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2017-11-12
      • 1970-01-01
      • 2017-10-03
      • 1970-01-01
      • 2016-11-21
      • 1970-01-01
      • 2016-12-24
      相关资源
      最近更新 更多