【问题标题】:Convert HttpEntity.Chunked to Array[String]将 HttpEntity.Chunked 转换为 Array[String]
【发布时间】:2016-03-14 20:17:45
【问题描述】:

我有以下问题。 我正在向服务器查询一些数据并将其作为 HttpEntity.Chunked 取回。 响应字符串看起来像这样,最多有 10.000.000 行,如下所示:

[{"name":"param1","value":122343,"time":45435345},
{"name":"param2","value":243,"time":4325435},
......]

现在我想将传入的数据放入 Array[String] 中,其中每个字符串都是响应中的一行,因为稍后应该将它导入到 apache spark 数据帧中。 目前我正在这样做:

//For the http request
trait StartHttpRequest {
  implicit val system: ActorSystem
  implicit val materializer: ActorMaterializer

  def httpRequest(data: String, path: String, targetPort: Int, host: String): Future[HttpResponse] = {
    val connectionFlow: Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = {
      Http().outgoingConnection(host, port = targetPort)
    }
    val responseFuture: Future[HttpResponse] =
      Source.single(RequestBuilding.Post(uri = path, entity = HttpEntity(ContentTypes.`application/json`, data)))
        .via(connectionFlow)
        .runWith(Sink.head)
    responseFuture
  }
}

//result of the request
val responseFuture: Future[HttpResponse] = httpRequest(.....)

//convert to string
responseFuture.flatMap { response =>
        response.status match {
          case StatusCodes.OK =>
            Unmarshal(response.entity).to[String]
    }
}

//and then something like this, but with even more stupid stuff
responseFuture.onSuccess { str:String =>
    masterActor! str.split("""\},\{""")
}

我的问题是,将结果放入数组中的更好方法是什么? 如何直接解组响应实体?因为 .to[Array[String]] 例如不起作用。而且因为有这么多行,我可以用流来做吗?更有效?

【问题讨论】:

    标签: scala akka akka-stream akka-http http-chunked


    【解决方案1】:

    不按顺序回答您的问题:

    如何直接解组响应实体?

    有一个existing question & answer 与解组案例类数组有关。

    将结果放入数组的更好方法是什么?

    我会利用 Chunked 特性并使用流。这允许您同时进行字符串处理和 json 解析。

    首先你需要一个容器类和解析器:

    case class Data(name : String, value : Int, time : Long)
    
    object MyJsonProtocol extends DefaultJsonProtocol {
      implicit val dataFormat = jsonFormat3(Data)
    }
    

    然后你必须做一些操作才能让 json 对象看起来正确:

    //Drops the '[' and the ']' characters
    val dropArrayMarkers = 
      Flow[ByteString].map(_.filterNot(b => b == '['.toByte || b == ']'.toByte))
    
    val preppendBrace = 
      Flow[String].map(s => if(!s.startsWith("{")) "{" + s else s)
    
    val appendBrace = 
      Flow[String].map(s => if(!s.endsWith("}")) s + "}" else s)
    
    val parseJson = 
      Flow[String].map(_.parseJson.convertTo[Data])
    

    最后,结合这些 Flows 将 ByteString 的 Source 转换为 Source of Data 对象:

    def strSourceToDataSource(source : Source[ByteString,_]) : Source[Data, _] = 
      source.via(dropArrayMarkers)
            .via(Framing.delimiter(ByteString("},{"), 256, true))
            .map(_.utf8String)
            .via(prependBrace)
            .via(appendBrace)
            .via(parseJson)
    

    然后可以将此源引流到 Seq 的数据对象中:

    val dataSeq : Future[Seq[Data]] = 
      responseFuture flatMap { response =>
        response.status match {
          case StatusCodes.OK =>
            strSourceToDataSource(response.entity.dataBytes).runWith(Sink.seq)
        }
      }
    

    【讨论】:

    • 哇,谢谢您的详细回答。我目前的解决方案是这样处理的:case class Element(name: String, value: Int, time: Int)responseFuture.flatMap { response => response.status match { case StatusCodes.OK => Unmarshal(response.entity).to[Seq[Element]] } } 但我认为流媒体解决方案更有意义,所以我要尝试一下。你碰巧也使用 apache spark 吗?因为我可能有一个后续问题。谢谢!
    • @rincewind 欢迎您。我会看看另一个问题。快乐的黑客攻击。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-03-30
    • 1970-01-01
    • 1970-01-01
    • 2019-03-22
    • 1970-01-01
    • 2018-04-05
    • 1970-01-01
    相关资源
    最近更新 更多